This is an automated email from the ASF dual-hosted git repository. shengkai pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 549d4327cf4ae9646f74a1da561dcebecd3d47ff Author: Shengkai <1059623...@qq.com> AuthorDate: Mon Aug 29 18:14:47 2022 +0800 [FLINK-28938][hive] Improve error messages for unsupported interfaces --- .../table/endpoint/hive/HiveServer2Endpoint.java | 45 +++++++++++---------- .../endpoint/hive/HiveServer2EndpointITCase.java | 47 +++++++++++++++++++++- 2 files changed, 69 insertions(+), 23 deletions(-) diff --git a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java index f420e9869e6..55bd8b21987 100644 --- a/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java +++ b/flink-connectors/flink-connector-hive/src/main/java/org/apache/flink/table/endpoint/hive/HiveServer2Endpoint.java @@ -155,8 +155,8 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin private static final Logger LOG = LoggerFactory.getLogger(HiveServer2Endpoint.class); private static final HiveServer2EndpointVersion SERVER_VERSION = HIVE_CLI_SERVICE_PROTOCOL_V10; private static final TStatus OK_STATUS = new TStatus(TStatusCode.SUCCESS_STATUS); - private static final String ERROR_MESSAGE = - "The HiveServer2 Endpoint currently doesn't support this API."; + private static final String UNSUPPORTED_ERROR_MESSAGE = + "The HiveServer2 Endpoint currently doesn't support to %s."; // -------------------------------------------------------------------------------------------- // Server attributes @@ -401,19 +401,6 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin tExecuteStatementReq.isSetConfOverlay() ? tExecuteStatementReq.getConfOverlay() : Collections.emptyMap(); - String loggingOperationEnableVar = - HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED.varname; - if (Boolean.parseBoolean( - executionConfig.getOrDefault( - loggingOperationEnableVar, - HiveConf.ConfVars.HIVE_SERVER2_LOGGING_OPERATION_ENABLED - .defaultStrVal))) { - throw new IllegalArgumentException( - String.format( - "SqlGateway doesn't support logging for operation. Please disable" - + " it by setting %s to false.", - loggingOperationEnableVar)); - } long timeout = tExecuteStatementReq.getQueryTimeout(); OperationHandle operationHandle = @@ -623,7 +610,7 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin @Override public TGetCrossReferenceResp GetCrossReference(TGetCrossReferenceReq tGetCrossReferenceReq) throws TException { - throw new UnsupportedOperationException(ERROR_MESSAGE); + return new TGetCrossReferenceResp(buildErrorStatus("GetCrossReference")); } @Override @@ -706,6 +693,14 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin @Override public TFetchResultsResp FetchResults(TFetchResultsReq tFetchResultsReq) throws TException { + if (tFetchResultsReq.getFetchType() != 0) { + // Don't log the annoying messages because Hive beeline will fetch the logs until + // operation is terminated. + return new TFetchResultsResp( + toTStatus( + new UnsupportedOperationException( + "The HiveServer2 endpoint currently doesn't support to fetch logs."))); + } TFetchResultsResp resp = new TFetchResultsResp(); try { SessionHandle sessionHandle = toSessionHandle(tFetchResultsReq.getOperationHandle()); @@ -761,30 +756,32 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin @Override public TGetDelegationTokenResp GetDelegationToken(TGetDelegationTokenReq tGetDelegationTokenReq) throws TException { - throw new UnsupportedOperationException(ERROR_MESSAGE); + return new TGetDelegationTokenResp(buildErrorStatus("GetDelegationToken")); } @Override public TCancelDelegationTokenResp CancelDelegationToken( TCancelDelegationTokenReq tCancelDelegationTokenReq) throws TException { - throw new UnsupportedOperationException(ERROR_MESSAGE); + return new TCancelDelegationTokenResp(buildErrorStatus("CancelDelegationToken")); } @Override public TRenewDelegationTokenResp RenewDelegationToken( TRenewDelegationTokenReq tRenewDelegationTokenReq) throws TException { - throw new UnsupportedOperationException(ERROR_MESSAGE); + return new TRenewDelegationTokenResp(buildErrorStatus("RenewDelegationToken")); } // CHECKSTYLE.OFF: MethodName /** To be compatible with Hive3, add a default implementation. */ public TGetQueryIdResp GetQueryId(TGetQueryIdReq tGetQueryIdReq) throws TException { - throw new UnsupportedOperationException(ERROR_MESSAGE); + throw new TException( + new UnsupportedOperationException( + String.format(UNSUPPORTED_ERROR_MESSAGE, "GetQueryId"))); } /** To be compatible with Hive3, add a default implementation. */ public TSetClientInfoResp SetClientInfo(TSetClientInfoReq tSetClientInfoReq) throws TException { - throw new UnsupportedOperationException(ERROR_MESSAGE); + return new TSetClientInfoResp(buildErrorStatus("SetClientInfo")); } // CHECKSTYLE.ON: MethodName @@ -892,4 +889,10 @@ public class HiveServer2Endpoint implements TCLIService.Iface, SqlGatewayEndpoin return root.getClass().getName() + ": " + root.getMessage(); } } + + private TStatus buildErrorStatus(String methodName) { + return toTStatus( + new UnsupportedOperationException( + String.format(UNSUPPORTED_ERROR_MESSAGE, methodName))); + } } diff --git a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java index 62ff42d8364..2bcaa020fcd 100644 --- a/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java +++ b/flink-connectors/flink-connector-hive/src/test/java/org/apache/flink/table/endpoint/hive/HiveServer2EndpointITCase.java @@ -19,7 +19,6 @@ package org.apache.flink.table.endpoint.hive; import org.apache.flink.FlinkVersion; -import org.apache.flink.core.testutils.FlinkAssertions; import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.api.SqlDialect; import org.apache.flink.table.api.config.TableConfigOptions; @@ -46,6 +45,8 @@ import org.apache.flink.util.function.ThrowingConsumer; import org.apache.hadoop.hive.common.auth.HiveAuthUtils; import org.apache.hadoop.hive.serde2.thrift.Type; +import org.apache.hive.jdbc.HiveConnection; +import org.apache.hive.jdbc.HiveStatement; import org.apache.hive.service.rpc.thrift.TCLIService; import org.apache.hive.service.rpc.thrift.TCancelOperationReq; import org.apache.hive.service.rpc.thrift.TCancelOperationResp; @@ -83,6 +84,7 @@ import java.util.stream.Collectors; import static org.apache.flink.api.common.RuntimeExecutionMode.BATCH; import static org.apache.flink.configuration.ExecutionOptions.RUNTIME_MODE; +import static org.apache.flink.core.testutils.FlinkAssertions.anyCauseMatches; import static org.apache.flink.table.api.config.TableConfigOptions.MAX_LENGTH_GENERATED_CODE; import static org.apache.flink.table.api.config.TableConfigOptions.TABLE_DML_SYNC; import static org.apache.flink.table.endpoint.hive.util.ThriftObjectConversions.toTOperationHandle; @@ -162,6 +164,47 @@ public class HiveServer2EndpointITCase extends TestLogger { "Session '%s' does not exist", sessionHandle))); } + @Test + public void testGetUnsupportedException() throws Exception { + try (HiveConnection connection = (HiveConnection) ENDPOINT_EXTENSION.getConnection(); + HiveStatement statement = (HiveStatement) connection.createStatement()) { + assertThatThrownBy(() -> connection.renewDelegationToken("TokenMessage")) + .satisfies( + anyCauseMatches( + "The HiveServer2 Endpoint currently doesn't support to RenewDelegationToken.")); + assertThatThrownBy(() -> connection.cancelDelegationToken("TokenMessage")) + .satisfies( + anyCauseMatches( + "The HiveServer2 Endpoint currently doesn't support to CancelDelegationToken.")); + assertThatThrownBy(() -> connection.getDelegationToken("Flink", "TokenMessage")) + .satisfies( + anyCauseMatches( + "The HiveServer2 Endpoint currently doesn't support to GetDelegationToken.")); + assertThatThrownBy( + () -> + connection + .getMetaData() + .getCrossReference( + "hive", + "schema", + "table", + "default_catalog", + "default_database", + "table")) + .satisfies( + anyCauseMatches( + "The HiveServer2 Endpoint currently doesn't support to GetCrossReference.")); + assertThatThrownBy( + () -> { + statement.execute("SHOW TABLES"); + statement.getQueryLog(); + }) + .satisfies( + anyCauseMatches( + "The HiveServer2 endpoint currently doesn't support to fetch logs.")); + } + } + @Test public void testCancelOperation() throws Exception { runOperationRequest( @@ -201,7 +244,7 @@ public class HiveServer2EndpointITCase extends TestLogger { .getOperationInfo( sessionHandle, operationHandle)) .satisfies( - FlinkAssertions.anyCauseMatches( + anyCauseMatches( SqlGatewayException.class, String.format( "Can not find the submitted operation in the OperationManager with the %s",