This is an automated email from the ASF dual-hosted git repository.
feiwang pushed a commit to branch branch-1.10
in repository https://gitbox.apache.org/repos/asf/kyuubi.git
The following commit(s) were added to refs/heads/branch-1.10 by this push:
new 869db638f9 [KYUUBI #7217] Support customized session protocol version
to support binary type
869db638f9 is described below
commit 869db638f9528a5852fb0daa0c3a33a85730e7db
Author: Wang, Fei <[email protected]>
AuthorDate: Sun Sep 28 21:18:01 2025 -0700
[KYUUBI #7217] Support customized session protocol version to support
binary type
### Why are the changes needed?
For the RESTful API, the session protocol version was previously hard-coded
to V1 and was not configurable by users.
https://github.com/apache/kyuubi/blob/aadf86683554ba6932126c80c9e33bf84cea8dee/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/SessionsResource.scala#L419
Before V6, the result set is row-based (RowBasedSet); from V6 onward, it is column-based (ColumnBasedSet).
https://github.com/apache/kyuubi/blob/aadf86683554ba6932126c80c9e33bf84cea8dee/kyuubi-hive-jdbc/src/main/java/org/apache/kyuubi/jdbc/hive/cli/RowSetFactory.java#L29-L33
With `HIVE_CLI_SERVICE_PROTOCOL_V1`, a `BINARY` type column is converted to
the STRING type for the RESTful API.
In this PR, we make the session protocol version configurable by users and
support transferring binary type values.
Why do we need to support the binary type? Because some byte sequences
cannot be converted to a UTF-8 string.
### How was this patch tested?
UT.
### Was this patch authored or co-authored using generative AI tooling?
No.
Closes #7217 from turboFei/rest_binary.
Closes #7217
d747c51ef [Wang, Fei] Support customized session protocol version to
support binary type
Authored-by: Wang, Fei <[email protected]>
Signed-off-by: Wang, Fei <[email protected]>
(cherry picked from commit e613d7f6e4e0b4143dde8bb6ae23d1252698e639)
Signed-off-by: Wang, Fei <[email protected]>
---
docs/client/rest/rest_api.md | 7 +-
.../org/apache/kyuubi/client/api/v1/dto/Field.java | 7 ++
.../client/api/v1/dto/SessionOpenRequest.java | 19 +++++-
.../kyuubi/server/api/v1/OperationsResource.scala | 78 ++++++++--------------
.../kyuubi/server/api/v1/SessionsResource.scala | 5 +-
.../server/api/v1/OperationsResourceSuite.scala | 37 ++++++++--
6 files changed, 90 insertions(+), 63 deletions(-)
diff --git a/docs/client/rest/rest_api.md b/docs/client/rest/rest_api.md
index c12f12ac60..387b1256e0 100644
--- a/docs/client/rest/rest_api.md
+++ b/docs/client/rest/rest_api.md
@@ -99,9 +99,10 @@ Create a session
#### Request Parameters
-| Name | Description | Type |
-|:--------|:---------------------------------|:-----|
-| configs | The configuration of the session | Map |
+| Name | Description
| Type |
+|:----------------|:-------------------------------------------------------------|:-----|
+| protocolVersion | The session thrift protocol version, default value is 0
(V1) | Int |
+| configs | The configuration of the session
| Map |
#### Response Body
diff --git
a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/Field.java
b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/Field.java
index 7fe33b2440..f94f5e7324 100644
---
a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/Field.java
+++
b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/Field.java
@@ -17,6 +17,7 @@
package org.apache.kyuubi.client.api.v1.dto;
+import java.util.Base64;
import java.util.Objects;
import org.apache.commons.lang3.builder.ReflectionToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
@@ -41,6 +42,12 @@ public class Field {
}
public Object getValue() {
+ // For binary type column values, although the data type is "BINARY_VAL",
+ // the value is transmitted as a Base64-encoded string.
+ // Here, we decode it into a byte array.
+ if (value instanceof String && "BINARY_VAL".equalsIgnoreCase(dataType)) {
+ return Base64.getDecoder().decode((String) value);
+ }
return value;
}
diff --git
a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/SessionOpenRequest.java
b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/SessionOpenRequest.java
index 06eb29e972..677dcb35de 100644
---
a/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/SessionOpenRequest.java
+++
b/kyuubi-rest-client/src/main/java/org/apache/kyuubi/client/api/v1/dto/SessionOpenRequest.java
@@ -24,6 +24,7 @@ import
org.apache.commons.lang3.builder.ReflectionToStringBuilder;
import org.apache.commons.lang3.builder.ToStringStyle;
public class SessionOpenRequest {
+ private Integer protocolVersion;
private Map<String, String> configs;
public SessionOpenRequest() {}
@@ -32,6 +33,19 @@ public class SessionOpenRequest {
this.configs = configs;
}
+ public SessionOpenRequest(Integer protocolVersion, Map<String, String>
configs) {
+ this.protocolVersion = protocolVersion;
+ this.configs = configs;
+ }
+
+ public Integer getProtocolVersion() {
+ return protocolVersion;
+ }
+
+ public void setProtocolVersion(Integer protocolVersion) {
+ this.protocolVersion = protocolVersion;
+ }
+
public Map<String, String> getConfigs() {
if (null == configs) {
return Collections.emptyMap();
@@ -48,12 +62,13 @@ public class SessionOpenRequest {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;
SessionOpenRequest that = (SessionOpenRequest) o;
- return Objects.equals(getConfigs(), that.getConfigs());
+ return Objects.equals(getProtocolVersion(), that.getProtocolVersion())
+ && Objects.equals(getConfigs(), that.getConfigs());
}
@Override
public int hashCode() {
- return Objects.hash(getConfigs());
+ return Objects.hash(getProtocolVersion(), getConfigs());
}
@Override
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/OperationsResource.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/OperationsResource.scala
index 224b34cf7a..99f3f99aa7 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/OperationsResource.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/OperationsResource.scala
@@ -29,6 +29,7 @@ import io.swagger.v3.oas.annotations.tags.Tag
import org.apache.kyuubi.{KyuubiSQLException, Logging}
import org.apache.kyuubi.client.api.v1.dto._
+import org.apache.kyuubi.jdbc.hive.cli.{ColumnBasedSet, RowBasedSet,
RowSetFactory}
import org.apache.kyuubi.operation.{FetchOrientation, KyuubiOperation,
OperationHandle}
import org.apache.kyuubi.server.api.{ApiRequestContext, ApiUtils}
import org.apache.kyuubi.shaded.hive.service.rpc.thrift._
@@ -179,62 +180,35 @@ private[v1] class OperationsResource extends
ApiRequestContext with Logging {
@QueryParam("fetchorientation") @DefaultValue("FETCH_NEXT")
fetchOrientation: String): ResultRowSet = {
try {
+ val operationHandle = OperationHandle(operationHandleStr)
val fetchResultsResp = fe.be.fetchResults(
- OperationHandle(operationHandleStr),
+ operationHandle,
FetchOrientation.withName(fetchOrientation),
maxRows,
fetchLog = false)
- val rowSet = fetchResultsResp.getResults
- val rows = rowSet.getRows.asScala.map(i => {
- new Row(i.getColVals.asScala.map(i => {
- new Field(
- i.getSetField.name(),
- i.getSetField match {
- case TColumnValue._Fields.STRING_VAL =>
- if (i.getStringVal.isSetValue) {
- i.getStringVal.getFieldValue(TStringValue._Fields.VALUE)
- } else {
- null
- }
- case TColumnValue._Fields.BOOL_VAL =>
- if (i.getBoolVal.isSetValue) {
- i.getBoolVal.getFieldValue(TBoolValue._Fields.VALUE)
- } else {
- null
- }
- case TColumnValue._Fields.BYTE_VAL =>
- if (i.getByteVal.isSetValue) {
- i.getByteVal.getFieldValue(TByteValue._Fields.VALUE)
- } else {
- null
- }
- case TColumnValue._Fields.DOUBLE_VAL =>
- if (i.getDoubleVal.isSetValue) {
- i.getDoubleVal.getFieldValue(TDoubleValue._Fields.VALUE)
- } else {
- null
- }
- case TColumnValue._Fields.I16_VAL =>
- if (i.getI16Val.isSetValue) {
- i.getI16Val.getFieldValue(TI16Value._Fields.VALUE)
- } else {
- null
- }
- case TColumnValue._Fields.I32_VAL =>
- if (i.getI32Val.isSetValue) {
- i.getI32Val.getFieldValue(TI32Value._Fields.VALUE)
- } else {
- null
- }
- case TColumnValue._Fields.I64_VAL =>
- if (i.getI64Val.isSetValue) {
- i.getI64Val.getFieldValue(TI64Value._Fields.VALUE)
- } else {
- null
- }
- })
- }).asJava)
- })
+ val tRowSet = fetchResultsResp.getResults
+ val rowSet = RowSetFactory.create(
+ tRowSet,
+
fe.sessionManager.operationManager.getOperation(operationHandle).getSession.protocol)
+ if (rowSet.numRows() == 0) {
+ return new ResultRowSet(List.empty[Row].asJava, 0)
+ }
+
+ val columnSize = rowSet.numColumns
+ val columnTypes = rowSet match {
+ case _: ColumnBasedSet =>
+ tRowSet.getColumns.asScala.map(c => { c.getSetField.name() }).toArray
+ case _: RowBasedSet =>
+ tRowSet.getRows.get(0).getColVals.asScala.map(c =>
c.getSetField.name()).toArray
+ }
+
+ val rows = rowSet.iterator().asScala.toSeq.map { r =>
+ new Row(
+ (0 until columnSize).map(i => {
+ val columnValue = r(i)
+ new Field(columnTypes(i), columnValue)
+ }).asJava)
+ }
new ResultRowSet(rows.asJava, rows.size)
} catch {
case e: IllegalArgumentException =>
diff --git
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/SessionsResource.scala
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/SessionsResource.scala
index 0954f8828a..2113067ccd 100644
---
a/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/SessionsResource.scala
+++
b/kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/SessionsResource.scala
@@ -136,7 +136,8 @@ private[v1] class SessionsResource extends
ApiRequestContext with Logging {
val userName = fe.getSessionUser(request.getConfigs.asScala.toMap)
val ipAddress = fe.getIpAddress
val handle = fe.be.openSession(
- SessionsResource.SESSION_PROTOCOL_VERSION,
+ Option(request.getProtocolVersion).map(v =>
TProtocolVersion.findByValue(v))
+ .getOrElse(SessionsResource.DEFAULT_SESSION_PROTOCOL_VERSION),
userName,
"",
ipAddress,
@@ -416,5 +417,5 @@ private[v1] class SessionsResource extends
ApiRequestContext with Logging {
}
object SessionsResource {
- final val SESSION_PROTOCOL_VERSION =
TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1
+ final val DEFAULT_SESSION_PROTOCOL_VERSION =
TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V1
}
diff --git
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/OperationsResourceSuite.scala
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/OperationsResourceSuite.scala
index c4d67ad621..3e784cd5b1 100644
---
a/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/OperationsResourceSuite.scala
+++
b/kyuubi-server/src/test/scala/org/apache/kyuubi/server/api/v1/OperationsResourceSuite.scala
@@ -32,13 +32,16 @@ import org.apache.kyuubi.client.api.v1.dto._
import org.apache.kyuubi.config.KyuubiConf
import org.apache.kyuubi.operation.{ExecuteStatement, OperationState}
import org.apache.kyuubi.operation.OperationState.{FINISHED, OperationState}
-import
org.apache.kyuubi.shaded.hive.service.rpc.thrift.TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V2
+import org.apache.kyuubi.shaded.hive.service.rpc.thrift.TProtocolVersion
+import
org.apache.kyuubi.shaded.hive.service.rpc.thrift.TProtocolVersion.{HIVE_CLI_SERVICE_PROTOCOL_V10,
HIVE_CLI_SERVICE_PROTOCOL_V2}
class OperationsResourceSuite extends KyuubiFunSuite with
RestFrontendTestHelper {
override protected lazy val conf: KyuubiConf = KyuubiConf()
.set(KyuubiConf.SERVER_LIMIT_CLIENT_FETCH_MAX_ROWS, 5000)
+ protected val SESSION_PROTOCOL_VERSION = HIVE_CLI_SERVICE_PROTOCOL_V2
+
test("get an operation event") {
val catalogsHandleStr = getOpHandleStr("")
checkOpState(catalogsHandleStr, FINISHED)
@@ -55,7 +58,7 @@ class OperationsResourceSuite extends KyuubiFunSuite with
RestFrontendTestHelper
test("apply an action for an operation") {
val sessionHandle = fe.be.openSession(
- HIVE_CLI_SERVICE_PROTOCOL_V2,
+ SESSION_PROTOCOL_VERSION,
"admin",
"123456",
"localhost",
@@ -207,7 +210,7 @@ class OperationsResourceSuite extends KyuubiFunSuite with
RestFrontendTestHelper
test("support to return operation progress for REST api") {
val sessionHandle = fe.be.openSession(
- HIVE_CLI_SERVICE_PROTOCOL_V2,
+ SESSION_PROTOCOL_VERSION,
"admin",
"123456",
"localhost",
@@ -222,9 +225,31 @@ class OperationsResourceSuite extends KyuubiFunSuite with
RestFrontendTestHelper
}
}
+ test("support binary type in result set") {
+ val opHandleStr = getOpHandleStr("select binary('kyuubi')")
+ checkOpState(opHandleStr, FINISHED)
+ val response = webTarget.path(
+ s"api/v1/operations/$opHandleStr/rowset")
+ .request(MediaType.APPLICATION_JSON).get()
+ assert(200 == response.getStatus)
+ val logRowSet = response.readEntity(classOf[ResultRowSet])
+ assert(logRowSet.getRowCount == 1)
+ val result = logRowSet.getRows.asScala.head.getFields.asScala.head
+ if (SESSION_PROTOCOL_VERSION.getValue >=
+ TProtocolVersion.HIVE_CLI_SERVICE_PROTOCOL_V6.getValue) {
+ // for ColumnBasedSet, the data type is BINARY_VAL
+ assert(result.getDataType == "BINARY_VAL")
+ assert(new String(result.getValue.asInstanceOf[Array[Byte]], "UTF-8") ==
"kyuubi")
+ } else {
+ // for RowBasedSet, the data type is STRING_VAL
+ assert(result.getDataType == "STRING_VAL")
+ assert(result.getValue == "kyuubi")
+ }
+ }
+
def getOpHandleStr(statement: String = "show tables"): String = {
val sessionHandle = fe.be.openSession(
- HIVE_CLI_SERVICE_PROTOCOL_V2,
+ SESSION_PROTOCOL_VERSION,
"admin",
"123456",
"localhost",
@@ -250,3 +275,7 @@ class OperationsResourceSuite extends KyuubiFunSuite with
RestFrontendTestHelper
}
}
}
+
+class OperationsResourceV10ProtocolSuite extends OperationsResourceSuite {
+ override protected val SESSION_PROTOCOL_VERSION =
HIVE_CLI_SERVICE_PROTOCOL_V10
+}