http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/service/src/java/org/apache/hive/service/cli/JobProgressUpdate.java ---------------------------------------------------------------------- diff --git a/service/src/java/org/apache/hive/service/cli/JobProgressUpdate.java b/service/src/java/org/apache/hive/service/cli/JobProgressUpdate.java new file mode 100644 index 0000000..76ce24a --- /dev/null +++ b/service/src/java/org/apache/hive/service/cli/JobProgressUpdate.java @@ -0,0 +1,38 @@ +package org.apache.hive.service.cli; + +import org.apache.hadoop.hive.common.log.ProgressMonitor; + +import java.util.List; + +public class JobProgressUpdate { + public final double progressedPercentage; + public final String footerSummary; + public final long startTimeMillis; + private final List<String> headers; + private final List<List<String>> rows; + public final String status; + + + JobProgressUpdate(ProgressMonitor monitor) { + this(monitor.headers(), monitor.rows(), monitor.footerSummary(), monitor.progressedPercentage(), + monitor.startTime(), monitor.executionStatus()); + } + + private JobProgressUpdate(List<String> headers, List<List<String>> rows, String footerSummary, + double progressedPercentage, long startTimeMillis, String status) { + this.progressedPercentage = progressedPercentage; + this.footerSummary = footerSummary; + this.startTimeMillis = startTimeMillis; + this.headers = headers; + this.rows = rows; + this.status = status; + } + + public List<String> headers() { + return headers; + } + + public List<List<String>> rows() { + return rows; + } +}
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/service/src/java/org/apache/hive/service/cli/OperationStatus.java ---------------------------------------------------------------------- diff --git a/service/src/java/org/apache/hive/service/cli/OperationStatus.java b/service/src/java/org/apache/hive/service/cli/OperationStatus.java index b0a26e3..317585f 100644 --- a/service/src/java/org/apache/hive/service/cli/OperationStatus.java +++ b/service/src/java/org/apache/hive/service/cli/OperationStatus.java @@ -30,6 +30,7 @@ public class OperationStatus { private final long operationCompleted; private final boolean hasResultSet; private final HiveSQLException operationException; + private JobProgressUpdate jobProgressUpdate; public OperationStatus(OperationState state, String taskStatus, long operationStarted, long operationCompleted, boolean hasResultSet, HiveSQLException operationException) { this.state = state; @@ -64,4 +65,11 @@ public class OperationStatus { return operationException; } + void setJobProgressUpdate(JobProgressUpdate jobProgressUpdate){ + this.jobProgressUpdate = jobProgressUpdate; + } + + public JobProgressUpdate jobProgressUpdate(){ + return jobProgressUpdate; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/service/src/java/org/apache/hive/service/cli/ProgressMonitorStatusMapper.java ---------------------------------------------------------------------- diff --git a/service/src/java/org/apache/hive/service/cli/ProgressMonitorStatusMapper.java b/service/src/java/org/apache/hive/service/cli/ProgressMonitorStatusMapper.java new file mode 100644 index 0000000..29a5f66 --- /dev/null +++ b/service/src/java/org/apache/hive/service/cli/ProgressMonitorStatusMapper.java @@ -0,0 +1,19 @@ +package org.apache.hive.service.cli; + +import org.apache.hive.service.rpc.thrift.TJobExecutionStatus; + +/** + * This defines the mapping between the internal execution status of various engines and the + * generic states that the progress monitor 
cares about. These are specified by TJobExecutionStatus + */ +public interface ProgressMonitorStatusMapper { + + ProgressMonitorStatusMapper DEFAULT = new ProgressMonitorStatusMapper() { + @Override + public TJobExecutionStatus forStatus(String status) { + return TJobExecutionStatus.NOT_AVAILABLE; + } + }; + + TJobExecutionStatus forStatus(String status); +} http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/service/src/java/org/apache/hive/service/cli/TezProgressMonitorStatusMapper.java ---------------------------------------------------------------------- diff --git a/service/src/java/org/apache/hive/service/cli/TezProgressMonitorStatusMapper.java b/service/src/java/org/apache/hive/service/cli/TezProgressMonitorStatusMapper.java new file mode 100644 index 0000000..88fbd22 --- /dev/null +++ b/service/src/java/org/apache/hive/service/cli/TezProgressMonitorStatusMapper.java @@ -0,0 +1,32 @@ +package org.apache.hive.service.cli; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hive.service.rpc.thrift.TJobExecutionStatus; + +public class TezProgressMonitorStatusMapper implements ProgressMonitorStatusMapper { + + /** + * These states are taken from DAGStatus.State, could not use that here directly as it was + * optional dependency and did not want to include it just for the enum. 
+ */ + enum TezStatus { + SUBMITTED, INITING, RUNNING, SUCCEEDED, KILLED, FAILED, ERROR + + } + + @Override + public TJobExecutionStatus forStatus(String status) { + if (StringUtils.isEmpty(status)) { + return TJobExecutionStatus.NOT_AVAILABLE; + } + TezStatus tezStatus = TezStatus.valueOf(status); + switch (tezStatus) { + case SUBMITTED: + case INITING: + case RUNNING: + return TJobExecutionStatus.IN_PROGRESS; + default: + return TJobExecutionStatus.COMPLETE; + } + } +} http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/service/src/java/org/apache/hive/service/cli/thrift/RetryingThriftCLIServiceClient.java ---------------------------------------------------------------------- diff --git a/service/src/java/org/apache/hive/service/cli/thrift/RetryingThriftCLIServiceClient.java b/service/src/java/org/apache/hive/service/cli/thrift/RetryingThriftCLIServiceClient.java index 85b82b6..0e76c91 100644 --- a/service/src/java/org/apache/hive/service/cli/thrift/RetryingThriftCLIServiceClient.java +++ b/service/src/java/org/apache/hive/service/cli/thrift/RetryingThriftCLIServiceClient.java @@ -32,7 +32,6 @@ import java.util.concurrent.TimeUnit; import javax.security.sasl.SaslException; import org.apache.hadoop.hive.conf.HiveConf; -import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hive.service.auth.HiveAuthFactory; import org.apache.hive.service.auth.PlainSaslHelper; import org.apache.hive.service.cli.CLIServiceClient; @@ -191,8 +190,8 @@ public class RetryingThriftCLIServiceClient implements InvocationHandler { } @Override - public OperationStatus getOperationStatus(OperationHandle opHandle) throws HiveSQLException { - return cliService.getOperationStatus(opHandle); + public OperationStatus getOperationStatus(OperationHandle opHandle, boolean getProgressUpdate) throws HiveSQLException { + return cliService.getOperationStatus(opHandle, getProgressUpdate); } @Override 
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java ---------------------------------------------------------------------- diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java index 2938338..e09d9fe 100644 --- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java +++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIService.java @@ -24,7 +24,6 @@ import java.net.UnknownHostException; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicInteger; import javax.security.auth.login.LoginException; @@ -33,24 +32,25 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.common.ServerUtils; import org.apache.hadoop.hive.shims.HadoopShims.KerberosNameShim; import org.apache.hadoop.hive.shims.ShimLoader; -import org.apache.hadoop.security.SaslRpcServer.AuthMethod; import org.apache.hive.service.AbstractService; import org.apache.hive.service.ServiceException; import org.apache.hive.service.ServiceUtils; import org.apache.hive.service.auth.HiveAuthFactory; import org.apache.hive.service.auth.TSetIpAddressProcessor; -import org.apache.hive.service.auth.HiveAuthFactory.AuthTypes; import org.apache.hive.service.cli.CLIService; import org.apache.hive.service.cli.FetchOrientation; import org.apache.hive.service.cli.FetchType; import org.apache.hive.service.cli.GetInfoType; import org.apache.hive.service.cli.GetInfoValue; import org.apache.hive.service.cli.HiveSQLException; +import org.apache.hive.service.cli.JobProgressUpdate; import org.apache.hive.service.cli.OperationHandle; import org.apache.hive.service.cli.OperationStatus; +import org.apache.hive.service.cli.ProgressMonitorStatusMapper; import org.apache.hive.service.cli.RowSet; import 
org.apache.hive.service.cli.SessionHandle; import org.apache.hive.service.cli.TableSchema; +import org.apache.hive.service.cli.TezProgressMonitorStatusMapper; import org.apache.hive.service.cli.session.SessionManager; import org.apache.hive.service.rpc.thrift.TCLIService; import org.apache.hive.service.rpc.thrift.TCancelDelegationTokenReq; @@ -93,6 +93,7 @@ import org.apache.hive.service.rpc.thrift.TGetTypeInfoReq; import org.apache.hive.service.rpc.thrift.TGetTypeInfoResp; import org.apache.hive.service.rpc.thrift.TOpenSessionReq; import org.apache.hive.service.rpc.thrift.TOpenSessionResp; +import org.apache.hive.service.rpc.thrift.TProgressUpdateResp; import org.apache.hive.service.rpc.thrift.TProtocolVersion; import org.apache.hive.service.rpc.thrift.TRenewDelegationTokenReq; import org.apache.hive.service.rpc.thrift.TRenewDelegationTokenResp; @@ -629,15 +630,30 @@ public abstract class ThriftCLIService extends AbstractService implements TCLISe @Override public TGetOperationStatusResp GetOperationStatus(TGetOperationStatusReq req) throws TException { TGetOperationStatusResp resp = new TGetOperationStatusResp(); + OperationHandle operationHandle = new OperationHandle(req.getOperationHandle()); try { - OperationStatus operationStatus = cliService.getOperationStatus( - new OperationHandle(req.getOperationHandle())); + OperationStatus operationStatus = + cliService.getOperationStatus(operationHandle, req.isGetProgressUpdate()); resp.setOperationState(operationStatus.getState().toTOperationState()); HiveSQLException opException = operationStatus.getOperationException(); resp.setTaskStatus(operationStatus.getTaskStatus()); resp.setOperationStarted(operationStatus.getOperationStarted()); resp.setOperationCompleted(operationStatus.getOperationCompleted()); resp.setHasResultSet(operationStatus.getHasResultSet()); + JobProgressUpdate progressUpdate = operationStatus.jobProgressUpdate(); + ProgressMonitorStatusMapper mapper = ProgressMonitorStatusMapper.DEFAULT; + if 
("tez".equals(hiveConf.getVar(ConfVars.HIVE_EXECUTION_ENGINE))) { + mapper = new TezProgressMonitorStatusMapper(); + } + + resp.setProgressUpdateResponse(new TProgressUpdateResp( + progressUpdate.headers(), + progressUpdate.rows(), + progressUpdate.progressedPercentage, + mapper.forStatus(progressUpdate.status), + progressUpdate.footerSummary, + progressUpdate.startTimeMillis + )); if (opException != null) { resp.setSqlState(opException.getSQLState()); resp.setErrorCode(opException.getErrorCode()); @@ -746,7 +762,7 @@ public abstract class ThriftCLIService extends AbstractService implements TCLISe } return resp; } - + @Override public abstract void run(); http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java ---------------------------------------------------------------------- diff --git a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java index 9805641..617bc40 100644 --- a/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java +++ b/service/src/java/org/apache/hive/service/cli/thrift/ThriftCLIServiceClient.java @@ -367,9 +367,10 @@ public class ThriftCLIServiceClient extends CLIServiceClient { * @see org.apache.hive.service.cli.ICLIService#getOperationStatus(org.apache.hive.service.cli.OperationHandle) */ @Override - public OperationStatus getOperationStatus(OperationHandle opHandle) throws HiveSQLException { + public OperationStatus getOperationStatus(OperationHandle opHandle, boolean getProgressUpdate) throws HiveSQLException { try { TGetOperationStatusReq req = new TGetOperationStatusReq(opHandle.toTOperationHandle()); + req.setGetProgressUpdate(getProgressUpdate); TGetOperationStatusResp resp = cliService.GetOperationStatus(req); // Checks the status of the RPC call, throws an exception in case of error checkStatus(resp.getStatus()); 
http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java ---------------------------------------------------------------------- diff --git a/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java b/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java index f325dbc..bc6648e 100644 --- a/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java +++ b/service/src/test/org/apache/hive/service/cli/CLIServiceTest.java @@ -173,11 +173,11 @@ public abstract class CLIServiceTest { queryString = "SELECT ID+1 FROM TEST_EXEC"; opHandle = client.executeStatement(sessionHandle, queryString, confOverlay); - OperationStatus opStatus = client.getOperationStatus(opHandle); + OperationStatus opStatus = client.getOperationStatus(opHandle, false); checkOperationTimes(opHandle, opStatus); // Expect query to be completed now assertEquals("Query should be finished", - OperationState.FINISHED, client.getOperationStatus(opHandle).getState()); + OperationState.FINISHED, client.getOperationStatus(opHandle, false).getState()); client.closeOperation(opHandle); // Cleanup @@ -273,10 +273,10 @@ public abstract class CLIServiceTest { System.out.println("Cancelling " + opHandle); client.cancelOperation(opHandle); - OperationStatus operationStatus = client.getOperationStatus(opHandle); + OperationStatus operationStatus = client.getOperationStatus(opHandle, false); checkOperationTimes(opHandle, operationStatus); - state = client.getOperationStatus(opHandle).getState(); + state = client.getOperationStatus(opHandle, false).getState(); System.out.println(opHandle + " after cancelling, state= " + state); assertEquals("Query should be cancelled", OperationState.CANCELED, state); @@ -545,7 +545,7 @@ public abstract class CLIServiceTest { } longPollingStart = System.currentTimeMillis(); System.out.println("Long polling starts at: " + longPollingStart); - opStatus = client.getOperationStatus(opHandle); + opStatus = 
client.getOperationStatus(opHandle, false); state = opStatus.getState(); longPollingEnd = System.currentTimeMillis(); System.out.println("Long polling ends at: " + longPollingEnd); @@ -568,7 +568,7 @@ public abstract class CLIServiceTest { assertTrue(longPollingTimeDelta - 0.9*expectedTimeout > 0); } } - assertEquals(expectedState, client.getOperationStatus(opHandle).getState()); + assertEquals(expectedState, client.getOperationStatus(opHandle, false).getState()); client.closeOperation(opHandle); return opStatus; } @@ -606,7 +606,7 @@ public abstract class CLIServiceTest { assertNotNull(opHandle); // query should pass and create the table assertEquals("Query should be finished", - OperationState.FINISHED, client.getOperationStatus(opHandle).getState()); + OperationState.FINISHED, client.getOperationStatus(opHandle, false).getState()); client.closeOperation(opHandle); // select from the new table should pass @@ -615,7 +615,7 @@ public abstract class CLIServiceTest { assertNotNull(opHandle); // query should pass and create the table assertEquals("Query should be finished", - OperationState.FINISHED, client.getOperationStatus(opHandle).getState()); + OperationState.FINISHED, client.getOperationStatus(opHandle, false).getState()); client.closeOperation(opHandle); // the settings in conf overlay should not be part of session config @@ -653,7 +653,7 @@ public abstract class CLIServiceTest { OperationStatus status = null; int count = 0; while (true) { - status = client.getOperationStatus(ophandle); + status = client.getOperationStatus(ophandle, false); checkOperationTimes(ophandle, status); OperationState state = status.getState(); System.out.println("Polling: " + ophandle + " count=" + (++count) http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/service/src/test/org/apache/hive/service/cli/TestRetryingThriftCLIServiceClient.java ---------------------------------------------------------------------- diff --git 
a/service/src/test/org/apache/hive/service/cli/TestRetryingThriftCLIServiceClient.java b/service/src/test/org/apache/hive/service/cli/TestRetryingThriftCLIServiceClient.java index 2855bb3..79953c4 100644 --- a/service/src/test/org/apache/hive/service/cli/TestRetryingThriftCLIServiceClient.java +++ b/service/src/test/org/apache/hive/service/cli/TestRetryingThriftCLIServiceClient.java @@ -206,7 +206,7 @@ public class TestRetryingThriftCLIServiceClient { // operations will be lost once owning session is closed. for (OperationHandle op: new OperationHandle[]{op1, op2}) { try { - client.getOperationStatus(op); + client.getOperationStatus(op, false); fail("Should have failed."); } catch (HiveSQLException ignored) { http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/service/src/test/org/apache/hive/service/cli/thrift/ThriftCLIServiceTest.java ---------------------------------------------------------------------- diff --git a/service/src/test/org/apache/hive/service/cli/thrift/ThriftCLIServiceTest.java b/service/src/test/org/apache/hive/service/cli/thrift/ThriftCLIServiceTest.java index abb1ecf..4c59fca 100644 --- a/service/src/test/org/apache/hive/service/cli/thrift/ThriftCLIServiceTest.java +++ b/service/src/test/org/apache/hive/service/cli/thrift/ThriftCLIServiceTest.java @@ -181,7 +181,7 @@ public abstract class ThriftCLIServiceTest { OperationHandle opHandle = client.executeStatement(sessHandle, queryString, opConf); assertNotNull(opHandle); - OperationStatus opStatus = client.getOperationStatus(opHandle); + OperationStatus opStatus = client.getOperationStatus(opHandle, false); assertNotNull(opStatus); OperationState state = opStatus.getState(); @@ -241,7 +241,7 @@ public abstract class ThriftCLIServiceTest { System.out.println("Polling timed out"); break; } - opStatus = client.getOperationStatus(opHandle); + opStatus = client.getOperationStatus(opHandle, false); assertNotNull(opStatus); state = opStatus.getState(); System.out.println("Current state: " + 
state); @@ -264,7 +264,7 @@ public abstract class ThriftCLIServiceTest { System.out.println("Will attempt to execute: " + queryString); opHandle = client.executeStatementAsync(sessHandle, queryString, opConf); assertNotNull(opHandle); - opStatus = client.getOperationStatus(opHandle); + opStatus = client.getOperationStatus(opHandle, false); assertNotNull(opStatus); isQueryRunning = true; pollTimeout = System.currentTimeMillis() + 100000; @@ -283,7 +283,7 @@ public abstract class ThriftCLIServiceTest { isQueryRunning = false; } Thread.sleep(1000); - opStatus = client.getOperationStatus(opHandle); + opStatus = client.getOperationStatus(opHandle, false); } // Expect query to return an error state assertEquals("Operation should be in error state", http://git-wip-us.apache.org/repos/asf/hive/blob/3e01ef32/service/src/test/org/apache/hive/service/cli/thrift/ThriftCliServiceTestWithCookie.java ---------------------------------------------------------------------- diff --git a/service/src/test/org/apache/hive/service/cli/thrift/ThriftCliServiceTestWithCookie.java b/service/src/test/org/apache/hive/service/cli/thrift/ThriftCliServiceTestWithCookie.java index a5c8d62..6fec947 100644 --- a/service/src/test/org/apache/hive/service/cli/thrift/ThriftCliServiceTestWithCookie.java +++ b/service/src/test/org/apache/hive/service/cli/thrift/ThriftCliServiceTestWithCookie.java @@ -202,7 +202,7 @@ public class ThriftCliServiceTestWithCookie { OperationHandle opHandle = client.executeStatement(sessHandle, queryString, opConf); assertNotNull(opHandle); - OperationStatus opStatus = client.getOperationStatus(opHandle); + OperationStatus opStatus = client.getOperationStatus(opHandle, false); assertNotNull(opStatus); OperationState state = opStatus.getState();
