This is an automated email from the ASF dual-hosted git repository.
yao pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-kyuubi.git
The following commit(s) were added to refs/heads/master by this push:
new 4e01f9b [KYUUBI #2033] Hive Backend Engine - GetCrossReference
4e01f9b is described below
commit 4e01f9b9b06f048c642aefbe1678831f65a8e989
Author: KenjiFujima <[email protected]>
AuthorDate: Tue Mar 22 15:22:23 2022 +0800
[KYUUBI #2033] Hive Backend Engine - GetCrossReference
### _Why are the changes needed?_
Hive Backend Engine - GetCrossReference.
### _How was this patch tested?_
- [ ] Add some test cases that check the changes thoroughly including
negative and positive cases if possible
- [ ] Add screenshots for manual tests if appropriate
- [x] [Run
test](https://kyuubi.apache.org/docs/latest/develop_tools/testing.html#running-tests)
locally before make a pull request
Closes #2194 from KenjiFujima/KYUUBI-2033.
Closes #2033
01a0065f [KenjiFujima] [KYUUBI #2033] Hive Backend Engine -
GetCrossReference
Authored-by: KenjiFujima <[email protected]>
Signed-off-by: Kent Yao <[email protected]>
---
.../flink/operation/FlinkSQLOperationManager.scala | 11 +++++
.../engine/hive/operation/GetCrossReference.scala | 44 ++++++++++++++++++++
.../hive/operation/HiveOperationManager.scala | 19 +++++++++
.../engine/hive/operation/HiveOperationSuite.scala | 48 +++++++++++++++++++++-
.../spark/operation/SparkSQLOperationManager.scala | 11 +++++
.../trino/operation/TrinoOperationManager.scala | 11 +++++
.../apache/kyuubi/operation/OperationManager.scala | 8 ++++
.../kyuubi/service/AbstractBackendService.scala | 19 +++++++++
.../org/apache/kyuubi/service/BackendService.scala | 8 ++++
.../apache/kyuubi/service/TFrontendService.scala | 25 ++++++++++-
.../apache/kyuubi/session/AbstractSession.scala | 19 +++++++++
.../scala/org/apache/kyuubi/session/Session.scala | 7 ++++
.../kyuubi/operation/NoopOperationManager.scala | 13 ++++++
.../kyuubi/operation/SparkMetadataTests.scala | 8 +++-
.../kyuubi/service/TFrontendServiceSuite.scala | 15 +++++--
.../apache/kyuubi/metrics/MetricsConstants.scala | 1 +
.../kyuubi/client/KyuubiSyncThriftClient.scala | 20 +++++++++
.../kyuubi/operation/GetCrossReference.scala | 43 +++++++++++++++++++
.../kyuubi/operation/KyuubiOperationManager.scala | 21 +++++++++-
.../kyuubi/server/BackendServiceMetric.scala | 20 +++++++++
.../kyuubi/server/api/v1/SessionsResource.scala | 27 ++++++++++++
.../org/apache/kyuubi/server/api/v1/dto.scala | 8 ++++
.../server/api/v1/SessionsResourceSuite.scala | 16 ++++++--
23 files changed, 409 insertions(+), 13 deletions(-)
diff --git
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
index ff08b4d..d2fb52a 100644
---
a/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
+++
b/externals/kyuubi-flink-sql-engine/src/main/scala/org/apache/kyuubi/engine/flink/operation/FlinkSQLOperationManager.scala
@@ -133,4 +133,15 @@ class FlinkSQLOperationManager extends
OperationManager("FlinkSQLOperationManage
tableName: String): Operation = {
throw KyuubiSQLException.featureNotSupported()
}
+
+ override def newGetCrossReferenceOperation(
+ session: Session,
+ primaryCatalog: String,
+ primarySchema: String,
+ primaryTable: String,
+ foreignCatalog: String,
+ foreignSchema: String,
+ foreignTable: String): Operation = {
+ throw KyuubiSQLException.featureNotSupported()
+ }
}
diff --git
a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/GetCrossReference.scala
b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/GetCrossReference.scala
new file mode 100644
index 0000000..22009e0
--- /dev/null
+++
b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/GetCrossReference.scala
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.engine.hive.operation
+
+import org.apache.hive.service.cli.operation.Operation
+
+import org.apache.kyuubi.operation.OperationType
+import org.apache.kyuubi.session.Session
+
+class GetCrossReference(
+ session: Session,
+ primaryCatalog: String,
+ primarySchema: String,
+ primaryTable: String,
+ foreignCatalog: String,
+ foreignSchema: String,
+ foreignTable: String)
+ extends HiveOperation(OperationType.GET_FUNCTIONS, session) {
+
+ override val internalHiveOperation: Operation =
+ delegatedOperationManager.newGetCrossReferenceOperation(
+ hive,
+ primaryCatalog,
+ primarySchema,
+ primaryTable,
+ foreignCatalog,
+ foreignSchema,
+ foreignTable)
+}
diff --git
a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationManager.scala
b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationManager.scala
index fe68428..04aca79 100644
---
a/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationManager.scala
+++
b/externals/kyuubi-hive-sql-engine/src/main/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationManager.scala
@@ -102,6 +102,25 @@ class HiveOperationManager() extends
OperationManager("HiveOperationManager") {
addOperation(operation)
}
+ override def newGetCrossReferenceOperation(
+ session: Session,
+ primaryCatalog: String,
+ primarySchema: String,
+ primaryTable: String,
+ foreignCatalog: String,
+ foreignSchema: String,
+ foreignTable: String): Operation = {
+ val operation = new GetCrossReference(
+ session,
+ primaryCatalog,
+ primarySchema,
+ primaryTable,
+ foreignCatalog,
+ foreignSchema,
+ foreignTable)
+ addOperation(operation)
+ }
+
override def getOperationLogRowSet(
opHandle: OperationHandle,
order: FetchOrientation,
diff --git
a/externals/kyuubi-hive-sql-engine/src/test/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationSuite.scala
b/externals/kyuubi-hive-sql-engine/src/test/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationSuite.scala
index 6b17d8f..7e2a63d 100644
---
a/externals/kyuubi-hive-sql-engine/src/test/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationSuite.scala
+++
b/externals/kyuubi-hive-sql-engine/src/test/scala/org/apache/kyuubi/engine/hive/operation/HiveOperationSuite.scala
@@ -241,7 +241,7 @@ class HiveOperationSuite extends HiveJDBCTestHelper {
withDatabases("test_schema") { statement =>
statement.execute("CREATE SCHEMA IF NOT EXISTS test_schema")
statement.execute("CREATE TABLE IF NOT EXISTS test_schema.test_table(a
string, " +
- "PRIMARY KEY(a) disable novalidate)")
+ "PRIMARY KEY(a) DISABLE NOVALIDATE)")
try {
val meta = statement.getConnection.getMetaData
@@ -261,6 +261,52 @@ class HiveOperationSuite extends HiveJDBCTestHelper {
}
}
+ test("get cross reference") {
+ withDatabases("test_schema") { statement =>
+ statement.execute("CREATE SCHEMA IF NOT EXISTS test_schema")
+ statement.execute("CREATE TABLE IF NOT EXISTS test_schema.test_table1(a
string, " +
+ "PRIMARY KEY(a) DISABLE NOVALIDATE)")
+ statement.execute("CREATE TABLE IF NOT EXISTS test_schema.test_table2(a
string, b string, " +
+ "FOREIGN KEY(b) REFERENCES test_schema.test_table1(a) DISABLE
NOVALIDATE)")
+
+ try {
+ val meta = statement.getConnection.getMetaData
+ val resultSet = meta.getCrossReference(
+ null,
+ "test_schema",
+ "test_table1",
+ null,
+ "test_schema",
+ "test_table2")
+ val resultSetBuffer =
+ ArrayBuffer[(String, String, String, String, String, String, String,
String)]()
+ while (resultSet.next()) {
+ resultSetBuffer += Tuple8(
+ resultSet.getString("PKTABLE_CAT"),
+ resultSet.getString("PKTABLE_SCHEM"),
+ resultSet.getString("PKTABLE_NAME"),
+ resultSet.getString("PKCOLUMN_NAME"),
+ resultSet.getString("FKTABLE_CAT"),
+ resultSet.getString("FKTABLE_SCHEM"),
+ resultSet.getString("FKTABLE_NAME"),
+ resultSet.getString("FKCOLUMN_NAME"))
+ }
+ assert(resultSetBuffer.contains((
+ null,
+ "test_schema",
+ "test_table1",
+ "a",
+ null,
+ "test_schema",
+ "test_table2",
+ "b")))
+ } finally {
+ statement.execute("DROP TABLE test_schema.test_table2")
+ statement.execute("DROP TABLE test_schema.test_table1")
+ }
+ }
+ }
+
test("basic execute statements, create, insert query") {
withJdbcStatement("hive_engine_test") { statement =>
statement.execute("CREATE TABLE hive_engine_test(id int, value string)
stored as orc")
diff --git
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala
index c0c7899..e35dbbc 100644
---
a/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala
+++
b/externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala
@@ -140,4 +140,15 @@ class SparkSQLOperationManager private (name: String)
extends OperationManager(n
tableName: String): Operation = {
throw KyuubiSQLException.featureNotSupported()
}
+
+ override def newGetCrossReferenceOperation(
+ session: Session,
+ primaryCatalog: String,
+ primarySchema: String,
+ primaryTable: String,
+ foreignCatalog: String,
+ foreignSchema: String,
+ foreignTable: String): Operation = {
+ throw KyuubiSQLException.featureNotSupported()
+ }
}
diff --git
a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperationManager.scala
b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperationManager.scala
index 9441024..67367bb 100644
---
a/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperationManager.scala
+++
b/externals/kyuubi-trino-engine/src/main/scala/org/apache/kyuubi/engine/trino/operation/TrinoOperationManager.scala
@@ -104,4 +104,15 @@ class TrinoOperationManager extends
OperationManager("TrinoOperationManager") {
tableName: String): Operation = {
throw KyuubiSQLException.featureNotSupported()
}
+
+ override def newGetCrossReferenceOperation(
+ session: Session,
+ primaryCatalog: String,
+ primarySchema: String,
+ primaryTable: String,
+ foreignCatalog: String,
+ foreignSchema: String,
+ foreignTable: String): Operation = {
+ throw KyuubiSQLException.featureNotSupported()
+ }
}
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationManager.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationManager.scala
index e14941c..3015e34 100644
---
a/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationManager.scala
+++
b/kyuubi-common/src/main/scala/org/apache/kyuubi/operation/OperationManager.scala
@@ -75,6 +75,14 @@ abstract class OperationManager(name: String) extends
AbstractService(name) {
catalogName: String,
schemaName: String,
tableName: String): Operation
+ def newGetCrossReferenceOperation(
+ session: Session,
+ primaryCatalog: String,
+ primarySchema: String,
+ primaryTable: String,
+ foreignCatalog: String,
+ foreignSchema: String,
+ foreignTable: String): Operation
final def addOperation(operation: Operation): Operation = synchronized {
handleToOperation.put(operation.getHandle, operation)
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/AbstractBackendService.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/AbstractBackendService.scala
index 985ce8c..b24123a 100644
---
a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/AbstractBackendService.scala
+++
b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/AbstractBackendService.scala
@@ -131,6 +131,25 @@ abstract class AbstractBackendService(name: String)
.getPrimaryKeys(catalogName, schemaName, tableName)
}
+ override def getCrossReference(
+ sessionHandle: SessionHandle,
+ primaryCatalog: String,
+ primarySchema: String,
+ primaryTable: String,
+ foreignCatalog: String,
+ foreignSchema: String,
+ foreignTable: String): OperationHandle = {
+ sessionManager
+ .getSession(sessionHandle)
+ .getCrossReference(
+ primaryCatalog,
+ primarySchema,
+ primaryTable,
+ foreignCatalog,
+ foreignSchema,
+ foreignTable)
+ }
+
override def getOperationStatus(operationHandle: OperationHandle):
OperationStatus = {
val operation =
sessionManager.operationManager.getOperation(operationHandle)
if (operation.shouldRunAsync) {
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/BackendService.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/BackendService.scala
index 18ce237..638b7e6 100644
---
a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/BackendService.scala
+++
b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/BackendService.scala
@@ -81,6 +81,14 @@ trait BackendService {
catalogName: String,
schemaName: String,
tableName: String): OperationHandle
+ def getCrossReference(
+ sessionHandle: SessionHandle,
+ primaryCatalog: String,
+ primarySchema: String,
+ primaryTable: String,
+ foreignCatalog: String,
+ foreignSchema: String,
+ foreignTable: String): OperationHandle
def getOperationStatus(operationHandle: OperationHandle): OperationStatus
def cancelOperation(operationHandle: OperationHandle): Unit
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TFrontendService.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TFrontendService.scala
index d2d1269..2f07e3a 100644
---
a/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TFrontendService.scala
+++
b/kyuubi-common/src/main/scala/org/apache/kyuubi/service/TFrontendService.scala
@@ -384,8 +384,29 @@ abstract class TFrontendService(name: String)
override def GetCrossReference(req: TGetCrossReferenceReq):
TGetCrossReferenceResp = {
debug(req.toString)
val resp = new TGetCrossReferenceResp
- val errStatus = KyuubiSQLException.featureNotSupported().toTStatus
- resp.setStatus(errStatus)
+ try {
+ val sessionHandle = SessionHandle(req.getSessionHandle)
+ val primaryCatalog = req.getParentCatalogName
+ val primarySchema = req.getParentSchemaName
+ val primaryTable = req.getParentTableName
+ val foreignCatalog = req.getForeignCatalogName
+ val foreignSchema = req.getForeignSchemaName
+ val foreignTable = req.getForeignTableName
+ val opHandle = be.getCrossReference(
+ sessionHandle,
+ primaryCatalog,
+ primarySchema,
+ primaryTable,
+ foreignCatalog,
+ foreignSchema,
+ foreignTable)
+ resp.setOperationHandle(opHandle.toTOperationHandle)
+ resp.setStatus(OK_STATUS)
+ } catch {
+ case e: Exception =>
+ error("Error getting primary keys: ", e)
+ resp.setStatus(KyuubiSQLException.toTStatus(e))
+ }
resp
}
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala
index d11e1aa..8300795 100644
---
a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala
+++
b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/AbstractSession.scala
@@ -182,6 +182,25 @@ abstract class AbstractSession(
runOperation(operation)
}
+ override def getCrossReference(
+ primaryCatalog: String,
+ primarySchema: String,
+ primaryTable: String,
+ foreignCatalog: String,
+ foreignSchema: String,
+ foreignTable: String): OperationHandle = {
+ val operation = sessionManager.operationManager
+ .newGetCrossReferenceOperation(
+ this,
+ primaryCatalog,
+ primarySchema,
+ primaryTable,
+ foreignCatalog,
+ foreignSchema,
+ foreignTable)
+ runOperation(operation)
+ }
+
override def cancelOperation(operationHandle: OperationHandle): Unit =
withAcquireRelease() {
sessionManager.operationManager.cancelOperation(operationHandle)
}
diff --git
a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/Session.scala
b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/Session.scala
index af84ad9..0218b65 100644
--- a/kyuubi-common/src/main/scala/org/apache/kyuubi/session/Session.scala
+++ b/kyuubi-common/src/main/scala/org/apache/kyuubi/session/Session.scala
@@ -75,6 +75,13 @@ trait Session {
catalogName: String,
schemaName: String,
tableName: String): OperationHandle
+ def getCrossReference(
+ primaryCatalog: String,
+ primarySchema: String,
+ primaryTable: String,
+ foreignCatalog: String,
+ foreignSchema: String,
+ foreignTable: String): OperationHandle
def cancelOperation(operationHandle: OperationHandle): Unit
def closeOperation(operationHandle: OperationHandle): Unit
diff --git
a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/NoopOperationManager.scala
b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/NoopOperationManager.scala
index 4288456..ccd3844 100644
---
a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/NoopOperationManager.scala
+++
b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/NoopOperationManager.scala
@@ -101,6 +101,19 @@ class NoopOperationManager extends
OperationManager("noop") {
addOperation(operation)
}
+ override def newGetCrossReferenceOperation(
+ session: Session,
+ primaryCatalog: String,
+ primarySchema: String,
+ primaryTable: String,
+ foreignCatalog: String,
+ foreignSchema: String,
+ foreignTable: String): Operation = {
+ val operation =
+ new NoopOperation(OperationType.GET_FUNCTIONS, session, primarySchema ==
invalid)
+ addOperation(operation)
+ }
+
override def getOperationLogRowSet(
opHandle: OperationHandle,
order: FetchOrientation,
diff --git
a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkMetadataTests.scala
b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkMetadataTests.scala
index 66ed51b..8ce1cb7 100644
---
a/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkMetadataTests.scala
+++
b/kyuubi-common/src/test/scala/org/apache/kyuubi/operation/SparkMetadataTests.scala
@@ -464,8 +464,12 @@ trait SparkMetadataTests extends HiveJDBCTestHelper {
assert(e.getMessage.contains(KyuubiSQLException.featureNotSupported().getMessage))
}
assert(!metaData.getImportedKeys("", "default", "").next())
- intercept[SQLException] {
- metaData.getCrossReference("", "default", "src", "", "default", "src2")
+ try {
+ assert(!metaData.getCrossReference("", "default", "src", "",
"default", "src2").next())
+ } catch {
+ case e: Exception =>
+ assert(e.isInstanceOf[SQLException])
+
assert(e.getMessage.contains(KyuubiSQLException.featureNotSupported().getMessage))
}
assert(!metaData.getIndexInfo("", "default", "src", true, true).next())
diff --git
a/kyuubi-common/src/test/scala/org/apache/kyuubi/service/TFrontendServiceSuite.scala
b/kyuubi-common/src/test/scala/org/apache/kyuubi/service/TFrontendServiceSuite.scala
index cfb99e1..ebf62d1 100644
---
a/kyuubi-common/src/test/scala/org/apache/kyuubi/service/TFrontendServiceSuite.scala
+++
b/kyuubi-common/src/test/scala/org/apache/kyuubi/service/TFrontendServiceSuite.scala
@@ -342,10 +342,17 @@ class TFrontendServiceSuite extends KyuubiFunSuite {
withSessionHandle { (client, handle) =>
val req = new TGetCrossReferenceReq(handle)
val resp = client.GetCrossReference(req)
- assert(resp.getOperationHandle === null)
- assert(resp.getStatus.getStatusCode === TStatusCode.ERROR_STATUS)
- assert(resp.getStatus.getSqlState === "0A000")
- assert(resp.getStatus.getErrorMessage startsWith "feature not supported")
+ val opHandle = resp.getOperationHandle
+ assert(opHandle.getOperationType === TOperationType.GET_FUNCTIONS)
+ assert(resp.getStatus.getStatusCode === TStatusCode.SUCCESS_STATUS)
+ checkOperationResult(client, opHandle)
+
+ req.setSessionHandle(SessionHandle(SERVER_VERSION).toTSessionHandle)
+ val resp1 = client.GetCrossReference(req)
+ assert(resp1.getOperationHandle === null)
+ assert(resp1.getStatus.getStatusCode === TStatusCode.ERROR_STATUS)
+ assert(resp1.getStatus.getSqlState === null)
+ assert(resp1.getStatus.getErrorMessage startsWith "Invalid
SessionHandle")
}
}
diff --git
a/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsConstants.scala
b/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsConstants.scala
index b722549..68116f4 100644
---
a/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsConstants.scala
+++
b/kyuubi-metrics/src/main/scala/org/apache/kyuubi/metrics/MetricsConstants.scala
@@ -62,6 +62,7 @@ object MetricsConstants {
final val BS_GET_COLUMNS = BACKEND_SERVICE + "get_columns"
final val BS_GET_FUNCTIONS = BACKEND_SERVICE + "get_functions"
final val BS_GET_PRIMARY_KEY = BACKEND_SERVICE + "get_primary_keys"
+ final val BS_GET_CROSS_REFERENCE = BACKEND_SERVICE + "get_cross_reference"
final val BS_GET_OPERATION_STATUS = BACKEND_SERVICE + "get_operation_status"
final val BS_CANCEL_OPERATION = BACKEND_SERVICE + "cancel_operation"
final val BS_CLOSE_OPERATION = BACKEND_SERVICE + "close_operation"
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
index 98843f6..1ec359b 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/client/KyuubiSyncThriftClient.scala
@@ -197,6 +197,26 @@ class KyuubiSyncThriftClient private (protocol: TProtocol)
resp.getOperationHandle
}
+ def getCrossReference(
+ primaryCatalog: String,
+ primarySchema: String,
+ primaryTable: String,
+ foreignCatalog: String,
+ foreignSchema: String,
+ foreignTable: String): TOperationHandle = {
+ val req = new TGetCrossReferenceReq()
+ req.setSessionHandle(_remoteSessionHandle)
+ req.setParentCatalogName(primaryCatalog)
+ req.setParentSchemaName(primarySchema)
+ req.setParentTableName(primaryTable)
+ req.setForeignCatalogName(foreignCatalog)
+ req.setForeignSchemaName(foreignSchema)
+ req.setForeignTableName(foreignTable)
+ val resp = withLockAcquired(GetCrossReference(req))
+ ThriftUtils.verifyTStatus(resp.getStatus)
+ resp.getOperationHandle
+ }
+
def getOperationStatus(operationHandle: TOperationHandle):
TGetOperationStatusResp = {
val req = new TGetOperationStatusReq(operationHandle)
val resp = withLockAcquired(GetOperationStatus(req))
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/GetCrossReference.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/GetCrossReference.scala
new file mode 100644
index 0000000..cdc0321
--- /dev/null
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/GetCrossReference.scala
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kyuubi.operation
+
+import org.apache.kyuubi.session.Session
+
+class GetCrossReference(
+ session: Session,
+ primaryCatalog: String,
+ primarySchema: String,
+ primaryTable: String,
+ foreignCatalog: String,
+ foreignSchema: String,
+ foreignTable: String)
+ extends KyuubiOperation(OperationType.GET_FUNCTIONS, session) {
+
+ override protected def runInternal(): Unit = {
+ try {
+ _remoteOpHandle = client.getCrossReference(
+ primaryCatalog,
+ primarySchema,
+ primaryTable,
+ foreignCatalog,
+ foreignSchema,
+ foreignTable)
+ } catch onError()
+ }
+}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
index c4dd960..db2898b 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/operation/KyuubiOperationManager.scala
@@ -119,7 +119,26 @@ class KyuubiOperationManager private (name: String)
extends OperationManager(nam
catalogName: String,
schemaName: String,
tableName: String): Operation = {
- val operation = new GetFunctions(session, catalogName, schemaName,
tableName)
+ val operation = new GetPrimaryKeys(session, catalogName, schemaName,
tableName)
+ addOperation(operation)
+ }
+
+ override def newGetCrossReferenceOperation(
+ session: Session,
+ primaryCatalog: String,
+ primarySchema: String,
+ primaryTable: String,
+ foreignCatalog: String,
+ foreignSchema: String,
+ foreignTable: String): Operation = {
+ val operation = new GetCrossReference(
+ session,
+ primaryCatalog,
+ primarySchema,
+ primaryTable,
+ foreignCatalog,
+ foreignSchema,
+ foreignTable)
addOperation(operation)
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/BackendServiceMetric.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/BackendServiceMetric.scala
index 8a2bc77..684f211 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/BackendServiceMetric.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/BackendServiceMetric.scala
@@ -132,6 +132,26 @@ trait BackendServiceMetric extends BackendService {
}
}
+ abstract override def getCrossReference(
+ sessionHandle: SessionHandle,
+ primaryCatalog: String,
+ primarySchema: String,
+ primaryTable: String,
+ foreignCatalog: String,
+ foreignSchema: String,
+ foreignTable: String): OperationHandle = {
+ MetricsSystem.timerTracing(MetricsConstants.BS_GET_CROSS_REFERENCE) {
+ super.getCrossReference(
+ sessionHandle,
+ primaryCatalog,
+ primarySchema,
+ primaryTable,
+ foreignCatalog,
+ foreignSchema,
+ foreignTable)
+ }
+ }
+
abstract override def getOperationStatus(operationHandle: OperationHandle):
OperationStatus = {
MetricsSystem.timerTracing(MetricsConstants.BS_GET_OPERATION_STATUS) {
super.getOperationStatus(operationHandle)
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/SessionsResource.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/SessionsResource.scala
index 00a5463..5a37f6f 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/SessionsResource.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/SessionsResource.scala
@@ -351,4 +351,31 @@ private[v1] class SessionsResource extends
ApiRequestContext with Logging {
throw new NotFoundException(errorMsg)
}
}
+
+ @ApiResponse(
+ responseCode = "200",
+ content = Array(new Content(
+ mediaType = MediaType.APPLICATION_JSON)),
+ description = "Create an operation with GET_FUNCTIONS type")
+ @POST
+ @Path("{sessionHandle}/operations/crossReference")
+ def getCrossReference(
+ @PathParam("sessionHandle") sessionHandleStr: String,
+ request: GetCrossReferenceRequest): OperationHandle = {
+ try {
+ fe.be.getCrossReference(
+ parseSessionHandle(sessionHandleStr),
+ request.primaryCatalog,
+ request.primarySchema,
+ request.primaryTable,
+ request.foreignCatalog,
+ request.foreignSchema,
+ request.foreignTable)
+ } catch {
+ case NonFatal(e) =>
+ val errorMsg = "Error getting cross reference"
+ error(errorMsg, e)
+ throw new NotFoundException(errorMsg)
+ }
+ }
}
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/dto.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/dto.scala
index eb4a5bf..7158469 100644
--- a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/dto.scala
+++ b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/dto.scala
@@ -76,6 +76,14 @@ case class GetPrimaryKeysRequest(
schemaName: String,
tableName: String)
+case class GetCrossReferenceRequest(
+ primaryCatalog: String,
+ primarySchema: String,
+ primaryTable: String,
+ foreignCatalog: String,
+ foreignSchema: String,
+ foreignTable: String)
+
case class OpActionRequest(action: String)
case class ResultSetMetaData(columns: Seq[ColumnDesc])
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/SessionsResourceSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/SessionsResourceSuite.scala
index d6fd60c..5fe1361 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/SessionsResourceSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/SessionsResourceSuite.scala
@@ -251,8 +251,18 @@ class SessionsResourceSuite extends KyuubiFunSuite with
RestFrontendTestHelper {
response = webTarget.path(s"$pathPrefix/operations/primaryKeys")
.request(MediaType.APPLICATION_JSON_TYPE)
.post(Entity.entity(getPrimaryKeysReq, MediaType.APPLICATION_JSON_TYPE))
- assert(200 == response.getStatus)
- operationHandle = response.readEntity(classOf[OperationHandle])
- assert(operationHandle.typ == OperationType.GET_FUNCTIONS)
+ assert(404 == response.getStatus)
+
+ val getCrossReferenceReq = GetCrossReferenceRequest(
+ "spark_catalog",
+ "default",
+ "default",
+ "spark_catalog",
+ "default",
+ "default")
+ response = webTarget.path(s"$pathPrefix/operations/crossReference")
+ .request(MediaType.APPLICATION_JSON_TYPE)
+ .post(Entity.entity(getCrossReferenceReq,
MediaType.APPLICATION_JSON_TYPE))
+ assert(404 == response.getStatus)
}
}