This is an automated email from the ASF dual-hosted git repository. chaokunyang pushed a commit to branch releases-0.10 in repository https://gitbox.apache.org/repos/asf/fury.git
commit f266b689f84d8d52db1b98bc1751b26b1fc8dd4a Author: Shawn Yang <[email protected]> AuthorDate: Sun Jan 26 00:52:22 2025 +0800 feat(java): support streaming encode/decode to/from buffer for row format (#2024) ## What does this PR do? support streaming encode/decode to/from buffer for row format ## Related issues Closes #2019 ## Does this PR introduce any user-facing change? <!-- If any user-facing interface changes, please [open an issue](https://github.com/apache/fury/issues/new/choose) describing the need to do so and update the document if necessary. --> - [ ] Does this PR introduce any public API change? - [ ] Does this PR introduce any binary protocol compatibility change? ## Benchmark <!-- When the PR has an impact on performance (if you don't know whether the PR will have an impact on performance, you can submit the PR first, and if it will have impact on performance, the code reviewer will explain it), be sure to attach a benchmark data here. --> --- .../fury/format/encoder/ArrayEncoderBuilder.java | 3 +- .../format/encoder/BaseBinaryEncoderBuilder.java | 71 ++++------ .../org/apache/fury/format/encoder/Encoder.java | 6 + .../org/apache/fury/format/encoder/Encoders.java | 148 ++++++++++++++++++--- .../fury/format/encoder/MapEncoderBuilder.java | 55 ++++---- .../apache/fury/format/row/binary/BinaryMap.java | 21 ++- .../row/binary/writer/BinaryArrayWriter.java | 6 +- .../format/row/binary/writer/BinaryRowWriter.java | 2 +- .../org/apache/fury/format/type/TypeInference.java | 29 ++-- .../fury/format/encoder/ArrayEncoderTest.java | 14 +- .../fury/format/encoder/CodecBuilderTest.java | 20 +++ .../apache/fury/format/encoder/MapEncoderTest.java | 81 ++++++++++- .../apache/fury/format/encoder/RowEncoderTest.java | 8 +- .../java/org/apache/fury/test/bean/SimpleFoo.java} | 44 ++++-- 14 files changed, 358 insertions(+), 150 deletions(-) diff --git a/java/fury-format/src/main/java/org/apache/fury/format/encoder/ArrayEncoderBuilder.java b/java/fury-format/src/main/java/org/apache/fury/format/encoder/ArrayEncoderBuilder.java index 990beecc..e6cf137c 100644 --- a/java/fury-format/src/main/java/org/apache/fury/format/encoder/ArrayEncoderBuilder.java +++ b/java/fury-format/src/main/java/org/apache/fury/format/encoder/ArrayEncoderBuilder.java @@ -127,7 +127,8 @@ public class ArrayEncoderBuilder extends BaseBinaryEncoderBuilder { expressions.add(array); Expression.Reference fieldExpr = new Expression.Reference(FIELD_NAME, ARROW_FIELD_TYPE, false); - Expression listExpression = serializeForArray(array, arrayWriter, arrayToken, fieldExpr); + Expression listExpression = + serializeForArrayByWriter(array, arrayWriter, arrayToken, fieldExpr); expressions.add(listExpression); diff --git a/java/fury-format/src/main/java/org/apache/fury/format/encoder/BaseBinaryEncoderBuilder.java b/java/fury-format/src/main/java/org/apache/fury/format/encoder/BaseBinaryEncoderBuilder.java index a9b6fc48..4f7f772d 100644 --- a/java/fury-format/src/main/java/org/apache/fury/format/encoder/BaseBinaryEncoderBuilder.java +++ b/java/fury-format/src/main/java/org/apache/fury/format/encoder/BaseBinaryEncoderBuilder.java @@ -19,6 +19,7 @@ package org.apache.fury.format.encoder; +import static org.apache.fury.type.TypeUtils.PRIMITIVE_INT_TYPE; import static org.apache.fury.type.TypeUtils.getRawType; import java.math.BigDecimal; @@ -70,7 +71,6 @@ import org.apache.fury.util.Preconditions; import org.apache.fury.util.StringUtils; /** Base encoder builder for {@link Row}, {@link ArrayData} and {@link MapData}. */ -@SuppressWarnings("UnstableApiUsage") public abstract class BaseBinaryEncoderBuilder extends CodecBuilder { protected static final String REFERENCES_NAME = "references"; protected static final TypeRef<Schema> SCHEMA_TYPE = TypeRef.of(Schema.class); @@ -222,22 +222,14 @@ public abstract class BaseBinaryEncoderBuilder extends CodecBuilder { } } - /** - * Returns an expression to write iterable <code>inputObject</code> of type <code>typeToken</code> - * as {@link BinaryArray} using given <code>writer</code>. - */ protected Expression serializeForArray( Expression inputObject, Expression writer, TypeRef<?> typeRef, Expression arrowField) { - return serializeForArray(inputObject, writer, typeRef, arrowField, false); + Reference arrayWriter = getOrCreateArrayWriter(typeRef, arrowField, writer); + return serializeForArrayByWriter(inputObject, arrayWriter, typeRef, arrowField); } - protected Expression serializeForArray( - Expression inputObject, - Expression writer, - TypeRef<?> typeRef, - Expression arrowField, - boolean reuse) { - Reference arrayWriter = getOrCreateArrayWriter(typeRef, arrowField, writer, reuse); + protected Expression serializeForArrayByWriter( + Expression inputObject, Expression arrayWriter, TypeRef<?> typeRef, Expression arrowField) { StaticInvoke arrayElementField = new StaticInvoke( DataTypes.class, "arrayElementField", "elemField", ARROW_FIELD_TYPE, false, arrowField); @@ -285,21 +277,8 @@ public abstract class BaseBinaryEncoderBuilder extends CodecBuilder { } } - /** - * Get or create an ArrayWriter for given <code>type</code> and use <code>writer</code> as parent - * writer. - */ protected Reference getOrCreateArrayWriter( TypeRef<?> typeRef, Expression arrayDataType, Expression writer) { - return getOrCreateArrayWriter(typeRef, arrayDataType, writer, false); - } - - protected Reference getOrCreateArrayWriter( - TypeRef<?> typeRef, Expression arrayDataType, Expression writer, boolean reuse) { - if (reuse) { - return (Reference) writer; - } - return arrayWriterMap.computeIfAbsent( typeRef, t -> { @@ -344,39 +323,37 @@ public abstract class BaseBinaryEncoderBuilder extends CodecBuilder { TypeRef<?> keySetType = supertype.resolveType(TypeUtils.KEY_SET_RETURN_TYPE); TypeRef<?> valuesType = supertype.resolveType(TypeUtils.VALUES_RETURN_TYPE); + ListExpression expressions = new ListExpression(); + + Invoke offset = new Invoke(writer, "writerIndex", "writerIndex", TypeUtils.PRIMITIVE_INT_TYPE); + // preserve 8 bytes to write the key array numBytes later + Invoke preserve = new Invoke(writer, "writeDirectly", Literal.ofInt(-1)); + expressions.add(offset, preserve); + Invoke keySet = new Invoke(inputObject, "keySet", keySetType); Expression keySerializationExpr = serializeForArray(keySet, writer, keySetType, keyArrayField); + expressions.add(keySet, keySerializationExpr); + + expressions.add( + new Expression.Invoke( + writer, + "writeDirectly", + offset, + Expression.Invoke.inlineInvoke(keySerializationExpr, "size", PRIMITIVE_INT_TYPE))); Invoke values = new Invoke(inputObject, "values", valuesType); Expression valueSerializationExpr = serializeForArray(values, writer, valuesType, valueArrayField); + expressions.add(values, valueSerializationExpr); - Invoke offset = new Invoke(writer, "writerIndex", "writerIndex", TypeUtils.PRIMITIVE_INT_TYPE); - // preserve 8 bytes to write the key array numBytes later - Invoke preserve = - new Invoke(writer, "writeDirectly", new Literal(-1, TypeUtils.PRIMITIVE_INT_TYPE)); - Invoke writeKeyArrayNumBytes = - new Invoke( - writer, - "writeDirectly", - offset, - new Invoke(keySerializationExpr, "size", TypeUtils.PRIMITIVE_INT_TYPE)); Arithmetic size = ExpressionUtils.subtract( - new Invoke(writer, "writerIndex", "writerIndex", TypeUtils.PRIMITIVE_INT_TYPE), offset); + new Invoke(writer, "writerIndex", "writerIndex", PRIMITIVE_INT_TYPE), offset); Invoke setOffsetAndSize = new Invoke(writer, "setOffsetAndSize", ordinal, offset, size); - - ListExpression expression = - new ListExpression( - offset, - preserve, - keySerializationExpr, - writeKeyArrayNumBytes, - valueSerializationExpr, - setOffsetAndSize); + expressions.add(setOffsetAndSize); return new If( - ExpressionUtils.eqNull(inputObject), new Invoke(writer, "setNullAt", ordinal), expression); + ExpressionUtils.eqNull(inputObject), new Invoke(writer, "setNullAt", ordinal), expressions); } /** diff --git a/java/fury-format/src/main/java/org/apache/fury/format/encoder/Encoder.java b/java/fury-format/src/main/java/org/apache/fury/format/encoder/Encoder.java index 3e0d96c0..76863905 100644 --- a/java/fury-format/src/main/java/org/apache/fury/format/encoder/Encoder.java +++ b/java/fury-format/src/main/java/org/apache/fury/format/encoder/Encoder.java @@ -19,6 +19,8 @@ package org.apache.fury.format.encoder; +import org.apache.fury.memory.MemoryBuffer; + /** * The encoding interface for encode/decode object to/from binary. The implementation class must * have a constructor with signature {@code Object[] references}, so we can pass any params to @@ -28,7 +30,11 @@ package org.apache.fury.format.encoder; */ public interface Encoder<T> { + T decode(MemoryBuffer buffer); + T decode(byte[] bytes); byte[] encode(T obj); + + void encode(MemoryBuffer buffer, T obj); } diff --git a/java/fury-format/src/main/java/org/apache/fury/format/encoder/Encoders.java b/java/fury-format/src/main/java/org/apache/fury/format/encoder/Encoders.java index ea2329df..ac375f67 100644 --- a/java/fury-format/src/main/java/org/apache/fury/format/encoder/Encoders.java +++ b/java/fury-format/src/main/java/org/apache/fury/format/encoder/Encoders.java @@ -19,10 +19,12 @@ package org.apache.fury.format.encoder; +import static org.apache.fury.type.TypeUtils.OBJECT_TYPE; import static org.apache.fury.type.TypeUtils.getRawType; import java.util.Collection; import java.util.HashSet; +import java.util.LinkedHashSet; import java.util.Map; import java.util.Set; import org.apache.arrow.util.Preconditions; @@ -90,6 +92,11 @@ public class Encoders { return encoder.toRow(obj); } + @Override + public T decode(MemoryBuffer buffer) { + return encoder.decode(buffer); + } + @Override public T decode(byte[] bytes) { return encoder.decode(bytes); @@ -99,6 +106,11 @@ public class Encoders { public byte[] encode(T obj) { return encoder.encode(obj); } + + @Override + public void encode(MemoryBuffer buffer, T obj) { + encoder.encode(buffer, obj); + } }; } @@ -150,8 +162,11 @@ public class Encoders { } @Override - public T decode(byte[] bytes) { - MemoryBuffer buffer = MemoryUtils.wrap(bytes); + public T decode(MemoryBuffer buffer) { + return decode(buffer, buffer.readInt32()); + } + + public T decode(MemoryBuffer buffer, int size) { long peerSchemaHash = buffer.readInt64(); if (peerSchemaHash != schemaHash) { throw new ClassNotCompatibleException( @@ -162,10 +177,16 @@ public class Encoders { schema, schemaHash, peerSchemaHash)); } BinaryRow row = new BinaryRow(schema); - row.pointTo(buffer, buffer.readerIndex(), buffer.size()); + row.pointTo(buffer, buffer.readerIndex(), size); + buffer.increaseReaderIndex(size - 8); return fromRow(row); } + @Override + public T decode(byte[] bytes) { + return decode(MemoryUtils.wrap(bytes), bytes.length); + } + @Override public byte[] encode(T obj) { buffer.writerIndex(0); @@ -175,6 +196,21 @@ public class Encoders { BinaryRow row = toRow(obj); return buffer.getBytes(0, 8 + row.getSizeInBytes()); } + + @Override + public void encode(MemoryBuffer buffer, T obj) { + int writerIndex = buffer.writerIndex(); + buffer.writeInt32(-1); + try { + buffer.writeInt64(schemaHash); + writer.setBuffer(buffer); + writer.reset(); + toRow(obj); + buffer.putInt32(writerIndex, buffer.writerIndex() - writerIndex - 4); + } finally { + writer.setBuffer(this.buffer); + } + } }; } catch (Exception e) { String msg = String.format("Create encoder failed, \nbeanClass: %s", beanClass); @@ -192,7 +228,7 @@ public class Encoders { * @return */ public static <T extends Collection> ArrayEncoder<T> arrayEncoder(TypeRef<T> token) { - return arrayEncoder(token, (Fury) null); + return arrayEncoder(token, null); } public static <T extends Collection> ArrayEncoder<T> arrayEncoder(TypeRef<T> token, Fury fury) { @@ -229,6 +265,11 @@ public class Encoders { return encoder.toArray(obj); } + @Override + public T decode(MemoryBuffer buffer) { + return encoder.decode(buffer); + } + @Override public T decode(byte[] bytes) { return encoder.decode(bytes); @@ -238,6 +279,11 @@ public class Encoders { public byte[] encode(T obj) { return encoder.encode(obj); } + + @Override + public void encode(MemoryBuffer buffer, T obj) { + encoder.encode(buffer, obj); + } }; } @@ -299,19 +345,44 @@ public class Encoders { } @Override - public T decode(byte[] bytes) { - MemoryBuffer buffer = MemoryUtils.wrap(bytes); + public T decode(MemoryBuffer buffer) { + return decode(buffer, buffer.readInt32()); + } + + public T decode(MemoryBuffer buffer, int size) { BinaryArray array = new BinaryArray(field); - array.pointTo(buffer, buffer.readerIndex(), buffer.size()); + int readerIndex = buffer.readerIndex(); + array.pointTo(buffer, readerIndex, size); + buffer.readerIndex(readerIndex + size); return fromArray(array); } + @Override + public T decode(byte[] bytes) { + return decode(MemoryUtils.wrap(bytes), bytes.length); + } + @Override public byte[] encode(T obj) { - writer.reset(obj.size()); BinaryArray array = toArray(obj); return writer.getBuffer().getBytes(0, 8 + array.getSizeInBytes()); } + + @Override + public void encode(MemoryBuffer buffer, T obj) { + MemoryBuffer prevBuffer = writer.getBuffer(); + int writerIndex = buffer.writerIndex(); + buffer.writeInt32(-1); + try { + writer.setBuffer(buffer); + BinaryArray array = toArray(obj); + int size = buffer.writerIndex() - writerIndex - 4; + assert size == array.getSizeInBytes(); + buffer.putInt32(writerIndex, size); + } finally { + writer.setBuffer(prevBuffer); + } + } }; } catch (Exception e) { String msg = String.format("Create encoder failed, \nelementType: %s", elementType); @@ -329,7 +400,7 @@ public class Encoders { * @return */ public static <T extends Map> MapEncoder<T> mapEncoder(TypeRef<T> token) { - return mapEncoder(token, (Fury) null); + return mapEncoder(token, null); } /** @@ -348,17 +419,12 @@ public class Encoders { public static <T extends Map> MapEncoder<T> mapEncoder(TypeRef<T> token, Fury fury) { Preconditions.checkNotNull(token); - Tuple2<TypeRef<?>, TypeRef<?>> tuple2 = TypeUtils.getMapKeyValueType(token); Set<TypeRef<?>> set1 = beanSet(tuple2.f0); Set<TypeRef<?>> set2 = beanSet(tuple2.f1); LOG.info("Find beans to load: {}, {}", set1, set2); - if (set1.isEmpty() && set2.isEmpty()) { - throw new IllegalArgumentException("can not find bean class."); - } - TypeRef<?> keyToken = token4BeanLoad(set1, tuple2.f0); TypeRef<?> valToken = token4BeanLoad(set2, tuple2.f1); @@ -388,7 +454,7 @@ public class Encoders { Field keyField = DataTypes.keyArrayFieldForMap(field); Field valField = DataTypes.itemArrayFieldForMap(field); BinaryArrayWriter keyWriter = new BinaryArrayWriter(keyField); - BinaryArrayWriter valWriter = new BinaryArrayWriter(valField); + BinaryArrayWriter valWriter = new BinaryArrayWriter(valField, keyWriter.getBuffer()); try { Class<?> rowCodecClass = loadOrGenMapCodecClass(mapToken, keyToken, valToken); Object references = new Object[] {keyField, valField, keyWriter, valWriter, fury, field}; @@ -421,17 +487,43 @@ public class Encoders { } @Override - public T decode(byte[] bytes) { - MemoryBuffer buffer = MemoryUtils.wrap(bytes); + public T decode(MemoryBuffer buffer) { + return decode(buffer, buffer.readInt32()); + } + + public T decode(MemoryBuffer buffer, int size) { BinaryMap map = new BinaryMap(field); - map.pointTo(buffer, 0, buffer.size()); + int readerIndex = buffer.readerIndex(); + map.pointTo(buffer, readerIndex, size); + buffer.readerIndex(readerIndex + size); return fromMap(map); } + @Override + public T decode(byte[] bytes) { + return decode(MemoryUtils.wrap(bytes), bytes.length); + } + @Override public byte[] encode(T obj) { BinaryMap map = toMap(obj); - return map.getBuf().readBytes(map.getBuf().size()); + return map.getBuf().getBytes(map.getBaseOffset(), map.getSizeInBytes()); + } + + @Override + public void encode(MemoryBuffer buffer, T obj) { + MemoryBuffer prevBuffer = keyWriter.getBuffer(); + int writerIndex = buffer.writerIndex(); + buffer.writeInt32(-1); + try { + keyWriter.setBuffer(buffer); + valWriter.setBuffer(buffer); + toMap(obj); + buffer.putInt32(writerIndex, buffer.writerIndex() - writerIndex - 4); + } finally { + keyWriter.setBuffer(prevBuffer); + valWriter.setBuffer(prevBuffer); + } } }; } catch (Exception e) { @@ -484,6 +576,11 @@ public class Encoders { return encoder.toMap(obj); } + @Override + public T decode(MemoryBuffer buffer) { + return encoder.decode(buffer); + } + @Override public T decode(byte[] bytes) { return encoder.decode(bytes); @@ -493,12 +590,22 @@ public class Encoders { public byte[] encode(T obj) { return encoder.encode(obj); } + + @Override + public void encode(MemoryBuffer buffer, T obj) { + encoder.encode(buffer, obj); + } }; } private static void findBeanToken(TypeRef<?> typeRef, java.util.Set<TypeRef<?>> set) { + Set<TypeRef<?>> visited = new LinkedHashSet<>(); while (TypeUtils.ITERABLE_TYPE.isSupertypeOf(typeRef) || TypeUtils.MAP_TYPE.isSupertypeOf(typeRef)) { + if (visited.contains(typeRef)) { + return; + } + visited.add(typeRef); if (TypeUtils.ITERABLE_TYPE.isSupertypeOf(typeRef)) { typeRef = TypeUtils.getElementType(typeRef); if (TypeUtils.isBean(typeRef)) { @@ -572,7 +679,8 @@ public class Encoders { cls = getRawType(valueToken); beanToken = valueToken; } else { - throw new IllegalArgumentException("not find bean class."); + cls = Object.class; + beanToken = OBJECT_TYPE; } // class name prefix String prefix = TypeInference.inferTypeName(mapCls); diff --git a/java/fury-format/src/main/java/org/apache/fury/format/encoder/MapEncoderBuilder.java b/java/fury-format/src/main/java/org/apache/fury/format/encoder/MapEncoderBuilder.java index cbd4fb52..4690cebc 100644 --- a/java/fury-format/src/main/java/org/apache/fury/format/encoder/MapEncoderBuilder.java +++ b/java/fury-format/src/main/java/org/apache/fury/format/encoder/MapEncoderBuilder.java @@ -20,6 +20,7 @@ package org.apache.fury.format.encoder; import static org.apache.fury.type.TypeUtils.CLASS_TYPE; +import static org.apache.fury.type.TypeUtils.PRIMITIVE_INT_TYPE; import static org.apache.fury.type.TypeUtils.getRawType; import java.util.Map; @@ -40,7 +41,6 @@ import org.apache.fury.type.TypeUtils; import org.apache.fury.util.StringUtils; /** Expression builder for building jit map encoder class. */ -@SuppressWarnings("UnstableApiUsage") public class MapEncoderBuilder extends BaseBinaryEncoderBuilder { private static final Logger LOG = LoggerFactory.getLogger(MapEncoderBuilder.class); private static final String FIELD_NAME = "field"; @@ -168,17 +168,38 @@ public class MapEncoderBuilder extends BaseBinaryEncoderBuilder { Expression.Reference valFieldExpr = new Expression.Reference(VALUE_FIELD_NAME, ARROW_FIELD_TYPE, false); - Expression listExpression = - directlySerializeMap(map, keyArrayWriter, valArrayWriter, keyFieldExpr, valFieldExpr); + @SuppressWarnings("unchecked") + TypeRef<?> supertype = ((TypeRef<? extends Map<?, ?>>) mapToken).getSupertype(Map.class); + TypeRef<?> keySetType = supertype.resolveType(TypeUtils.KEY_SET_RETURN_TYPE); + TypeRef<?> valuesType = supertype.resolveType(TypeUtils.VALUES_RETURN_TYPE); + Expression.Invoke keySet = new Expression.Invoke(map, "keySet", keySetType); + Expression writerIndex = + new Expression.Invoke(keyArrayWriter, "writerIndex", PRIMITIVE_INT_TYPE); + expressions.add(writerIndex); + expressions.add( + new Expression.Invoke(keyArrayWriter, "writeDirectly", Expression.Literal.ofInt(-1))); + Expression keySerializationExpr = + serializeForArrayByWriter(keySet, keyArrayWriter, keySetType, keyFieldExpr); Expression.Invoke keyArray = new Expression.Invoke(keyArrayWriter, "toArray", TypeRef.of(BinaryArray.class)); + expressions.add(map); + expressions.add(keySerializationExpr); + expressions.add(keyArray); + expressions.add( + new Expression.Invoke( + keyArrayWriter, + "writeDirectly", + writerIndex, + Expression.Invoke.inlineInvoke(keyArray, "getSizeInBytes", PRIMITIVE_INT_TYPE))); + + Expression.Invoke values = new Expression.Invoke(map, "values", valuesType); + Expression valueSerializationExpr = + serializeForArrayByWriter(values, valArrayWriter, valuesType, valFieldExpr); Expression.Invoke valArray = new Expression.Invoke(valArrayWriter, "toArray", TypeRef.of(BinaryArray.class)); - expressions.add(map); - expressions.add(listExpression); - expressions.add(keyArray); + expressions.add(valueSerializationExpr); expressions.add(valArray); expressions.add( new Expression.Return( @@ -206,28 +227,6 @@ public class MapEncoderBuilder extends BaseBinaryEncoderBuilder { return expressions; } - private Expression directlySerializeMap( - Expression map, - Expression keyArrayWriter, - Expression valArrayWriter, - Expression keyFieldExpr, - Expression valFieldExpr) { - @SuppressWarnings("unchecked") - TypeRef<?> supertype = ((TypeRef<? extends Map<?, ?>>) mapToken).getSupertype(Map.class); - TypeRef<?> keySetType = supertype.resolveType(TypeUtils.KEY_SET_RETURN_TYPE); - TypeRef<?> valuesType = supertype.resolveType(TypeUtils.VALUES_RETURN_TYPE); - - Expression.Invoke keySet = new Expression.Invoke(map, "keySet", keySetType); - Expression keySerializationExpr = - serializeForArray(keySet, keyArrayWriter, keySetType, keyFieldExpr, true); - - Expression.Invoke values = new Expression.Invoke(map, "values", valuesType); - Expression valueSerializationExpr = - serializeForArray(values, valArrayWriter, valuesType, valFieldExpr, true); - - return new Expression.ListExpression(keySerializationExpr, valueSerializationExpr); - } - private Expression directlyDeserializeMap( Expression map, Expression keyArrayRef, Expression valArrayRef) { @SuppressWarnings("unchecked") diff --git a/java/fury-format/src/main/java/org/apache/fury/format/row/binary/BinaryMap.java b/java/fury-format/src/main/java/org/apache/fury/format/row/binary/BinaryMap.java index d0e9886c..bb41db29 100644 --- a/java/fury-format/src/main/java/org/apache/fury/format/row/binary/BinaryMap.java +++ b/java/fury-format/src/main/java/org/apache/fury/format/row/binary/BinaryMap.java @@ -53,17 +53,10 @@ public class BinaryMap implements MapData { this.keys = keys; this.values = values; this.field = field; - this.baseOffset = 0; + this.buf = keys.getBuffer(); + this.baseOffset = keys.getBaseOffset() - 8; + // memory of keys and values must be continuous. this.sizeInBytes = keys.getSizeInBytes() + values.getSizeInBytes() + 8; - - MemoryBuffer copyBuf = MemoryUtils.buffer(sizeInBytes); - copyBuf.putInt32(0, keys.getSizeInBytes()); - copyBuf.putInt32(4, 0); - keys.getBuffer().copyTo(baseOffset, copyBuf, 8, keys.getSizeInBytes()); - values - .getBuffer() - .copyTo(baseOffset, copyBuf, keys.getSizeInBytes() + 8, values.getSizeInBytes()); - this.buf = copyBuf; } public void pointTo(MemoryBuffer buf, int offset, int sizeInBytes) { @@ -73,12 +66,14 @@ public class BinaryMap implements MapData { // Read the numBytes of key array from the aligned first 8 bytes as int. final int keyArrayBytes = buf.getInt32(offset); assert keyArrayBytes >= 0 : "keyArrayBytes (" + keyArrayBytes + ") should >= 0"; + keys.pointTo(buf, offset + 8, keyArrayBytes); final int valueArrayBytes = sizeInBytes - keyArrayBytes - 8; assert valueArrayBytes >= 0 : "valueArraySize (" + valueArrayBytes + ") should >= 0"; - - keys.pointTo(buf, offset + 8, keyArrayBytes); values.pointTo(buf, offset + 8 + keyArrayBytes, valueArrayBytes); - assert keys.numElements() == values.numElements(); + if (keys.numElements() != values.numElements()) { + throw new UnsupportedOperationException(); + } + // assert keys.numElements() == values.numElements(); } public MemoryBuffer getBuf() { diff --git a/java/fury-format/src/main/java/org/apache/fury/format/row/binary/writer/BinaryArrayWriter.java b/java/fury-format/src/main/java/org/apache/fury/format/row/binary/writer/BinaryArrayWriter.java index 7cd46032..ea07fdc4 100644 --- a/java/fury-format/src/main/java/org/apache/fury/format/row/binary/writer/BinaryArrayWriter.java +++ b/java/fury-format/src/main/java/org/apache/fury/format/row/binary/writer/BinaryArrayWriter.java @@ -72,7 +72,7 @@ public class BinaryArrayWriter extends BinaryWriter { // there's no need to set `super.startIndex = writer.writerIndex();` } - private BinaryArrayWriter(Field field, MemoryBuffer buffer) { + public BinaryArrayWriter(Field field, MemoryBuffer buffer) { super(buffer, 8); this.field = field; int width = DataTypes.getTypeWidth(field.getChildren().get(0).getType()); @@ -208,9 +208,7 @@ public class BinaryArrayWriter extends BinaryWriter { public BinaryArray toArray() { BinaryArray array = new BinaryArray(field); int size = size(); - MemoryBuffer buffer = MemoryUtils.buffer(size); - this.buffer.copyTo(startIndex, buffer, 0, size); - array.pointTo(buffer, 0, size); + array.pointTo(buffer, startIndex, size); return array; } diff --git a/java/fury-format/src/main/java/org/apache/fury/format/row/binary/writer/BinaryRowWriter.java b/java/fury-format/src/main/java/org/apache/fury/format/row/binary/writer/BinaryRowWriter.java index 04bcd336..d641649f 100644 --- a/java/fury-format/src/main/java/org/apache/fury/format/row/binary/writer/BinaryRowWriter.java +++ b/java/fury-format/src/main/java/org/apache/fury/format/row/binary/writer/BinaryRowWriter.java @@ -140,7 +140,7 @@ public class BinaryRowWriter extends BinaryWriter { int size = size(); MemoryBuffer buffer = MemoryUtils.buffer(size); this.buffer.copyTo(startIndex, buffer, 0, size); - row.pointTo(buffer, 0, size); + row.pointTo(buffer, startIndex, size); return row; } } diff --git a/java/fury-format/src/main/java/org/apache/fury/format/type/TypeInference.java b/java/fury-format/src/main/java/org/apache/fury/format/type/TypeInference.java index c4e2e104..52491f21 100644 --- a/java/fury-format/src/main/java/org/apache/fury/format/type/TypeInference.java +++ b/java/fury-format/src/main/java/org/apache/fury/format/type/TypeInference.java @@ -45,7 +45,6 @@ import org.apache.fury.util.Preconditions; import org.apache.fury.util.StringUtils; /** Arrow related type inference. */ -@SuppressWarnings("UnstableApiUsage") public class TypeInference { public static Schema inferSchema(java.lang.reflect.Type type) { @@ -240,24 +239,16 @@ public class TypeInference { public static String inferTypeName(TypeRef<?> token) { StringBuilder sb = new StringBuilder(); - TypeRef<?> arrayToken = token; - while (TypeUtils.ITERABLE_TYPE.isSupertypeOf(arrayToken) - || TypeUtils.MAP_TYPE.isSupertypeOf(arrayToken)) { - if (TypeUtils.ITERABLE_TYPE.isSupertypeOf(arrayToken)) { - sb.append(getRawType(arrayToken).getSimpleName()); - arrayToken = TypeUtils.getElementType(arrayToken); - } else { - Tuple2<TypeRef<?>, TypeRef<?>> tuple2 = TypeUtils.getMapKeyValueType(arrayToken); - sb.append("Map"); - - if (!TypeUtils.isBean(tuple2.f0)) { - arrayToken = tuple2.f0; - } - - if (!TypeUtils.isBean(tuple2.f1)) { - arrayToken = tuple2.f1; - } - } + if (TypeUtils.ITERABLE_TYPE.isSupertypeOf(token)) { + sb.append("Array_"); + sb.append(inferTypeName(TypeUtils.getElementType(token))); + } else if (TypeUtils.MAP_TYPE.isSupertypeOf(token)) { + sb.append("Map_"); + Tuple2<TypeRef<?>, TypeRef<?>> mapKeyValueType = TypeUtils.getMapKeyValueType(token); + sb.append(inferTypeName(mapKeyValueType.f0)); + sb.append("_").append(inferTypeName(mapKeyValueType.f1)); + } else { + sb.append(token.getRawType().getSimpleName()); } return sb.toString(); } diff --git a/java/fury-format/src/test/java/org/apache/fury/format/encoder/ArrayEncoderTest.java b/java/fury-format/src/test/java/org/apache/fury/format/encoder/ArrayEncoderTest.java index 0af1891a..b10306df 100644 --- a/java/fury-format/src/test/java/org/apache/fury/format/encoder/ArrayEncoderTest.java +++ b/java/fury-format/src/test/java/org/apache/fury/format/encoder/ArrayEncoderTest.java @@ -19,6 +19,8 @@ package org.apache.fury.format.encoder; +import static org.apache.fury.format.encoder.CodecBuilderTest.testStreamingEncode; + import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -51,8 +53,10 @@ public class ArrayEncoderTest { byte[] bs = encoder.encode(bars); List<RowEncoderTest.Bar> bbars = encoder.decode(bs); - Assert.assertEquals(bs.length, 280); + Assert.assertEquals(bs.length, 224); Assert.assertEquals(bars, bbars); + + testStreamingEncode(encoder, bars); } @Test @@ -83,8 +87,10 @@ public class ArrayEncoderTest { byte[] bs = encoder.encode(bars); List<List<List<RowEncoderTest.Bar>>> bbars = encoder.decode(bs); - Assert.assertEquals(bs.length, 1632); + Assert.assertEquals(bs.length, 1576); Assert.assertEquals(bars, bbars); + + testStreamingEncode(encoder, bars); } @Test @@ -115,7 +121,9 @@ public class ArrayEncoderTest { byte[] bs = encoder.encode(lmap); List<List<Map<RowEncoderTest.Foo, List<RowEncoderTest.Bar>>>> blmap = encoder.decode(bs); - Assert.assertEquals(bs.length, 10920); + Assert.assertEquals(bs.length, 10824); Assert.assertEquals(lmap, blmap); + + testStreamingEncode(encoder, lmap); } } diff --git a/java/fury-format/src/test/java/org/apache/fury/format/encoder/CodecBuilderTest.java b/java/fury-format/src/test/java/org/apache/fury/format/encoder/CodecBuilderTest.java index 648009f7..4e6530ce 100644 --- a/java/fury-format/src/test/java/org/apache/fury/format/encoder/CodecBuilderTest.java +++ b/java/fury-format/src/test/java/org/apache/fury/format/encoder/CodecBuilderTest.java @@ -20,8 +20,10 @@ package org.apache.fury.format.encoder; import static org.testng.Assert.assertTrue; +import static org.testng.AssertJUnit.assertEquals; import java.util.concurrent.atomic.AtomicLong; +import org.apache.fury.memory.MemoryBuffer; import org.apache.fury.test.bean.BeanA; import org.apache.fury.test.bean.BeanB; import org.apache.fury.test.bean.Foo; @@ -45,4 +47,22 @@ public class CodecBuilderTest { GeneratedRowEncoder.class.isAssignableFrom( Encoders.loadOrGenRowCodecClass(AtomicLong.class))); } + + static void testStreamingEncode(Encoder encoder, Object object) { + MemoryBuffer buffer = MemoryBuffer.newHeapBuffer(32); + for (int i = 0; i < 1; i++) { + buffer.writerIndex(0); + buffer.readerIndex(0); + for (int j = 0; j <= i; j++) { + buffer.writerIndex(0); + buffer.readerIndex(0); + buffer.writeByte(-1); + buffer.readByte(); + encoder.encode(buffer, object); + encoder.encode(buffer, object); + assertEquals(object, encoder.decode(buffer)); + assertEquals(object, encoder.decode(buffer)); + } + } + } } diff --git a/java/fury-format/src/test/java/org/apache/fury/format/encoder/MapEncoderTest.java b/java/fury-format/src/test/java/org/apache/fury/format/encoder/MapEncoderTest.java index 53ff55c4..4b305d39 100644 --- a/java/fury-format/src/test/java/org/apache/fury/format/encoder/MapEncoderTest.java +++ b/java/fury-format/src/test/java/org/apache/fury/format/encoder/MapEncoderTest.java @@ -19,6 +19,10 @@ package org.apache.fury.format.encoder; +import static org.apache.fury.format.encoder.CodecBuilderTest.testStreamingEncode; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import java.util.ArrayList; import java.util.Arrays; import java.util.HashMap; @@ -26,6 +30,8 @@ import java.util.List; import java.util.Map; import org.apache.fury.format.row.binary.BinaryMap; import org.apache.fury.reflect.TypeRef; +import org.apache.fury.test.bean.Foo; +import org.apache.fury.test.bean.SimpleFoo; import org.testng.Assert; import org.testng.annotations.Test; @@ -45,7 +51,6 @@ public class MapEncoderTest { Encoders.mapEncoder(bars.getClass(), String.class, RowEncoderTest.Bar.class); BinaryMap array = encoder.toMap(bars); Map<String, RowEncoderTest.Bar> newBars = encoder.fromMap(array); - Assert.assertEquals(bars, newBars); byte[] bytes = encoder.encode(bars); @@ -83,6 +88,78 @@ public class MapEncoderTest { Assert.assertEquals(decodeMap.size(), 5); } + @Test + public void testSimpleNestArrayWithMapEncoder1() { + Map<String, List<Integer>> map = new HashMap<>(); + map.put("k1", ImmutableList.of(1, 2)); + + MapEncoder<Map<String, List<Integer>>> encoder = + Encoders.mapEncoder(new TypeRef<Map<String, List<Integer>>>() {}); + + testStreamingEncode(encoder, map); + } + + @Test + public void testSimpleNestArrayWithMapEncoder2() { + Map<String, List<List<Integer>>> map = new HashMap<>(); + map.put("k1", ImmutableList.of(ImmutableList.of(1, 2), ImmutableList.of(1, 2))); + + MapEncoder<Map<String, List<List<Integer>>>> encoder = + Encoders.mapEncoder(new TypeRef<Map<String, List<List<Integer>>>>() {}); + + testStreamingEncode(encoder, map); + } + + @Test + public void testSimpleStructWithMapEncoder2() { + Map<String, Foo> map = new HashMap<>(); + map.put("k1", Foo.create()); + + MapEncoder<Map<String, Foo>> encoder = Encoders.mapEncoder(new TypeRef<Map<String, Foo>>() {}); + + testStreamingEncode(encoder, map); + } + + @Test + public void testSimpleNestStructWithMapEncoder() { + Map<String, List<Foo>> map = new HashMap<>(); + map.put("k1", ImmutableList.of(Foo.create())); + + MapEncoder<Map<String, List<Foo>>> encoder = + Encoders.mapEncoder(new TypeRef<Map<String, List<Foo>>>() {}); + + testStreamingEncode(encoder, map); + } + + @Test + public void testKVStructMap() { + Map<SimpleFoo, SimpleFoo> map = ImmutableMap.of(SimpleFoo.create(), SimpleFoo.create()); + MapEncoder encoder = Encoders.mapEncoder(new TypeRef<Map<SimpleFoo, SimpleFoo>>() {}); + testStreamingEncode(encoder, map); + MapEncoder encoder1 = Encoders.mapEncoder(new TypeRef<Map<Foo, Foo>>() {}); + testStreamingEncode(encoder1, ImmutableMap.of(Foo.create(), Foo.create())); + } + + @Test + public void testSimpleNestKVStructMapArray() { + ArrayEncoder<List<Map<SimpleFoo, SimpleFoo>>> encoder = + Encoders.arrayEncoder(new TypeRef<List<Map<SimpleFoo, SimpleFoo>>>() {}); + + testStreamingEncode( + encoder, ImmutableList.of(ImmutableMap.of(SimpleFoo.create(), SimpleFoo.create()))); + } + + @Test + public void testSimpleNestKVStruct() { + Map<String, List<Map<Foo, Foo>>> map = new HashMap<>(); + map.put("k1", ImmutableList.of(ImmutableMap.of(Foo.create(), Foo.create()))); + + MapEncoder<Map<String, List<Map<Foo, Foo>>>> encoder = + Encoders.mapEncoder(new TypeRef<Map<String, List<Map<Foo, Foo>>>>() {}); + + testStreamingEncode(encoder, map); + } + @Test public void testNestArrayWithMapEncoder() { Map<String, List<Map<RowEncoderTest.Foo, List<RowEncoderTest.Bar>>>> lmap = new HashMap<>(); @@ -112,5 +189,7 @@ public class MapEncoderTest { Map<String, List<Map<RowEncoderTest.Foo, List<RowEncoderTest.Bar>>>> decodeMap = encoder.decode(bytes); Assert.assertEquals(decodeMap.size(), 10); + + testStreamingEncode(encoder, lmap); } } diff --git a/java/fury-format/src/test/java/org/apache/fury/format/encoder/RowEncoderTest.java b/java/fury-format/src/test/java/org/apache/fury/format/encoder/RowEncoderTest.java index 973804d5..9a16954b 100644 --- a/java/fury-format/src/test/java/org/apache/fury/format/encoder/RowEncoderTest.java +++ b/java/fury-format/src/test/java/org/apache/fury/format/encoder/RowEncoderTest.java @@ -19,6 +19,8 @@ package org.apache.fury.format.encoder; +import static org.apache.fury.format.encoder.CodecBuilderTest.testStreamingEncode; + import com.google.common.collect.ImmutableMap; import java.util.Arrays; import java.util.HashMap; @@ -41,21 +43,23 @@ public class RowEncoderTest { Encoders.bean(AtomicLong.class); { RowEncoder<BeanA> encoder = Encoders.bean(BeanA.class); + BeanA beanA = BeanA.createBeanA(2); for (int i = 0; i < 3; i++) { - BeanA beanA = BeanA.createBeanA(2); BinaryRow row = encoder.toRow(beanA); BeanA newBean = encoder.fromRow(row); Assert.assertEquals(beanA, newBean); } + testStreamingEncode(encoder, beanA); } { RowEncoder<BeanB> encoder = Encoders.bean(BeanB.class); + BeanB beanB = BeanB.createBeanB(2); for (int i = 0; i < 3; i++) { - BeanB beanB = BeanB.createBeanB(2); BinaryRow row = encoder.toRow(beanB); BeanB newBean = encoder.fromRow(row); Assert.assertEquals(beanB, newBean); } + testStreamingEncode(encoder, beanB); } } diff --git a/java/fury-format/src/main/java/org/apache/fury/format/encoder/Encoder.java b/java/fury-test-core/src/main/java/org/apache/fury/test/bean/SimpleFoo.java similarity index 53% copy from java/fury-format/src/main/java/org/apache/fury/format/encoder/Encoder.java copy to java/fury-test-core/src/main/java/org/apache/fury/test/bean/SimpleFoo.java index 3e0d96c0..5e760751 100644 --- a/java/fury-format/src/main/java/org/apache/fury/format/encoder/Encoder.java +++ b/java/fury-test-core/src/main/java/org/apache/fury/test/bean/SimpleFoo.java @@ -17,18 +17,40 @@ * under the License. */ -package org.apache.fury.format.encoder; +package org.apache.fury.test.bean; -/** - * The encoding interface for encode/decode object to/from binary. The implementation class must - * have a constructor with signature {@code Object[] references}, so we can pass any params to - * codec. - * - * @param <T> type of value - */ -public interface Encoder<T> { +import java.util.Objects; + +public class SimpleFoo { + public int f1; + public String f2; + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + SimpleFoo foo = (SimpleFoo) o; + return f1 == foo.f1 && Objects.equals(f2, foo.f2); + } + + @Override + public int hashCode() { + return Objects.hash(f1, f2); + } - T decode(byte[] bytes); + @Override + public String toString() { + return "SimpleFoo{" + "f1=" + f1 + ", f2='" + f2 + '\'' + '}'; + } - byte[] encode(T obj); + public static SimpleFoo create() { + SimpleFoo foo = new SimpleFoo(); + foo.f1 = 10; + foo.f2 = "str"; + return foo; + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
