This is an automated email from the ASF dual-hosted git repository.
xiangfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/pinot.git
The following commit(s) were added to refs/heads/master by this push:
new 55b8e8608f Adding encoding type into the BrokerGrpcServer (#15395)
55b8e8608f is described below
commit 55b8e8608fd5ecac9d85c2c956d372fa872e62ed
Author: Xiang Fu <[email protected]>
AuthorDate: Sun Mar 30 16:51:48 2025 -0700
Adding encoding type into the BrokerGrpcServer (#15395)
---
.../apache/pinot/broker/grpc/BrokerGrpcServer.java | 24 ++--
.../apache/pinot/client/grpc/GrpcConnection.java | 13 +-
.../apache/pinot/client/grpc/GrpcResultSet.java | 16 +--
.../org/apache/pinot/client/grpc/GrpcUtils.java | 28 ++--
.../pinot/client/grpc/PinotGrpcResultSet.java | 22 +--
.../response/encoder/JsonResponseEncoder.java | 160 +++++++++++++++++++++
.../common/response/encoder/ResponseEncoder.java | 46 ++++++
.../response/encoder/ResponseEncoderFactory.java | 33 +++++
.../pinot/integration/tests/ClusterTest.java | 9 +-
.../tests/OfflineClusterIntegrationTest.java | 2 +-
.../apache/pinot/spi/utils/CommonConstants.java | 2 +
11 files changed, 288 insertions(+), 67 deletions(-)
diff --git
a/pinot-broker/src/main/java/org/apache/pinot/broker/grpc/BrokerGrpcServer.java
b/pinot-broker/src/main/java/org/apache/pinot/broker/grpc/BrokerGrpcServer.java
index 845058c1e8..cc8ccb9b5f 100644
---
a/pinot-broker/src/main/java/org/apache/pinot/broker/grpc/BrokerGrpcServer.java
+++
b/pinot-broker/src/main/java/org/apache/pinot/broker/grpc/BrokerGrpcServer.java
@@ -34,10 +34,7 @@ import io.grpc.netty.shaded.io.netty.handler.ssl.SslContext;
import io.grpc.netty.shaded.io.netty.handler.ssl.SslContextBuilder;
import io.grpc.netty.shaded.io.netty.handler.ssl.SslProvider;
import io.grpc.stub.StreamObserver;
-import java.io.ByteArrayOutputStream;
import java.io.IOException;
-import java.nio.ByteBuffer;
-import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.stream.Collectors;
import nl.altindag.ssl.SSLFactory;
@@ -54,6 +51,8 @@ import org.apache.pinot.common.proto.PinotQueryBrokerGrpc;
import org.apache.pinot.common.response.BrokerResponse;
import org.apache.pinot.common.response.broker.BrokerResponseNative;
import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.common.response.encoder.ResponseEncoder;
+import org.apache.pinot.common.response.encoder.ResponseEncoderFactory;
import org.apache.pinot.common.utils.request.RequestUtils;
import org.apache.pinot.common.utils.tls.RenewableTlsUtils;
import org.apache.pinot.common.utils.tls.TlsUtils;
@@ -261,23 +260,17 @@ public class BrokerGrpcServer extends
PinotQueryBrokerGrpc.PinotQueryBrokerImplB
String compressionAlgorithm =
metadataMap.getOrDefault(CommonConstants.Broker.Grpc.COMPRESSION,
CommonConstants.Broker.Grpc.DEFAULT_COMPRESSION);
Compressor compressor =
CompressionFactory.getCompressor(compressionAlgorithm);
+
+ String encodingAlgorithm =
metadataMap.getOrDefault(CommonConstants.Broker.Grpc.ENCODING,
+ CommonConstants.Broker.Grpc.DEFAULT_ENCODING);
+ ResponseEncoder encoder =
ResponseEncoderFactory.getResponseEncoder(encodingAlgorithm);
// Multiple response blocks are compressed data rows
for (int i = 0; i < resultTable.getRows().size(); i += blockRowSize) {
try {
- int rowSize = 0;
+ int rowSize = Math.min(blockRowSize, resultTable.getRows().size() - i);
// Serialize the rows to a byte array
- ByteArrayOutputStream byteArrayOutputStream = new
ByteArrayOutputStream();
-
- for (int j = i; j < Math.min(i + blockRowSize,
resultTable.getRows().size()); j++) {
- String rowString =
JsonUtils.objectToJsonNode(resultTable.getRows().get(j)).toString();
- byte[] bytesToWrite = rowString.getBytes(StandardCharsets.UTF_8);
-
byteArrayOutputStream.write(ByteBuffer.allocate(4).putInt(bytesToWrite.length).array());
- byteArrayOutputStream.write(bytesToWrite);
- rowSize += 1;
- }
-
+ byte[] serializedData = encoder.encodeResultTable(resultTable, i,
rowSize);
// Compress the byte array using the compressor
- byte[] serializedData = byteArrayOutputStream.toByteArray();
byte[] compressedResultTable = compressor.compress(serializedData);
int originalSize = serializedData.length;
int compressedSize = compressedResultTable.length;
@@ -288,6 +281,7 @@ public class BrokerGrpcServer extends
PinotQueryBrokerGrpc.PinotQueryBrokerImplB
.putMetadata("compressedSize", String.valueOf(compressedSize))
.putMetadata("rowSize", String.valueOf(rowSize))
.putMetadata("compression", compressionAlgorithm)
+ .putMetadata("encoding", encodingAlgorithm)
.build();
responseObserver.onNext(dataBlock);
} catch (Exception e) {
diff --git
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/grpc/GrpcConnection.java
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/grpc/GrpcConnection.java
index 068d021c4c..e30b49979e 100644
---
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/grpc/GrpcConnection.java
+++
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/grpc/GrpcConnection.java
@@ -38,6 +38,7 @@ import org.apache.pinot.client.ResultSetGroup;
import org.apache.pinot.client.SimpleBrokerSelector;
import org.apache.pinot.common.config.GrpcConfig;
import org.apache.pinot.common.proto.Broker;
+import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.common.utils.grpc.BrokerGrpcQueryClient;
import org.apache.pinot.spi.env.PinotConfiguration;
import org.apache.pinot.spi.utils.JsonUtils;
@@ -48,7 +49,7 @@ import org.slf4j.LoggerFactory;
/**
* A grpc connection to Pinot, normally created through calls to the {@link
org.apache.pinot.client.ConnectionFactory}.
*/
-public class GrpcConnection {
+public class GrpcConnection implements AutoCloseable {
public static final String FAIL_ON_EXCEPTIONS = "failOnExceptions";
private static final Logger LOGGER =
LoggerFactory.getLogger(GrpcConnection.class);
@@ -188,13 +189,18 @@ public class GrpcConnection {
}
// Process schema
JsonNode schemaJsonNode = null;
+ DataSchema dataSchema = null;
if (response.hasNext()) {
- schemaJsonNode = GrpcUtils.extractSchemaJson(response.next());
+ dataSchema = GrpcUtils.extractSchema(response.next());
+ schemaJsonNode = JsonUtils.objectToJsonNode(dataSchema);
}
// Process rows
ArrayNode rows = JsonUtils.newArrayNode();
while (response.hasNext()) {
- rows.addAll(GrpcUtils.extractRowsJson(response.next()));
+ List<Object[]> resultTableRows =
GrpcUtils.extractResultTable(response.next(), dataSchema).getRows();
+ for (Object[] row : resultTableRows) {
+ rows.add(JsonUtils.objectToJsonNode(row));
+ }
}
if (schemaJsonNode != null && rows != null) {
ObjectNode resultTable = JsonUtils.newObjectNode();
@@ -245,6 +251,7 @@ public class GrpcConnection {
*
* @throws PinotClientException when connection is already closed
*/
+ @Override
public void close()
throws PinotClientException {
_grpcQueryClient.shutdown();
diff --git
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/grpc/GrpcResultSet.java
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/grpc/GrpcResultSet.java
index 271e1e734d..2282b247e2 100644
---
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/grpc/GrpcResultSet.java
+++
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/grpc/GrpcResultSet.java
@@ -18,14 +18,13 @@
*/
package org.apache.pinot.client.grpc;
-import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ArrayNode;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.pinot.client.AbstractResultSet;
import org.apache.pinot.client.TextTable;
import org.apache.pinot.common.proto.Broker;
+import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.common.utils.DataSchema;
@@ -35,7 +34,7 @@ import org.apache.pinot.common.utils.DataSchema;
public class GrpcResultSet extends AbstractResultSet {
private final List<String> _columnNamesArray;
private final List<String> _columnDataTypesArray;
- private final ArrayNode _currentBatchRows;
+ private final ResultTable _currentBatchRows;
public GrpcResultSet(DataSchema schema, Broker.BrokerResponse
brokerResponse) {
_columnNamesArray = new ArrayList<>(schema.size());
@@ -45,7 +44,7 @@ public class GrpcResultSet extends AbstractResultSet {
_columnDataTypesArray.add(schema.getColumnDataType(i).toString());
}
try {
- _currentBatchRows = GrpcUtils.extractRowsJson(brokerResponse);
+ _currentBatchRows = GrpcUtils.extractResultTable(brokerResponse, schema);
} catch (IOException e) {
throw new RuntimeException(e);
}
@@ -53,7 +52,7 @@ public class GrpcResultSet extends AbstractResultSet {
@Override
public int getRowCount() {
- return _currentBatchRows.size();
+ return _currentBatchRows.getRows().size();
}
@Override
@@ -73,12 +72,7 @@ public class GrpcResultSet extends AbstractResultSet {
@Override
public String getString(int rowIndex, int columnIndex) {
- JsonNode jsonValue = _currentBatchRows.get(rowIndex).get(columnIndex);
- if (jsonValue.isTextual()) {
- return jsonValue.textValue();
- } else {
- return jsonValue.toString();
- }
+ // Decoded cells can be null (JSON null); String.valueOf renders them as "null", as the removed code did,
+ // instead of throwing a NullPointerException from calling toString() on a null cell.
+ return String.valueOf(_currentBatchRows.getRows().get(rowIndex)[columnIndex]);
}
public List<String> getAllColumns() {
diff --git
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/grpc/GrpcUtils.java
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/grpc/GrpcUtils.java
index b604d0109a..3235983b97 100644
---
a/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/grpc/GrpcUtils.java
+++
b/pinot-clients/pinot-java-client/src/main/java/org/apache/pinot/client/grpc/GrpcUtils.java
@@ -19,16 +19,17 @@
package org.apache.pinot.client.grpc;
import com.fasterxml.jackson.databind.JsonNode;
-import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.io.IOException;
-import java.nio.ByteBuffer;
import java.util.Iterator;
import java.util.Map;
import org.apache.pinot.client.ExecutionStats;
import org.apache.pinot.common.compression.CompressionFactory;
import org.apache.pinot.common.compression.Compressor;
import org.apache.pinot.common.proto.Broker;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.common.response.encoder.ResponseEncoder;
+import org.apache.pinot.common.response.encoder.ResponseEncoderFactory;
import org.apache.pinot.common.utils.DataSchema;
import org.apache.pinot.spi.utils.CommonConstants;
import org.apache.pinot.spi.utils.JsonUtils;
@@ -59,16 +60,18 @@ public class GrpcUtils {
public static JsonNode extractSchemaJson(Broker.BrokerResponse
brokerResponse)
throws IOException {
- DataSchema schema =
DataSchema.fromBytes(brokerResponse.getPayload().asReadOnlyByteBuffer());
- return JsonUtils.objectToJsonNode(schema);
+ return JsonUtils.objectToJsonNode(extractSchema(brokerResponse));
}
- public static ArrayNode extractRowsJson(Broker.BrokerResponse brokerResponse)
+ public static ResultTable extractResultTable(Broker.BrokerResponse
brokerResponse, DataSchema schema)
throws IOException {
Map<String, String> metadataMap = brokerResponse.getMetadataMap();
String compressionAlgorithm =
metadataMap.getOrDefault(CommonConstants.Broker.Grpc.COMPRESSION,
CommonConstants.Broker.Grpc.DEFAULT_COMPRESSION);
Compressor compressor =
CompressionFactory.getCompressor(compressionAlgorithm);
+ String encodingType =
metadataMap.getOrDefault(CommonConstants.Broker.Grpc.ENCODING,
+ CommonConstants.Broker.Grpc.DEFAULT_ENCODING);
+ ResponseEncoder responseEncoder =
ResponseEncoderFactory.getResponseEncoder(encodingType);
byte[] respBytes = brokerResponse.getPayload().toByteArray();
int rowSize =
Integer.parseInt(brokerResponse.getMetadataOrThrow("rowSize"));
@@ -78,20 +81,7 @@ public class GrpcUtils {
} catch (Exception e) {
throw new RuntimeException(e);
}
- ArrayNode jsonRows = JsonUtils.newArrayNode();
- int bytesRead = 0;
- ByteBuffer byteBuffer = ByteBuffer.wrap(uncompressedPayload);
- for (int i = 0; i < rowSize; i++) {
- int nextRowSize = byteBuffer.getInt(bytesRead);
- bytesRead += 4;
- byte[] rowBytes = new byte[nextRowSize];
- byteBuffer.position(bytesRead);
- byteBuffer.get(rowBytes);
- bytesRead += nextRowSize;
- String rowString = new String(rowBytes);
- jsonRows.add(JsonUtils.stringToJsonNode(rowString));
- }
- return jsonRows;
+ return responseEncoder.decodeResultTable(uncompressedPayload, rowSize,
schema);
}
public static ExecutionStats extractExecutionStats(JsonNode
executionStatsJson) {
diff --git
a/pinot-clients/pinot-jdbc-client/src/main/java/org/apache/pinot/client/grpc/PinotGrpcResultSet.java
b/pinot-clients/pinot-jdbc-client/src/main/java/org/apache/pinot/client/grpc/PinotGrpcResultSet.java
index 555ce9feb3..a47b3fcfc4 100644
---
a/pinot-clients/pinot-jdbc-client/src/main/java/org/apache/pinot/client/grpc/PinotGrpcResultSet.java
+++
b/pinot-clients/pinot-jdbc-client/src/main/java/org/apache/pinot/client/grpc/PinotGrpcResultSet.java
@@ -18,7 +18,6 @@
*/
package org.apache.pinot.client.grpc;
-import com.fasterxml.jackson.databind.node.ArrayNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import java.io.ByteArrayInputStream;
import java.io.IOException;
@@ -43,6 +42,7 @@ import org.apache.pinot.client.PinotResultMetadata;
import org.apache.pinot.client.base.AbstractBaseResultSet;
import org.apache.pinot.client.utils.DateTimeUtils;
import org.apache.pinot.common.proto.Broker;
+import org.apache.pinot.common.response.broker.ResultTable;
import org.apache.pinot.common.utils.DataSchema;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -56,8 +56,8 @@ public class PinotGrpcResultSet extends AbstractBaseResultSet
{
private final int _totalColumns;
private final Map<String, Integer> _columns = new HashMap<>();
private final Map<Integer, String> _columnDataTypes = new HashMap<>();
-
- private ArrayNode _currentRowBatch;
+ private final DataSchema _dataSchema;
+ private ResultTable _currentRowBatch;
private int _currentBatchSize;
private int _currentBatchIndex = -1;
@@ -71,11 +71,11 @@ public class PinotGrpcResultSet extends
AbstractBaseResultSet {
_brokerResponseIterator = brokerResponseIterator;
_closed = false;
ObjectNode metadata =
GrpcUtils.extractMetadataJson(_brokerResponseIterator.next());
- DataSchema dataSchema =
GrpcUtils.extractSchema(_brokerResponseIterator.next());
- _totalColumns = dataSchema.size();
+ _dataSchema = GrpcUtils.extractSchema(_brokerResponseIterator.next());
+ _totalColumns = _dataSchema.size();
for (int i = 0; i < _totalColumns; i++) {
- _columns.put(dataSchema.getColumnName(i), i + 1);
- _columnDataTypes.put(i + 1, dataSchema.getColumnDataType(i).name());
+ _columns.put(_dataSchema.getColumnName(i), i + 1);
+ _columnDataTypes.put(i + 1, _dataSchema.getColumnDataType(i).name());
}
}
@@ -83,6 +83,7 @@ public class PinotGrpcResultSet extends AbstractBaseResultSet
{
_brokerResponseIterator = null;
_currentBatchSize = 0;
_totalColumns = 0;
+ _dataSchema = null;
}
public static PinotGrpcResultSet empty() {
@@ -271,11 +272,10 @@ public class PinotGrpcResultSet extends
AbstractBaseResultSet {
public String getString(int columnIndex)
throws SQLException {
validateColumn(columnIndex);
- String val = _currentRowBatch.get(_currentBatchIndex).get(columnIndex -
1).asText();
+ // A decoded cell can be null; String.valueOf yields "null" for it (what asText() returned for a JSON null)
+ // so the null check below still runs instead of a NullPointerException being thrown here.
+ String val = String.valueOf(_currentRowBatch.getRows().get(_currentBatchIndex)[columnIndex - 1]);
if (checkIsNull(val)) {
return null;
}
-
return val;
}
@@ -425,9 +425,9 @@ public class PinotGrpcResultSet extends
AbstractBaseResultSet {
if (_currentBatchIndex == _currentBatchSize - 1) {
if (_brokerResponseIterator.hasNext()) {
try {
- _currentRowBatch =
GrpcUtils.extractRowsJson(_brokerResponseIterator.next());
+ _currentRowBatch =
GrpcUtils.extractResultTable(_brokerResponseIterator.next(), _dataSchema);
_currentBatchIndex = 0;
- _currentBatchSize = _currentRowBatch.size();
+ _currentBatchSize = _currentRowBatch.getRows().size();
_currentRow++;
return true;
} catch (IOException e) {
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/response/encoder/JsonResponseEncoder.java
b/pinot-common/src/main/java/org/apache/pinot/common/response/encoder/JsonResponseEncoder.java
new file mode 100644
index 0000000000..e0f96ea3b7
--- /dev/null
+++
b/pinot-common/src/main/java/org/apache/pinot/common/response/encoder/JsonResponseEncoder.java
@@ -0,0 +1,160 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.response.encoder;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.common.utils.DataSchema;
+import org.apache.pinot.spi.utils.JsonUtils;
+
+
+/**
+ * {@link ResponseEncoder} that ships result rows as length-prefixed JSON.
+ *
+ * <p>Every row is written as a 4-byte length (in {@link ByteBuffer}'s default big-endian order) followed by that
+ * many bytes of UTF-8 text: the row as serialized by {@code JsonUtils.objectToJsonNode}. Rows are concatenated back
+ * to back with no separator.
+ */
+public class JsonResponseEncoder implements ResponseEncoder {
+
+  /**
+   * Serializes up to {@code length} rows of {@code resultTable}, beginning at {@code startRow}.
+   *
+   * @param resultTable table whose rows are serialized
+   * @param startRow index of the first row to serialize
+   * @param length maximum number of rows to serialize; fewer are written when the table ends first
+   * @return the concatenated length-prefixed rows
+   * @throws IOException propagated from the JSON conversion or from writing to the output buffer
+   */
+  @Override
+  public byte[] encodeResultTable(ResultTable resultTable, int startRow, int length)
+      throws IOException {
+    List<Object[]> rows = resultTable.getRows();
+    // Clamp the end so that a final, shorter block never reads past the last row
+    int endRow = Math.min(startRow + length, rows.size());
+    ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream();
+    for (int rowIdx = startRow; rowIdx < endRow; rowIdx++) {
+      String rowString = JsonUtils.objectToJsonNode(rows.get(rowIdx)).toString();
+      byte[] bytesToWrite = rowString.getBytes(StandardCharsets.UTF_8);
+      byteArrayOutputStream.write(ByteBuffer.allocate(Integer.BYTES).putInt(bytesToWrite.length).array());
+      byteArrayOutputStream.write(bytesToWrite);
+    }
+    return byteArrayOutputStream.toByteArray();
+  }
+
+  /**
+   * Reads {@code rowSize} length-prefixed JSON rows from {@code bytes} and rebuilds a {@link ResultTable}.
+   *
+   * <p>Columns whose schema type is an array are converted with {@link #extractArray(JsonNode)}, {@code MAP} columns
+   * with {@link #extractMap(JsonNode)}, and every other column according to the type of its JSON node.
+   *
+   * @param bytes payload produced by {@link #encodeResultTable(ResultTable, int, int)}
+   * @param rowSize number of rows to read from {@code bytes}
+   * @param schema schema attached to the returned table and used to pick the per-column conversion
+   * @return table holding the decoded rows
+   * @throws IOException if a row is not valid JSON
+   */
+  @Override
+  public ResultTable decodeResultTable(byte[] bytes, int rowSize, DataSchema schema)
+      throws IOException {
+    List<Object[]> rows = new ArrayList<>();
+    int bytesRead = 0;
+    ByteBuffer byteBuffer = ByteBuffer.wrap(bytes);
+    for (int i = 0; i < rowSize; i++) {
+      int nextRowSize = byteBuffer.getInt(bytesRead);
+      bytesRead += Integer.BYTES;
+      byte[] rowBytes = new byte[nextRowSize];
+      byteBuffer.position(bytesRead);
+      byteBuffer.get(rowBytes);
+      bytesRead += nextRowSize;
+      // Decode with the charset the encoder wrote (UTF-8), not the platform default
+      String rowString = new String(rowBytes, StandardCharsets.UTF_8);
+      JsonNode jsonRow = JsonUtils.stringToJsonNode(rowString);
+      int numColumns = jsonRow.size();
+      Object[] row = new Object[numColumns];
+      for (int columnIdx = 0; columnIdx < numColumns; columnIdx++) {
+        DataSchema.ColumnDataType columnDataType = schema.getColumnDataType(columnIdx);
+        JsonNode jsonValue = jsonRow.get(columnIdx);
+        if (columnDataType.isArray()) {
+          row[columnIdx] = extractArray(jsonValue);
+        } else if (columnDataType == DataSchema.ColumnDataType.MAP) {
+          row[columnIdx] = extractMap(jsonValue);
+        } else {
+          row[columnIdx] = extractValue(jsonValue);
+        }
+      }
+      rows.add(row);
+    }
+    return new ResultTable(schema, rows);
+  }
+
+  /**
+   * Converts each element of a JSON array into its Java value.
+   *
+   * <p>Elements go through {@link #extractValue(JsonNode)}, so the type of the element itself (not of the enclosing
+   * array) selects the conversion; nested arrays and objects are converted recursively.
+   */
+  private Object[] extractArray(JsonNode jsonValue) {
+    Object[] array = new Object[jsonValue.size()];
+    for (int k = 0; k < array.length; k++) {
+      array[k] = extractValue(jsonValue.get(k));
+    }
+    return array;
+  }
+
+  /**
+   * Converts a JSON node into a Java value chosen by the node's own type: {@code null} for JSON null, boxed
+   * primitives, {@code BigInteger}/{@code BigDecimal} or {@code String} for scalars, {@code Object[]} for arrays and
+   * a map for objects. Any other node type falls back to its {@code toString()} text.
+   */
+  private Object extractValue(JsonNode jsonValue) {
+    if (jsonValue.isNull()) {
+      return null;
+    }
+    if (jsonValue.isBoolean()) {
+      return jsonValue.asBoolean();
+    }
+    if (jsonValue.isShort()) {
+      return jsonValue.shortValue();
+    }
+    if (jsonValue.isBigInteger()) {
+      return jsonValue.bigIntegerValue();
+    }
+    if (jsonValue.isBigDecimal()) {
+      return jsonValue.decimalValue();
+    }
+    if (jsonValue.isInt()) {
+      return jsonValue.asInt();
+    }
+    if (jsonValue.isLong()) {
+      return jsonValue.asLong();
+    }
+    if (jsonValue.isFloat()) {
+      return jsonValue.floatValue();
+    }
+    if (jsonValue.isDouble()) {
+      return jsonValue.asDouble();
+    }
+    if (jsonValue.isTextual()) {
+      return jsonValue.textValue();
+    }
+    if (jsonValue.isArray()) {
+      return extractArray(jsonValue);
+    }
+    if (jsonValue.isObject()) {
+      return extractMap(jsonValue);
+    }
+    return jsonValue.toString();
+  }
+
+  /**
+   * Converts a JSON object into a map from field name to the value produced by {@link #extractValue(JsonNode)}.
+   */
+  private Map<String, Object> extractMap(JsonNode jsonValue) {
+    Map<String, Object> map = new HashMap<>();
+    jsonValue.fields().forEachRemaining(entry -> map.put(entry.getKey(), extractValue(entry.getValue())));
+    return map;
+  }
+}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/response/encoder/ResponseEncoder.java
b/pinot-common/src/main/java/org/apache/pinot/common/response/encoder/ResponseEncoder.java
new file mode 100644
index 0000000000..f02e9062c5
--- /dev/null
+++
b/pinot-common/src/main/java/org/apache/pinot/common/response/encoder/ResponseEncoder.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.response.encoder;
+
+import java.io.IOException;
+import org.apache.pinot.common.response.broker.ResultTable;
+import org.apache.pinot.common.utils.DataSchema;
+
+
+/**
+ * Converts a range of {@link ResultTable} rows to a byte array and back.
+ */
+public interface ResponseEncoder {
+
+ /**
+ * Encode a range of rows of the result table into a byte array.
+ *
+ * @param resultTable Result table to encode
+ * @param startRow Index of the first row to encode
+ * @param length Number of rows to encode, starting at {@code startRow}
+ * @return Encoded byte array
+ * @throws IOException If the rows cannot be encoded
+ */
+ byte[] encodeResultTable(ResultTable resultTable, int startRow, int length)
+ throws IOException;
+
+ /**
+ * Decode the result table from a byte array.
+ *
+ * @param bytes Encoded byte array
+ * @param rowSize Number of rows encoded in {@code bytes}
+ * @param schema Schema of the result table
+ * @return Decoded result table
+ * @throws IOException If the bytes cannot be decoded
+ */
+ ResultTable decodeResultTable(byte[] bytes, int rowSize, DataSchema schema)
+ throws IOException;
+}
diff --git
a/pinot-common/src/main/java/org/apache/pinot/common/response/encoder/ResponseEncoderFactory.java
b/pinot-common/src/main/java/org/apache/pinot/common/response/encoder/ResponseEncoderFactory.java
new file mode 100644
index 0000000000..ad2abbeb8d
--- /dev/null
+++
b/pinot-common/src/main/java/org/apache/pinot/common/response/encoder/ResponseEncoderFactory.java
@@ -0,0 +1,33 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pinot.common.response.encoder;
+
+/**
+ * Factory that resolves a {@link ResponseEncoder} from the name of an encoding format.
+ */
+public class ResponseEncoderFactory {
+  private ResponseEncoderFactory() {
+  }
+
+  /**
+   * Returns the encoder registered for {@code format}; the name is matched case-insensitively.
+   *
+   * @param format name of the encoding format; {@code "JSON"} is the only format handled here
+   * @return a new encoder for the requested format
+   * @throws IllegalArgumentException if {@code format} is {@code null} or names an unsupported format
+   */
+  public static ResponseEncoder getResponseEncoder(String format) {
+    if (format == null) {
+      // Report a missing format the same way as an unknown one instead of failing with a bare NPE
+      throw new IllegalArgumentException("Unsupported format: null");
+    }
+    // Locale.ROOT keeps the match independent of the JVM default locale (e.g. Turkish dotted/dotless 'i')
+    switch (format.toUpperCase(java.util.Locale.ROOT)) {
+      case "JSON":
+        return new JsonResponseEncoder();
+      default:
+        throw new IllegalArgumentException("Unsupported format: " + format);
+    }
+  }
+}
diff --git
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
index 8a8b7c6a5e..f70900fd5c 100644
---
a/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
+++
b/pinot-integration-test-base/src/test/java/org/apache/pinot/integration/tests/ClusterTest.java
@@ -528,14 +528,9 @@ public abstract class ClusterTest extends ControllerTest {
public JsonNode queryGrpcEndpoint(String query, Map<String, String>
metadataMap)
throws IOException {
- GrpcConnection grpcConnection = null;
- try {
- grpcConnection = ConnectionFactory.fromHostListGrpc(new Properties(),
List.of(getBrokerGrpcEndpoint()));
+ try (GrpcConnection grpcConnection =
ConnectionFactory.fromHostListGrpc(new Properties(),
+ List.of(getBrokerGrpcEndpoint()))) {
return grpcConnection.getJsonResponse(query, metadataMap);
- } finally {
- if (grpcConnection != null) {
- grpcConnection.close();
- }
}
}
diff --git
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
index eb575799e3..7332b9cce4 100644
---
a/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
+++
b/pinot-integration-tests/src/test/java/org/apache/pinot/integration/tests/OfflineClusterIntegrationTest.java
@@ -2772,7 +2772,7 @@ public class OfflineClusterIntegrationTest extends
BaseClusterIntegrationTestSet
assertEquals(row.get(0).asInt(), tmpTableRow.get(0).asInt());
assertEquals(row.get(1).asLong(), tmpTableRow.get(1).asLong());
assertTrue(row.get(2).isNull());
- assertTrue(row.get(2).isNull());
+ assertTrue(row.get(3).isNull());
}
for (int i = 2; i < 363; i++) {
JsonNode row = rows.get(i);
diff --git
a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
index 1151a78077..7eb62120f7 100644
--- a/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
+++ b/pinot-spi/src/main/java/org/apache/pinot/spi/utils/CommonConstants.java
@@ -739,6 +739,8 @@ public class CommonConstants {
public static final int DEFAULT_BLOCK_ROW_SIZE = 10_000;
public static final String COMPRESSION = "compression";
public static final String DEFAULT_COMPRESSION = "ZSTD";
+ public static final String ENCODING = "encoding";
+ public static final String DEFAULT_ENCODING = "JSON";
}
public static final String PREFIX_OF_CONFIG_OF_PINOT_FS_FACTORY =
"pinot.broker.storage.factory";
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]