This is an automated email from the ASF dual-hosted git repository.

stoty pushed a commit to branch branch-2.6
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.6 by this push: new c683576aca9 HBASE-28646 Use Streams to unmarshall protobuf REST data (#5974) c683576aca9 is described below commit c683576aca986764a34a370ca58c5f7a314f6335 Author: Istvan Toth <st...@apache.org> AuthorDate: Tue Jun 11 20:00:44 2024 +0200 HBASE-28646 Use Streams to unmarshall protobuf REST data (#5974) Signed-off-by: Duo Zhang <zhang...@apache.org> (cherry picked from commit 91b351264dec4ecf08103577c6bfa51da1197c39) --- .../hadoop/hbase/rest/ProtobufMessageHandler.java | 33 ++++++++++++++++------ .../org/apache/hadoop/hbase/rest/RestUtil.java | 15 +++++----- .../apache/hadoop/hbase/rest/model/CellModel.java | 5 ++-- .../hadoop/hbase/rest/model/CellSetModel.java | 5 ++-- .../hbase/rest/model/NamespacesInstanceModel.java | 6 ++-- .../hadoop/hbase/rest/model/NamespacesModel.java | 6 ++-- .../apache/hadoop/hbase/rest/model/RowModel.java | 3 +- .../hadoop/hbase/rest/model/ScannerModel.java | 5 ++-- .../rest/model/StorageClusterStatusModel.java | 5 ++-- .../hadoop/hbase/rest/model/TableInfoModel.java | 5 ++-- .../hadoop/hbase/rest/model/TableListModel.java | 5 ++-- .../hadoop/hbase/rest/model/TableSchemaModel.java | 5 ++-- .../hadoop/hbase/rest/model/VersionModel.java | 5 ++-- .../consumer/ProtobufMessageBodyConsumer.java | 16 +---------- 14 files changed, 66 insertions(+), 53 deletions(-) diff --git a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/ProtobufMessageHandler.java b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/ProtobufMessageHandler.java index e3f58a23ce2..8aaacd7101b 100644 --- a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/ProtobufMessageHandler.java +++ b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/ProtobufMessageHandler.java @@ -17,9 +17,11 @@ */ package org.apache.hadoop.hbase.rest; +import com.google.protobuf.CodedInputStream; import com.google.protobuf.CodedOutputStream; import com.google.protobuf.Message; import java.io.IOException; +import 
java.io.InputStream; import java.io.OutputStream; import org.apache.yetus.audience.InterfaceAudience; @@ -46,7 +48,7 @@ public interface ProtobufMessageHandler { } /** - * Returns the protobuf represention of the model in a byte array Use + * Returns the protobuf represention of the model in a byte array. Use * {@link org.apache.hadoop.hbase.rest.ProtobufMessageHandler#writeProtobufOutput(OutputStream)} * for better performance * @return the protobuf encoded object in a byte array @@ -62,15 +64,28 @@ public interface ProtobufMessageHandler { Message messageFromObject(); /** - * Initialize the model from a protobuf representation. + * Initialize the model from a protobuf representation. Use + * {@link org.apache.hadoop.hbase.rest.ProtobufMessageHandler#getObjectFromMessage(InputStream)} + * for better performance * @param message the raw bytes of the protobuf message * @return reference to self for convenience */ - // TODO implement proper stream handling for unmarshalling. - // Using byte array here lets us use ProtobufUtil.mergeFrom in the implementations to - // avoid the CodedOutputStream size limitation, but is slow - // and memory intensive. We could use the ProtobufUtil.mergeFrom() variant that takes - // an inputStream and sets the size limit to maxInt. - // This would help both on the client side, and when processing large Puts on the server. - ProtobufMessageHandler getObjectFromMessage(byte[] message) throws IOException; + default ProtobufMessageHandler getObjectFromMessage(byte[] message) throws IOException { + final CodedInputStream codedInput = CodedInputStream.newInstance(message); + codedInput.setSizeLimit(message.length); + return getObjectFromMessage(codedInput); + } + + /** + * Initialize the model from a protobuf representation. 
+ * @param is InputStream providing the protobuf message + * @return reference to self for convenience + */ + default ProtobufMessageHandler getObjectFromMessage(InputStream is) throws IOException { + final CodedInputStream codedInput = CodedInputStream.newInstance(is); + codedInput.setSizeLimit(Integer.MAX_VALUE); + return getObjectFromMessage(codedInput); + } + + ProtobufMessageHandler getObjectFromMessage(CodedInputStream cis) throws IOException; } diff --git a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/RestUtil.java b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/RestUtil.java index 13b4d35098e..5c7217f6e54 100644 --- a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/RestUtil.java +++ b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/RestUtil.java @@ -50,15 +50,14 @@ public final class RestUtil { } /** - * Copied from org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil to avoid shading conflicts - * between hbase-shaded-client and hbase-rest in HBase 2.x. This version of protobuf's mergeFrom - * avoids the hard-coded 64MB limit for decoding buffers when working with byte arrays - * @param builder current message builder - * @param b byte array + * Merges the object from codedInput, then calls checkLastTagWas. This is based on + * ProtobufUtil.mergeFrom, but we have already taken care of setSizeLimit() before calling, so + * only the checkLastTagWas() call is retained. 
+ * @param builder protobuf object builder + * @param codedInput encoded object data */ - public static void mergeFrom(Message.Builder builder, byte[] b) throws IOException { - final CodedInputStream codedInput = CodedInputStream.newInstance(b); - codedInput.setSizeLimit(b.length); + public static void mergeFrom(Message.Builder builder, CodedInputStream codedInput) + throws IOException { builder.mergeFrom(codedInput); codedInput.checkLastTagWas(0); } diff --git a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/model/CellModel.java b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/model/CellModel.java index 606ff0357e5..409bc81a8f7 100644 --- a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/model/CellModel.java +++ b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/model/CellModel.java @@ -21,6 +21,7 @@ import static org.apache.hadoop.hbase.KeyValue.COLUMN_FAMILY_DELIMITER; import com.fasterxml.jackson.annotation.JsonIgnore; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.protobuf.CodedInputStream; import com.google.protobuf.Message; import java.io.IOException; import java.io.Serializable; @@ -216,9 +217,9 @@ public class CellModel implements ProtobufMessageHandler, Serializable { } @Override - public ProtobufMessageHandler getObjectFromMessage(byte[] message) throws IOException { + public ProtobufMessageHandler getObjectFromMessage(CodedInputStream cis) throws IOException { Cell.Builder builder = Cell.newBuilder(); - RestUtil.mergeFrom(builder, message); + RestUtil.mergeFrom(builder, cis); setColumn(builder.getColumn().toByteArray()); setValue(builder.getData().toByteArray()); if (builder.hasTimestamp()) { diff --git a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/model/CellSetModel.java b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/model/CellSetModel.java index fedfe1de26c..90555b3fae1 100644 --- a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/model/CellSetModel.java +++ 
b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/model/CellSetModel.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.rest.model; import static org.apache.hadoop.hbase.rest.model.CellModel.MAGIC_LENGTH; +import com.google.protobuf.CodedInputStream; import com.google.protobuf.Message; import java.io.IOException; import java.io.Serializable; @@ -137,9 +138,9 @@ public class CellSetModel implements Serializable, ProtobufMessageHandler { } @Override - public ProtobufMessageHandler getObjectFromMessage(byte[] message) throws IOException { + public ProtobufMessageHandler getObjectFromMessage(CodedInputStream cis) throws IOException { CellSet.Builder builder = CellSet.newBuilder(); - RestUtil.mergeFrom(builder, message); + RestUtil.mergeFrom(builder, cis); for (CellSet.Row row : builder.getRowsList()) { RowModel rowModel = new RowModel(row.getKey().toByteArray()); for (Cell cell : row.getValuesList()) { diff --git a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/model/NamespacesInstanceModel.java b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/model/NamespacesInstanceModel.java index 924ffea73f6..2a559ca8959 100644 --- a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/model/NamespacesInstanceModel.java +++ b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/model/NamespacesInstanceModel.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.rest.model; +import com.google.protobuf.CodedInputStream; import com.google.protobuf.Message; import java.io.IOException; import java.io.Serializable; @@ -30,6 +31,7 @@ import javax.xml.bind.annotation.XmlTransient; import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.rest.ProtobufMessageHandler; +import org.apache.hadoop.hbase.rest.RestUtil; import org.apache.hadoop.hbase.rest.protobuf.generated.NamespacePropertiesMessage.NamespaceProperties; import org.apache.yetus.audience.InterfaceAudience; @@ -155,9 +157,9 @@ public 
class NamespacesInstanceModel implements Serializable, ProtobufMessageHan } @Override - public ProtobufMessageHandler getObjectFromMessage(byte[] message) throws IOException { + public ProtobufMessageHandler getObjectFromMessage(CodedInputStream cis) throws IOException { NamespaceProperties.Builder builder = NamespaceProperties.newBuilder(); - builder.mergeFrom(message); + RestUtil.mergeFrom(builder, cis); List<NamespaceProperties.Property> properties = builder.getPropsList(); for (NamespaceProperties.Property property : properties) { addProperty(property.getKey(), property.getValue()); diff --git a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/model/NamespacesModel.java b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/model/NamespacesModel.java index 9b52010d926..6b1090640a9 100644 --- a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/model/NamespacesModel.java +++ b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/model/NamespacesModel.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.rest.model; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.protobuf.CodedInputStream; import com.google.protobuf.Message; import java.io.IOException; import java.io.Serializable; @@ -30,6 +31,7 @@ import javax.xml.bind.annotation.XmlRootElement; import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.client.Admin; import org.apache.hadoop.hbase.rest.ProtobufMessageHandler; +import org.apache.hadoop.hbase.rest.RestUtil; import org.apache.hadoop.hbase.rest.protobuf.generated.NamespacesMessage.Namespaces; import org.apache.yetus.audience.InterfaceAudience; @@ -102,9 +104,9 @@ public class NamespacesModel implements Serializable, ProtobufMessageHandler { } @Override - public ProtobufMessageHandler getObjectFromMessage(byte[] message) throws IOException { + public ProtobufMessageHandler getObjectFromMessage(CodedInputStream cis) throws IOException { Namespaces.Builder builder = Namespaces.newBuilder(); - 
builder.mergeFrom(message); + RestUtil.mergeFrom(builder, cis); namespaces = builder.getNamespaceList(); return this; } diff --git a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/model/RowModel.java b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/model/RowModel.java index 7fda0a448a1..f8e2d855ff8 100644 --- a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/model/RowModel.java +++ b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/model/RowModel.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.rest.model; import static org.apache.hadoop.hbase.rest.model.CellModel.MAGIC_LENGTH; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.protobuf.CodedInputStream; import com.google.protobuf.Message; import java.io.IOException; import java.io.Serializable; @@ -186,7 +187,7 @@ public class RowModel implements ProtobufMessageHandler, Serializable { } @Override - public ProtobufMessageHandler getObjectFromMessage(byte[] message) throws IOException { + public ProtobufMessageHandler getObjectFromMessage(CodedInputStream is) throws IOException { // there is no standalone row protobuf message throw new UnsupportedOperationException("no protobuf equivalent to RowModel"); } diff --git a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/model/ScannerModel.java b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/model/ScannerModel.java index 9c0fb6170cf..591ac44c1b8 100644 --- a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/model/ScannerModel.java +++ b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/model/ScannerModel.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hbase.rest.model; import com.fasterxml.jackson.annotation.JsonInclude; import com.google.protobuf.ByteString; +import com.google.protobuf.CodedInputStream; import com.google.protobuf.Message; import java.io.IOException; import java.io.Serializable; @@ -904,9 +905,9 @@ public class ScannerModel implements ProtobufMessageHandler, Serializable { } 
@Override - public ProtobufMessageHandler getObjectFromMessage(byte[] message) throws IOException { + public ProtobufMessageHandler getObjectFromMessage(CodedInputStream cis) throws IOException { Scanner.Builder builder = Scanner.newBuilder(); - RestUtil.mergeFrom(builder, message); + RestUtil.mergeFrom(builder, cis); if (builder.hasStartRow()) { startRow = builder.getStartRow().toByteArray(); } diff --git a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/model/StorageClusterStatusModel.java b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/model/StorageClusterStatusModel.java index 7c368c77fe2..bac89e14636 100644 --- a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/model/StorageClusterStatusModel.java +++ b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/model/StorageClusterStatusModel.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hbase.rest.model; import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.protobuf.CodedInputStream; import com.google.protobuf.Message; import java.io.IOException; import java.io.Serializable; @@ -691,9 +692,9 @@ public class StorageClusterStatusModel implements Serializable, ProtobufMessageH } @Override - public ProtobufMessageHandler getObjectFromMessage(byte[] message) throws IOException { + public ProtobufMessageHandler getObjectFromMessage(CodedInputStream cis) throws IOException { StorageClusterStatus.Builder builder = StorageClusterStatus.newBuilder(); - RestUtil.mergeFrom(builder, message); + RestUtil.mergeFrom(builder, cis); if (builder.hasRegions()) { regions = builder.getRegions(); } diff --git a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/model/TableInfoModel.java b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/model/TableInfoModel.java index fabcd9d61e8..9d656c69f34 100644 --- a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/model/TableInfoModel.java +++ b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/model/TableInfoModel.java @@ -17,6 +17,7 @@ */ 
package org.apache.hadoop.hbase.rest.model; +import com.google.protobuf.CodedInputStream; import com.google.protobuf.Message; import java.io.IOException; import java.io.Serializable; @@ -138,9 +139,9 @@ public class TableInfoModel implements Serializable, ProtobufMessageHandler { } @Override - public ProtobufMessageHandler getObjectFromMessage(byte[] message) throws IOException { + public ProtobufMessageHandler getObjectFromMessage(CodedInputStream cis) throws IOException { TableInfo.Builder builder = TableInfo.newBuilder(); - RestUtil.mergeFrom(builder, message); + RestUtil.mergeFrom(builder, cis); setName(builder.getName()); for (TableInfo.Region region : builder.getRegionsList()) { add( diff --git a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/model/TableListModel.java b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/model/TableListModel.java index 6b6d15d8ecd..c81ef365b0b 100644 --- a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/model/TableListModel.java +++ b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/model/TableListModel.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.rest.model; +import com.google.protobuf.CodedInputStream; import com.google.protobuf.Message; import java.io.IOException; import java.io.Serializable; @@ -99,9 +100,9 @@ public class TableListModel implements Serializable, ProtobufMessageHandler { } @Override - public ProtobufMessageHandler getObjectFromMessage(byte[] message) throws IOException { + public ProtobufMessageHandler getObjectFromMessage(CodedInputStream cis) throws IOException { TableList.Builder builder = TableList.newBuilder(); - RestUtil.mergeFrom(builder, message); + RestUtil.mergeFrom(builder, cis); for (String table : builder.getNameList()) { this.add(new TableModel(table)); } diff --git a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/model/TableSchemaModel.java b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/model/TableSchemaModel.java index 
f1dcf05d68f..b50cc2bf940 100644 --- a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/model/TableSchemaModel.java +++ b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/model/TableSchemaModel.java @@ -20,6 +20,7 @@ package org.apache.hadoop.hbase.rest.model; import com.fasterxml.jackson.annotation.JsonAnyGetter; import com.fasterxml.jackson.annotation.JsonAnySetter; import com.fasterxml.jackson.annotation.JsonIgnore; +import com.google.protobuf.CodedInputStream; import com.google.protobuf.Message; import java.io.IOException; import java.io.Serializable; @@ -283,9 +284,9 @@ public class TableSchemaModel implements Serializable, ProtobufMessageHandler { } @Override - public ProtobufMessageHandler getObjectFromMessage(byte[] message) throws IOException { + public ProtobufMessageHandler getObjectFromMessage(CodedInputStream cis) throws IOException { TableSchema.Builder builder = TableSchema.newBuilder(); - RestUtil.mergeFrom(builder, message); + RestUtil.mergeFrom(builder, cis); this.setName(builder.getName()); for (TableSchema.Attribute attr : builder.getAttrsList()) { this.addAttribute(attr.getName(), attr.getValue()); diff --git a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/model/VersionModel.java b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/model/VersionModel.java index b3f6df028f0..6483b42f28f 100644 --- a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/model/VersionModel.java +++ b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/model/VersionModel.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.hbase.rest.model; +import com.google.protobuf.CodedInputStream; import com.google.protobuf.Message; import java.io.IOException; import java.io.Serializable; @@ -173,9 +174,9 @@ public class VersionModel implements Serializable, ProtobufMessageHandler { } @Override - public ProtobufMessageHandler getObjectFromMessage(byte[] message) throws IOException { + public ProtobufMessageHandler getObjectFromMessage(CodedInputStream cis) 
throws IOException { Version.Builder builder = Version.newBuilder(); - RestUtil.mergeFrom(builder, message); + RestUtil.mergeFrom(builder, cis); if (builder.hasRestVersion()) { restVersion = builder.getRestVersion(); } diff --git a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/provider/consumer/ProtobufMessageBodyConsumer.java b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/provider/consumer/ProtobufMessageBodyConsumer.java index 7c3f6f8ea40..340962730e7 100644 --- a/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/provider/consumer/ProtobufMessageBodyConsumer.java +++ b/hbase-rest/src/main/java/org/apache/hadoop/hbase/rest/provider/consumer/ProtobufMessageBodyConsumer.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.hbase.rest.provider.consumer; -import java.io.ByteArrayOutputStream; import java.io.IOException; import java.io.InputStream; import java.lang.annotation.Annotation; @@ -59,23 +58,10 @@ public class ProtobufMessageBodyConsumer implements MessageBodyReader<ProtobufMe ProtobufMessageHandler obj = null; try { obj = type.getDeclaredConstructor().newInstance(); - ByteArrayOutputStream baos = new ByteArrayOutputStream(); - byte[] buffer = new byte[4096]; - int read; - do { - read = inputStream.read(buffer, 0, buffer.length); - if (read > 0) { - baos.write(buffer, 0, read); - } - } while (read > 0); - if (LOG.isTraceEnabled()) { - LOG.trace(getClass() + ": read " + baos.size() + " bytes from " + inputStream); - } - obj = obj.getObjectFromMessage(baos.toByteArray()); + return obj.getObjectFromMessage(inputStream); } catch (InstantiationException | NoSuchMethodException | InvocationTargetException | IllegalAccessException e) { throw new WebApplicationException(e); } - return obj; } }