This is an automated email from the ASF dual-hosted git repository. chaokunyang pushed a commit to branch releases-0.10 in repository https://gitbox.apache.org/repos/asf/fury.git
commit 02b883fe81b35a6b0bd0fcd02cb720be4f4394e6 Author: hn <[email protected]> AuthorDate: Fri Jan 24 22:33:57 2025 +0800 feat(java): Chunk by chunk predictive map serialization protocol (#1722) ## What does this PR do? Implement chunk-based map serialization in #925. This PR doesn't provide JIT support; it will be implemented in a later PR. ## Related issues <!-- Is there any related issue? Please attach here. - #925 --> ## Does this PR introduce any user-facing change? <!-- If any user-facing interface changes, please [open an issue](https://github.com/apache/fury/issues/new/choose) describing the need to do so and update the document if necessary. --> - [ ] Does this PR introduce any public API change? - [ ] Does this PR introduce any binary protocol compatibility change? ## Benchmark <!-- When the PR has an impact on performance (if you don't know whether the PR will have an impact on performance, you can submit the PR first, and if it has an impact on performance, the code reviewer will explain it), be sure to attach benchmark data here. 
--> --------- Co-authored-by: hening <[email protected]> Co-authored-by: chaokunyang <[email protected]> --- .../src/main/java/org/apache/fury/Fury.java | 9 + .../collection/AbstractMapSerializer.java | 1571 +++++++++++++++++++- .../fury/serializer/collection/MapFlags.java | 49 + .../fury/serializer/collection/MapSerializer.java | 6 +- .../serializer/collection/MapSerializersTest.java | 83 ++ 5 files changed, 1716 insertions(+), 2 deletions(-) diff --git a/java/fury-core/src/main/java/org/apache/fury/Fury.java b/java/fury-core/src/main/java/org/apache/fury/Fury.java index 51f8183b..94534709 100644 --- a/java/fury-core/src/main/java/org/apache/fury/Fury.java +++ b/java/fury-core/src/main/java/org/apache/fury/Fury.java @@ -919,6 +919,15 @@ public final class Fury implements BaseFury { } } + public Object readNullable(MemoryBuffer buffer, ClassInfoHolder classInfoHolder) { + byte headFlag = buffer.readByte(); + if (headFlag == Fury.NULL_FLAG) { + return null; + } else { + return readNonRef(buffer, classInfoHolder); + } + } + /** Class should be read already. */ public Object readData(MemoryBuffer buffer, ClassInfo classInfo) { depth++; diff --git a/java/fury-core/src/main/java/org/apache/fury/serializer/collection/AbstractMapSerializer.java b/java/fury-core/src/main/java/org/apache/fury/serializer/collection/AbstractMapSerializer.java index 6461e808..51489e19 100644 --- a/java/fury-core/src/main/java/org/apache/fury/serializer/collection/AbstractMapSerializer.java +++ b/java/fury-core/src/main/java/org/apache/fury/serializer/collection/AbstractMapSerializer.java @@ -39,12 +39,16 @@ import org.apache.fury.serializer.Serializer; import org.apache.fury.type.GenericType; import org.apache.fury.type.Generics; import org.apache.fury.type.TypeUtils; +import org.apache.fury.util.Preconditions; /** Serializer for all map-like objects. 
*/ @SuppressWarnings({"unchecked", "rawtypes"}) public abstract class AbstractMapSerializer<T> extends Serializer<T> { + private static final int MAX_CHUNK_SIZE = 127; + private static final byte MARK_HAS_WRITE_CLASS_INFO = -1; protected MethodHandle constructor; protected final boolean supportCodegenHook; + protected boolean useChunkSerialize; private Serializer keySerializer; private Serializer valueSerializer; protected final ClassInfoHolder keyClassInfoWriteCache; @@ -110,7 +114,11 @@ public abstract class AbstractMapSerializer<T> extends Serializer<T> { @Override public void write(MemoryBuffer buffer, T value) { Map map = onMapWrite(buffer, value); - writeElements(fury, buffer, map); + if (useChunkSerialize) { + chunkWriteElements(fury, buffer, map); + } else { + writeElements(fury, buffer, map); + } } @Override @@ -154,6 +162,375 @@ public abstract class AbstractMapSerializer<T> extends Serializer<T> { } } + protected final void chunkWriteElements(Fury fury, MemoryBuffer buffer, Map map) { + Serializer keySerializer = this.keySerializer; + Serializer valueSerializer = this.valueSerializer; + // clear the elemSerializer to avoid conflict if the nested + // serialization has collection field. + // TODO use generics for compatible serializer. 
+ this.keySerializer = null; + this.valueSerializer = null; + if (keySerializer != null && valueSerializer != null) { + javaChunkWriteWithKVSerializers(buffer, map, keySerializer, valueSerializer); + } else if (keySerializer != null) { + javaChunkWriteWithKeySerializers(map, buffer, keySerializer); + } else if (valueSerializer != null) { + javaChunkWriteWithValueSerializers(map, buffer, valueSerializer); + } else { + genericJavaChunkWrite(fury, buffer, map); + } + } + + private void javaChunkWriteWithKeySerializers( + Map map, MemoryBuffer buffer, Serializer keySerializer) { + boolean prevKeyIsNull = false; + int header = 0; + int chunkSize = 0; + int startOffset = -1; + boolean valueIsDifferentType = false; + Class valueClass = null; + boolean reset = false; + for (Object object : map.entrySet()) { + Map.Entry entry = (Map.Entry) object; + Object key = entry.getKey(); + final Object value = entry.getValue(); + if (key == null) { + prevKeyIsNull = true; + } + if (!valueIsDifferentType) { + if (value != null) { + if (valueClass == null) { + valueClass = value.getClass(); + } + valueIsDifferentType = valueClass != value.getClass(); + if (valueIsDifferentType) { + reset = true; + } + } + } + if (needReset(key, chunkSize, prevKeyIsNull, value, header, reset)) { + writeHeader(buffer, chunkSize, header, startOffset); + prevKeyIsNull = false; + header = 0; + chunkSize = 0; + startOffset = -1; + valueClass = value == null ? 
null : value.getClass(); + reset = false; + } + startOffset = preserveByte(buffer, startOffset); + boolean trackingKeyRef = keySerializer.needToWriteRef(); + boolean trackingValueRef = fury.trackingRef(); + header = + updateKVHeader( + key, trackingKeyRef, value, trackingValueRef, header, false, valueIsDifferentType); + writeFinalKey(key, buffer, keySerializer, trackingKeyRef); + writeCommonValue( + header, + trackingValueRef, + valueIsDifferentType, + startOffset, + value, + buffer, + fury.getClassResolver(), + fury.getRefResolver()); + chunkSize++; + } + writeHeader(buffer, chunkSize, header, startOffset); + } + + /** + * user preserve 2 bytes to mark whether class info have been written avoid to use a variable to + * mark these 2 bytes will be overwritten when we finish the chunk. + * + * @param buffer buffer to write. + * @param offset offset to mark. + */ + private void markHasWriteClassInfo(MemoryBuffer buffer, int offset) { + int writeIndex = buffer.writerIndex(); + buffer.writerIndex(offset); + buffer.writeByte(MARK_HAS_WRITE_CLASS_INFO); + buffer.writerIndex(writeIndex); + } + + private void writeCommonKey( + boolean trackingKeyRef, + boolean keyIsDifferentType, + int startOffset, + Object key, + MemoryBuffer buffer, + ClassResolver classResolver, + RefResolver refResolver) { + if (!trackingKeyRef) { + if (key == null) { + buffer.writeByte(Fury.NULL_FLAG); + } else { + if (!keyIsDifferentType) { + Serializer keyWriteSerializer = + getKeyWriteSerializer(startOffset, key, buffer, classResolver); + keyWriteSerializer.write(buffer, key); + } else { + fury.writeNonRef( + buffer, key, classResolver.getClassInfo(key.getClass(), keyClassInfoWriteCache)); + } + } + } else { + if (key == null) { + buffer.writeByte(Fury.NULL_FLAG); + } else { + if (!keyIsDifferentType) { + Serializer keyWriteSerializer = + getKeyWriteSerializer(startOffset, key, buffer, classResolver); + writeNoNullRef(keyWriteSerializer, key, buffer, refResolver); + } else { + if 
(!refResolver.writeNullFlag(buffer, key)) { + fury.writeRef( + buffer, key, classResolver.getClassInfo(key.getClass(), keyClassInfoWriteCache)); + } + } + } + } + } + + private Serializer getKeyWriteSerializer( + int startOffset, Object key, MemoryBuffer buffer, ClassResolver classResolver) { + ClassInfo classInfo = classResolver.getClassInfo(key.getClass(), keyClassInfoWriteCache); + if (buffer.getByte(startOffset) != MARK_HAS_WRITE_CLASS_INFO) { + classResolver.writeClass(buffer, classInfo); + markHasWriteClassInfo(buffer, startOffset); + } + return classInfo.getSerializer(); + } + + private void writeCommonValue( + int header, + boolean trackingValueRef, + boolean valueIsDifferentType, + int startOffset, + Object value, + MemoryBuffer buffer, + ClassResolver classResolver, + RefResolver refResolver) { + if (!trackingValueRef) { + if (value == null) { + buffer.writeByte(Fury.NULL_FLAG); + } else { + if (!valueIsDifferentType) { + if (valueHasNull(header)) { + buffer.writeByte(Fury.NOT_NULL_VALUE_FLAG); + } + Serializer valueWriteSerializer = + getValueWriteSerializer(startOffset, value, buffer, classResolver); + valueWriteSerializer.write(buffer, value); + } else { + fury.writeNullable( + buffer, + value, + classResolver.getClassInfo(value.getClass(), valueClassInfoWriteCache)); + } + } + } else { + if (value == null) { + buffer.writeByte(Fury.NULL_FLAG); + } else { + if (!valueIsDifferentType) { + Serializer valueWriteSerializer = + getValueWriteSerializer(startOffset, value, buffer, classResolver); + if (!valueHasNull(header)) { + writeNoNullRef(valueWriteSerializer, value, buffer, refResolver); + } else { + fury.writeRef(buffer, value, valueWriteSerializer); + } + } else { + if (!refResolver.writeNullFlag(buffer, value)) { + fury.writeRef( + buffer, + value, + classResolver.getClassInfo(value.getClass(), valueClassInfoWriteCache)); + } + } + } + } + } + + private Serializer getValueWriteSerializer( + int startOffset, Object value, MemoryBuffer buffer, 
ClassResolver classResolver) { + ClassInfo classInfo = classResolver.getClassInfo(value.getClass(), valueClassInfoWriteCache); + if (buffer.getByte(startOffset + 1) != MARK_HAS_WRITE_CLASS_INFO) { + classResolver.writeClass(buffer, classInfo); + markHasWriteClassInfo(buffer, startOffset + 1); + } + return classInfo.getSerializer(); + } + + private void javaChunkWriteWithValueSerializers( + Map map, MemoryBuffer buffer, Serializer valueSerializer) { + boolean prevKeyIsNull = false; + int header = 0; + int chunkSize = 0; + int startOffset = -1; + boolean keyIsDifferentType = false; + Class keyClass = null; + boolean reset = false; + for (Object object : map.entrySet()) { + Map.Entry entry = (Map.Entry) object; + Object key = entry.getKey(); + final Object value = entry.getValue(); + if (key == null) { + prevKeyIsNull = true; + } + if (!keyIsDifferentType) { + if (key != null) { + if (keyClass == null) { + keyClass = key.getClass(); + } + keyIsDifferentType = keyClass != key.getClass(); + if (keyIsDifferentType) { + reset = true; + } + } + } + if (needReset(key, chunkSize, prevKeyIsNull, value, header, reset)) { + writeHeader(buffer, chunkSize, header, startOffset); + prevKeyIsNull = false; + header = 0; + chunkSize = 0; + startOffset = -1; + keyClass = key == null ? 
null : key.getClass(); + } + startOffset = preserveByte(buffer, startOffset); + boolean trackingKeyRef = fury.trackingRef(); + boolean trackingValueRef = valueSerializer.needToWriteRef(); + header = + updateKVHeader( + key, trackingKeyRef, value, trackingValueRef, header, keyIsDifferentType, false); + writeCommonKey( + trackingKeyRef, + keyIsDifferentType, + startOffset, + key, + buffer, + fury.getClassResolver(), + fury.getRefResolver()); + writeFinalValue(value, buffer, valueSerializer, trackingValueRef, header); + chunkSize++; + } + writeHeader(buffer, chunkSize, header, startOffset); + } + + private int preserveByte(MemoryBuffer buffer, int startOffset) { + if (startOffset == -1) { + int writerIndex = buffer.writerIndex(); + // preserve two byte for header and chunk size + buffer.writerIndex(writerIndex + 2); + return writerIndex; + } + return startOffset; + } + + private void javaChunkWriteWithKVSerializers( + MemoryBuffer buffer, Map map, Serializer keySerializer, Serializer valueSerializer) { + boolean prevKeyIsNull = false; + int header = 0; + int chunkSize = 0; + int startOffset = -1; + for (Object object : map.entrySet()) { + Map.Entry entry = (Map.Entry) object; + Object key = entry.getKey(); + Object value = entry.getValue(); + if (key == null) { + prevKeyIsNull = true; + } + if (needReset(key, chunkSize, prevKeyIsNull, value, header, false)) { + // update header at the beginning of the chunk when we reset chunk + writeHeader(buffer, chunkSize, header, startOffset); + header = 0; + chunkSize = 0; + startOffset = -1; + prevKeyIsNull = false; + } + startOffset = preserveByte(buffer, startOffset); + boolean trackingKeyRef = keySerializer.needToWriteRef(); + boolean trackingValueRef = valueSerializer.needToWriteRef(); + header = updateKVHeader(key, trackingKeyRef, value, trackingValueRef, header, false, false); + writeFinalKey(key, buffer, keySerializer, trackingKeyRef); + writeFinalValue(value, buffer, valueSerializer, trackingValueRef, header); + 
chunkSize++; + } + // update header at the beginning of the chunk when we finish the iteration + writeHeader(buffer, chunkSize, header, startOffset); + } + + private void writeFinalKey( + Object key, MemoryBuffer buffer, Serializer keySerializer, boolean trackingKeyRef) { + if (!trackingKeyRef) { + // map key has one null at most, use one chunk to write + if (key == null) { + buffer.writeByte(Fury.NULL_FLAG); + } else { + keySerializer.write(buffer, key); + } + } else { + RefResolver refResolver = fury.getRefResolver(); + if (!refResolver.writeRefOrNull(buffer, key)) { + keySerializer.write(buffer, key); + } + } + } + + private void writeFinalValue( + Object value, + MemoryBuffer buffer, + Serializer valueSerializer, + boolean trackingValueRef, + int header) { + if (!trackingValueRef) { + if (value == null) { + buffer.writeByte(Fury.NULL_FLAG); + } else { + if (valueHasNull(header)) { + buffer.writeByte(Fury.NOT_NULL_VALUE_FLAG); + valueSerializer.write(buffer, value); + } else { + valueSerializer.write(buffer, value); + } + } + } else { + RefResolver refResolver = fury.getRefResolver(); + if (!refResolver.writeRefOrNull(buffer, value)) { + valueSerializer.write(buffer, value); + } + } + } + + private int updateKVHeader( + Object key, + boolean trackingKeyRef, + Object value, + boolean trackingValueRef, + int header, + boolean keyIsDifferentType, + boolean valueIsDifferentType) { + if (trackingKeyRef) { + header |= MapFlags.TRACKING_KEY_REF; + } + if (key == null) { + header |= MapFlags.KEY_HAS_NULL; + } + if (trackingValueRef) { + header |= MapFlags.TRACKING_VALUE_REF; + } + if (value == null) { + header |= MapFlags.VALUE_HAS_NULL; + } + if (keyIsDifferentType) { + header |= MapFlags.KEY_NOT_SAME_TYPE; + } + if (valueIsDifferentType) { + header |= MapFlags.VALUE_NOT_SAME_TYPE; + } + return header; + } + private void javaWriteWithKVSerializers( Fury fury, MemoryBuffer buffer, @@ -216,6 +593,54 @@ public abstract class AbstractMapSerializer<T> extends Serializer<T> 
{ } } + private void genericJavaChunkWrite(Fury fury, MemoryBuffer buffer, Map map) { + Generics generics = fury.getGenerics(); + GenericType genericType = generics.nextGenericType(); + if (genericType == null) { + generalJavaChunkWrite(fury, buffer, map); + } else { + GenericType keyGenericType = genericType.getTypeParameter0(); + GenericType valueGenericType = genericType.getTypeParameter1(); + // type parameters count for `Map field` will be 0; + // type parameters count for `SubMap<V> field` which SubMap is + // `SubMap<V> implements Map<String, V>` will be 1; + if (genericType.getTypeParametersCount() < 2) { + Tuple2<GenericType, GenericType> kvGenericType = getKVGenericType(genericType); + if (keyGenericType == objType && valueGenericType == objType) { + generalJavaChunkWrite(fury, buffer, map); + return; + } + keyGenericType = kvGenericType.f0; + valueGenericType = kvGenericType.f1; + } + // Can't avoid push generics repeatedly in loop by stack depth, because push two + // generic type changed generics stack top, which is depth index, update stack top + // and depth will have some cost too. + // Stack depth to avoid push generics repeatedly in loop. + // Note push two generic type changed generics stack top, which is depth index, + // stack top should be updated when using for serialization k/v. + // int depth = fury.getDepth(); + // // depth + 1 to leave a slot for value generics, otherwise value generics will + // // be overwritten by nested key generics. 
+ // fury.setDepth(depth + 1); + // generics.pushGenericType(keyGenericType); + // fury.setDepth(depth); + // generics.pushGenericType(valueGenericType); + boolean keyGenericTypeFinal = keyGenericType.isMonomorphic(); + boolean valueGenericTypeFinal = valueGenericType.isMonomorphic(); + if (keyGenericTypeFinal && valueGenericTypeFinal) { + javaKVTypesFinalChunkWrite(fury, buffer, map, keyGenericType, valueGenericType, generics); + } else if (keyGenericTypeFinal) { + javaKeyTypeFinalChunkWrite(fury, buffer, map, keyGenericType, valueGenericType, generics); + } else if (valueGenericTypeFinal) { + javaValueTypeFinalChunkWrite(fury, buffer, map, keyGenericType, valueGenericType, generics); + } else { + javaKVTypesNonFinalChunkWrite( + fury, buffer, map, keyGenericType, valueGenericType, generics); + } + } + } + private void javaKVTypesFinalWrite( Fury fury, MemoryBuffer buffer, @@ -236,6 +661,51 @@ public abstract class AbstractMapSerializer<T> extends Serializer<T> { } } + /** + * kv final write do not need to predict , since key and value is almost same type unless null. 
+ */ + private void javaKVTypesFinalChunkWrite( + Fury fury, + MemoryBuffer buffer, + Map map, + GenericType keyGenericType, + GenericType valueGenericType, + Generics generics) { + boolean prevKeyIsNull = false; + int header = 0; + int chunkSize = 0; + int startOffset = -1; + Serializer keySerializer = keyGenericType.getSerializer(fury.getClassResolver()); + Serializer valueSerializer = valueGenericType.getSerializer(fury.getClassResolver()); + for (Object object : map.entrySet()) { + Map.Entry entry = (Map.Entry) object; + Object key = entry.getKey(); + Object value = entry.getValue(); + if (key == null) { + prevKeyIsNull = true; + } + if (needReset(key, chunkSize, prevKeyIsNull, value, header, false)) { + writeHeader(buffer, chunkSize, header, startOffset); + header = 0; + chunkSize = 0; + startOffset = -1; + prevKeyIsNull = false; + } + startOffset = preserveByte(buffer, startOffset); + boolean trackingKeyRef = keySerializer.needToWriteRef(); + boolean trackingValueRef = valueSerializer.needToWriteRef(); + header = updateKVHeader(key, trackingKeyRef, value, trackingValueRef, header, false, false); + generics.pushGenericType(keyGenericType); + writeFinalKey(key, buffer, keySerializer, trackingKeyRef); + generics.popGenericType(); + generics.pushGenericType(valueGenericType); + writeFinalValue(value, buffer, valueSerializer, trackingValueRef, header); + generics.popGenericType(); + chunkSize++; + } + writeHeader(buffer, chunkSize, header, startOffset); + } + private void javaKeyTypeFinalWrite( Fury fury, MemoryBuffer buffer, @@ -265,6 +735,279 @@ public abstract class AbstractMapSerializer<T> extends Serializer<T> { } } + private void javaKeyTypeFinalChunkWrite( + Fury fury, + MemoryBuffer buffer, + Map map, + GenericType keyGenericType, + GenericType valueGenericType, + Generics generics) { + Serializer keySerializer = keyGenericType.getSerializer(fury.getClassResolver()); + boolean trackingValueRef = 
fury.getClassResolver().needToWriteRef(valueGenericType.getCls()); + boolean prevKeyIsNull = false; + int header = 0; + int chunkSize = 0; + int startOffset = -1; + boolean valueIsDifferentType = false; + Class valueClass = null; + boolean reset = false; + for (Object object : map.entrySet()) { + Map.Entry entry = (Map.Entry) object; + Object key = entry.getKey(); + Object value = entry.getValue(); + if (key == null) { + prevKeyIsNull = true; + } + if (!valueIsDifferentType) { + if (value != null) { + if (valueClass == null) { + valueClass = value.getClass(); + } + valueIsDifferentType = valueClass != value.getClass(); + } + if (valueIsDifferentType) { + reset = true; + } + } + if (needReset(key, chunkSize, prevKeyIsNull, value, header, reset)) { + writeHeader(buffer, chunkSize, header, startOffset); + prevKeyIsNull = false; + header = 0; + chunkSize = 0; + startOffset = -1; + valueClass = value == null ? null : value.getClass(); + reset = false; + } + startOffset = preserveByte(buffer, startOffset); + generics.pushGenericType(keyGenericType); + boolean trackingKeyRef = keySerializer.needToWriteRef(); + header = + updateKVHeader( + key, trackingKeyRef, value, trackingValueRef, header, false, valueIsDifferentType); + writeFinalKey(key, buffer, keySerializer, trackingKeyRef); + generics.popGenericType(); + generics.pushGenericType(valueGenericType); + writeCommonValue( + header, + trackingValueRef, + valueIsDifferentType, + startOffset, + value, + buffer, + fury.getClassResolver(), + fury.getRefResolver()); + generics.popGenericType(); + chunkSize++; + } + writeHeader(buffer, chunkSize, header, startOffset); + } + + private void javaValueTypeFinalChunkWrite( + Fury fury, + MemoryBuffer buffer, + Map map, + GenericType keyGenericType, + GenericType valueGenericType, + Generics generics) { + int header = 0; + int chunkSize = 0; + boolean prevKeyIsNull = false; + boolean keyIsDifferentType = false; + int startOffset = -1; + Class keyClass = null; + boolean reset = 
false; + Serializer valueSerializer = valueGenericType.getSerializer(fury.getClassResolver()); + boolean trackingKeyRef = fury.getClassResolver().needToWriteRef(keyGenericType.getCls()); + boolean trackingValueRef = valueSerializer.needToWriteRef(); + for (Object object : map.entrySet()) { + Map.Entry entry = (Map.Entry) object; + Object key = entry.getKey(); + Object value = entry.getValue(); + if (key == null) { + prevKeyIsNull = true; + } + if (!keyIsDifferentType) { + if (key != null) { + if (keyClass == null) { + keyClass = key.getClass(); + } + keyIsDifferentType = keyClass != key.getClass(); + if (keyIsDifferentType) { + reset = true; + } + } + } + if (needReset(key, chunkSize, prevKeyIsNull, value, header, reset)) { + writeHeader(buffer, chunkSize, header, startOffset); + header = 0; + chunkSize = 0; + prevKeyIsNull = false; + startOffset = -1; + keyClass = key == null ? null : key.getClass(); + reset = false; + } + header = + updateKVHeader( + key, trackingKeyRef, value, trackingValueRef, header, false, keyIsDifferentType); + startOffset = preserveByte(buffer, startOffset); + generics.pushGenericType(keyGenericType); + writeCommonKey( + trackingKeyRef, + keyIsDifferentType, + startOffset, + key, + buffer, + fury.getClassResolver(), + fury.getRefResolver()); + generics.popGenericType(); + generics.pushGenericType(valueGenericType); + writeFinalValue(value, buffer, valueSerializer, trackingValueRef, header); + generics.popGenericType(); + chunkSize++; + } + writeHeader(buffer, chunkSize, header, startOffset); + } + + private void javaKVTypesNonFinalChunkWrite( + Fury fury, + MemoryBuffer buffer, + Map map, + GenericType keyGenericType, + GenericType valueGenericType, + Generics generics) { + ClassResolver classResolver = fury.getClassResolver(); + RefResolver refResolver = fury.getRefResolver(); + int header = 0; + int startOffset = -1; + int chunkSize = 0; + Class<?> keyClass = null; + Class<?> valueClass = null; + boolean keyIsDifferentType = false; + 
boolean valueIsDifferentType = false; + boolean prevKeyIsNull = false; + boolean markChunkWriteFinish = false; + boolean reset = false; + boolean needMarkFinish = false; + boolean trackingKeyRef = fury.getClassResolver().needToWriteRef(keyGenericType.getCls()); + boolean trackingValueRef = fury.getClassResolver().needToWriteRef(valueGenericType.getCls()); + for (Object object : map.entrySet()) { + Map.Entry entry = (Map.Entry) object; + Object key = entry.getKey(); + Object value = entry.getValue(); + if (!markChunkWriteFinish) { + if (key == null) { + prevKeyIsNull = true; + } + if (!keyIsDifferentType) { + if (key != null) { + if (keyClass == null) { + keyClass = key.getClass(); + } + keyIsDifferentType = keyClass != key.getClass(); + } + if (keyIsDifferentType) { + reset = true; + } + } + if (!valueIsDifferentType) { + if (value != null) { + if (valueClass == null) { + valueClass = value.getClass(); + } + valueIsDifferentType = valueClass != value.getClass(); + } + if (valueIsDifferentType) { + reset = true; + } + } + if (keyIsDifferentType && valueIsDifferentType) { + needMarkFinish = true; + } + if (needMarkFinish) { + writeHeader(buffer, chunkSize, header, startOffset); + // set chunk size = 0 + buffer.writeByte(0); + markChunkWriteFinish = true; + } else { + if (needReset(key, chunkSize, prevKeyIsNull, value, header, reset)) { + writeHeader(buffer, chunkSize, header, startOffset); + header = 0; + chunkSize = 0; + prevKeyIsNull = false; + keyClass = key == null ? null : key.getClass(); + valueClass = value == null ? 
null : value.getClass(); + reset = false; + startOffset = -1; + } + } + } + if (markChunkWriteFinish) { + generics.pushGenericType(keyGenericType); + writeJavaRefOptimized( + fury, classResolver, refResolver, trackingKeyRef, buffer, key, keyClassInfoWriteCache); + generics.popGenericType(); + generics.pushGenericType(valueGenericType); + writeJavaRefOptimized( + fury, + classResolver, + refResolver, + trackingValueRef, + buffer, + value, + keyClassInfoWriteCache); + generics.popGenericType(); + } else { + startOffset = preserveByte(buffer, startOffset); + header = + updateKVHeader( + key, + trackingKeyRef, + value, + trackingValueRef, + header, + keyIsDifferentType, + valueIsDifferentType); + generics.pushGenericType(keyGenericType); + writeCommonKey( + trackingKeyRef, + keyIsDifferentType, + startOffset, + key, + buffer, + fury.getClassResolver(), + fury.getRefResolver()); + generics.popGenericType(); + generics.pushGenericType(valueGenericType); + writeCommonValue( + header, + trackingValueRef, + valueIsDifferentType, + startOffset, + value, + buffer, + fury.getClassResolver(), + fury.getRefResolver()); + generics.popGenericType(); + chunkSize++; + } + } + writeHeader(buffer, chunkSize, header, startOffset); + } + + private boolean needReset( + Object key, + int chunkSize, + boolean prevKeyIsNull, + Object value, + int header, + boolean needReset) { + return (key == null && chunkSize > 0) + || (prevKeyIsNull && key != null) + || (value == null && chunkSize > 0 && !valueHasNull(header)) + || (chunkSize >= MAX_CHUNK_SIZE) + || needReset; + } + private void javaValueTypeFinalWrite( Fury fury, MemoryBuffer buffer, @@ -342,6 +1085,130 @@ public abstract class AbstractMapSerializer<T> extends Serializer<T> { } } + protected void generalJavaChunkWrite(Fury fury, MemoryBuffer buffer, Map map) { + int header = 0; + int startOffset = -1; + int chunkSize = 0; + Class<?> keyClass = null; + Class<?> valueClass = null; + boolean keyIsDifferentType = false; + boolean 
valueIsDifferentType = false; + boolean prevKeyIsNull = false; + boolean markChunkWriteFinish = false; + boolean reset = false; + boolean needMarkFinish = false; + ClassResolver classResolver = fury.getClassResolver(); + RefResolver refResolver = fury.getRefResolver(); + for (Object object : map.entrySet()) { + Map.Entry entry = (Map.Entry) object; + Object key = entry.getKey(); + Object value = entry.getValue(); + if (!markChunkWriteFinish) { + if (key == null) { + prevKeyIsNull = true; + } + if (!keyIsDifferentType) { + if (key != null) { + if (keyClass == null) { + keyClass = key.getClass(); + } + keyIsDifferentType = keyClass != key.getClass(); + } + if (keyIsDifferentType) { + reset = true; + } + } + if (!valueIsDifferentType) { + if (value != null) { + if (valueClass == null) { + valueClass = value.getClass(); + } + valueIsDifferentType = valueClass != value.getClass(); + } + if (valueIsDifferentType) { + reset = true; + } + } + if (valueIsDifferentType && keyIsDifferentType) { + needMarkFinish = true; + } + if (needMarkFinish) { + writeHeader(buffer, chunkSize, header, startOffset); + // set chunk size = 0 + buffer.writeByte(0); + markChunkWriteFinish = true; + } else { + if (needReset(key, chunkSize, prevKeyIsNull, value, header, reset)) { + writeHeader(buffer, chunkSize, header, startOffset); + header = 0; + chunkSize = 0; + startOffset = -1; + prevKeyIsNull = false; + keyClass = key == null ? null : key.getClass(); + valueClass = value == null ? 
null : value.getClass(); + reset = false; + } + } + } + if (!markChunkWriteFinish) { + startOffset = preserveByte(buffer, startOffset); + boolean trackingRef = fury.trackingRef(); + header = + updateKVHeader( + key, + trackingRef, + value, + trackingRef, + header, + keyIsDifferentType, + valueIsDifferentType); + writeCommonKey( + trackingRef, keyIsDifferentType, startOffset, key, buffer, classResolver, refResolver); + writeCommonValue( + header, + trackingRef, + valueIsDifferentType, + startOffset, + value, + buffer, + classResolver, + refResolver); + chunkSize++; + } else { + writeJavaRefOptimized( + fury, classResolver, refResolver, buffer, entry.getKey(), keyClassInfoWriteCache); + writeJavaRefOptimized( + fury, classResolver, refResolver, buffer, entry.getValue(), valueClassInfoWriteCache); + } + } + writeHeader(buffer, chunkSize, header, startOffset); + } + + private void writeNoNullRef( + Serializer serializer, Object o, MemoryBuffer buffer, RefResolver refResolver) { + if (serializer.needToWriteRef()) { + if (!refResolver.writeRefOrNull(buffer, o)) { + serializer.write(buffer, o); + } + } else { + serializer.write(buffer, o); + } + } + + private boolean valueHasNull(int header) { + return (header & MapFlags.VALUE_HAS_NULL) == MapFlags.VALUE_HAS_NULL; + } + + public void writeHeader(MemoryBuffer memoryBuffer, int chunkSize, int header, int startOffset) { + if (chunkSize > 0) { + int currentWriteIndex = memoryBuffer.writerIndex(); + memoryBuffer.writerIndex(startOffset); + memoryBuffer.writeByte(chunkSize); + memoryBuffer.writeByte(header); + memoryBuffer.writerIndex(currentWriteIndex); + } + } + public static void xwriteElements(Fury fury, MemoryBuffer buffer, Map value) { Generics generics = fury.getGenerics(); GenericType genericType = generics.nextGenericType(); @@ -487,6 +1354,26 @@ public abstract class AbstractMapSerializer<T> extends Serializer<T> { } } + @SuppressWarnings("unchecked") + protected final void chunkReadElements(MemoryBuffer buffer, int 
size, Map map) { + Serializer keySerializer = this.keySerializer; + Serializer valueSerializer = this.valueSerializer; + // clear the elemSerializer to avoid conflict if the nested + // serialization has collection field. + // TODO use generics for compatible serializer. + this.keySerializer = null; + this.valueSerializer = null; + if (keySerializer != null && valueSerializer != null) { + javaChunkReadWithKVSerializers(buffer, map, size, keySerializer, valueSerializer); + } else if (keySerializer != null) { + javaChunkReadWithKeySerializer(buffer, map, size, keySerializer); + } else if (valueSerializer != null) { + javaChunkReadWithValueSerializer(buffer, map, size, valueSerializer); + } else { + genericJavaChunkRead(fury, buffer, map, size); + } + } + @SuppressWarnings("unchecked") protected final void readElements(MemoryBuffer buffer, int size, Map map) { Serializer keySerializer = this.keySerializer; @@ -551,6 +1438,253 @@ public abstract class AbstractMapSerializer<T> extends Serializer<T> { } } + private void javaChunkReadWithKeySerializer( + MemoryBuffer buffer, Map map, int size, Serializer keySerializer) { + final ClassResolver classResolver = fury.getClassResolver(); + while (size > 0) { + byte chunkSize = buffer.readByte(); + byte header = buffer.readByte(); + Preconditions.checkArgument( + chunkSize >= 0, + "chunkSize < 0, which means serialization protocol is not same with deserialization protocol"); + Serializer valueReadSerializer = null; + for (byte i = 0; i < chunkSize; i++) { + Object key; + Object value; + key = readFinalKey(buffer, header, keySerializer); + if (!fury.trackingRef()) { + if (!valueIsDifferentType(header)) { + if (valueHasNull(header)) { + byte flag = buffer.readByte(); + if (flag == Fury.NOT_NULL_VALUE_FLAG) { + if (valueReadSerializer == null) { + valueReadSerializer = + classResolver.readClassInfo(buffer, valueClassInfoReadCache).getSerializer(); + } + value = valueReadSerializer.read(buffer); + } else { + value = null; + } + } 
else { + if (valueReadSerializer == null) { + valueReadSerializer = + classResolver.readClassInfo(buffer, valueClassInfoReadCache).getSerializer(); + } + value = valueReadSerializer.read(buffer); + } + } else { + value = fury.readNullable(buffer, valueClassInfoReadCache); + } + + } else { + if (!valueIsDifferentType(header)) { + if (valueHasNull(header)) { + byte flag = buffer.readByte(); + if (flag == Fury.NOT_NULL_VALUE_FLAG) { + if (valueReadSerializer == null) { + valueReadSerializer = + classResolver.readClassInfo(buffer, valueClassInfoReadCache).getSerializer(); + } + value = fury.readRef(buffer, valueReadSerializer); + } else { + value = null; + } + } else { + if (valueReadSerializer == null) { + valueReadSerializer = + classResolver.readClassInfo(buffer, valueClassInfoReadCache).getSerializer(); + } + value = readNoNullRef(valueReadSerializer, buffer); + } + } else { + value = fury.readRef(buffer, valueClassInfoReadCache); + } + } + map.put(key, value); + size--; + } + } + } + + private void javaChunkReadWithValueSerializer( + MemoryBuffer buffer, Map map, int size, Serializer valueSerializer) { + while (size > 0) { + byte chunkSize = buffer.readByte(); + byte header = buffer.readByte(); + Preconditions.checkArgument( + chunkSize >= 0, + "chunkSize < 0, which means serialization protocol is not same with deserialization protocol"); + Serializer keyReadSerializer = null; + for (byte i = 0; i < chunkSize; i++) { + Object key; + Object value; + if (!fury.trackingRef()) { + if (keyHasNull(header)) { + byte nullFlag = buffer.readByte(); + Preconditions.checkArgument(nullFlag == Fury.NULL_FLAG, "unexpected error"); + key = null; + } else { + if (!keyIsDifferentType(header)) { + if (keyReadSerializer == null) { + keyReadSerializer = + fury.getClassResolver() + .readClassInfo(buffer, keyClassInfoReadCache) + .getSerializer(); + } + key = keyReadSerializer.read(buffer); + } else { + key = fury.readNonRef(buffer, keyClassInfoReadCache); + } + } + } else { + if 
(keyHasNull(header)) { + byte nullFlag = buffer.readByte(); + Preconditions.checkArgument(nullFlag == Fury.NULL_FLAG, "unexpected error"); + key = null; + } else { + if (!keyIsDifferentType(header)) { + if (keyReadSerializer == null) { + keyReadSerializer = + fury.getClassResolver() + .readClassInfo(buffer, keyClassInfoReadCache) + .getSerializer(); + } + key = readNoNullRef(keyReadSerializer, buffer); + } else { + key = fury.readRef(buffer, keyClassInfoReadCache); + } + } + } + value = readFinalValue(buffer, header, valueSerializer); + map.put(key, value); + size--; + } + } + } + + private void javaChunkReadWithKVSerializers( + MemoryBuffer buffer, + Map map, + int size, + Serializer keySerializer, + Serializer valueSerializer) { + while (size > 0) { + byte chunkSize = buffer.readByte(); + byte header = buffer.readByte(); + Preconditions.checkArgument( + chunkSize >= 0, + "chunkSize < 0, which means serialization protocol is not same with deserialization protocol"); + for (byte i = 0; i < chunkSize; i++) { + Object key; + Object value; + key = readFinalKey(buffer, header, keySerializer); + value = readFinalValue(buffer, header, valueSerializer); + map.put(key, value); + size--; + } + } + } + + public Object readFinalKey(MemoryBuffer buffer, int header, Serializer keySerializer) { + boolean trackingKeyRef = keySerializer.needToWriteRef(); + if (!trackingKeyRef) { + if (keyHasNull(header)) { + byte nullFlag = buffer.readByte(); + Preconditions.checkArgument(nullFlag == Fury.NULL_FLAG, "unexpected NULL_FLAG"); + return null; + } else { + return keySerializer.read(buffer); + } + } else { + return fury.readRef(buffer, keySerializer); + } + } + + public Object readFinalValue(MemoryBuffer buffer, int header, Serializer valueSerializer) { + boolean trackingValueRef = valueSerializer.needToWriteRef(); + if (!trackingValueRef) { + if (valueHasNull(header)) { + byte flag = buffer.readByte(); + if (flag == Fury.NOT_NULL_VALUE_FLAG) { + return valueSerializer.read(buffer); + } 
else { + return null; + } + } else { + return valueSerializer.read(buffer); + } + } else { + return fury.readRef(buffer, valueSerializer); + } + } + + private void genericJavaChunkRead(Fury fury, MemoryBuffer buffer, Map map, int size) { + Generics generics = fury.getGenerics(); + GenericType genericType = generics.nextGenericType(); + if (genericType == null) { + generalJavaChunkRead(fury, buffer, map, size); + } else { + GenericType keyGenericType = genericType.getTypeParameter0(); + GenericType valueGenericType = genericType.getTypeParameter1(); + if (genericType.getTypeParametersCount() < 2) { + Tuple2<GenericType, GenericType> kvGenericType = getKVGenericType(genericType); + if (keyGenericType == objType && valueGenericType == objType) { + generalJavaChunkRead(fury, buffer, map, size); + return; + } + keyGenericType = kvGenericType.f0; + valueGenericType = kvGenericType.f1; + } + boolean keyGenericTypeFinal = keyGenericType.isMonomorphic(); + boolean valueGenericTypeFinal = valueGenericType.isMonomorphic(); + if (keyGenericTypeFinal && valueGenericTypeFinal) { + javaKVTypesFinalChunkRead( + fury, buffer, map, keyGenericType, valueGenericType, generics, size); + } else if (keyGenericTypeFinal) { + javaKeyTypeFinalChunkRead( + fury, buffer, map, keyGenericType, valueGenericType, generics, size); + } else if (valueGenericTypeFinal) { + javaValueTypeFinalChunkRead( + fury, buffer, map, keyGenericType, valueGenericType, generics, size); + } else { + javaKVTypesNonFinalChunkRead( + fury, buffer, map, keyGenericType, valueGenericType, generics, size); + } + generics.popGenericType(); + } + } + + private void javaKVTypesFinalChunkRead( + Fury fury, + MemoryBuffer buffer, + Map map, + GenericType keyGenericType, + GenericType valueGenericType, + Generics generics, + int size) { + Serializer keySerializer = keyGenericType.getSerializer(fury.getClassResolver()); + Serializer valueSerializer = valueGenericType.getSerializer(fury.getClassResolver()); + while (size > 0) { + 
byte chunkSize = buffer.readByte(); + byte header = buffer.readByte(); + Preconditions.checkArgument( + chunkSize >= 0, + "chunkSize < 0, which means serialization protocol is not same with deserialization protocol"); + for (byte i = 0; i < chunkSize; i++) { + Object key; + Object value; + generics.pushGenericType(keyGenericType); + key = readFinalKey(buffer, header, keySerializer); + generics.popGenericType(); + generics.pushGenericType(valueGenericType); + value = readFinalValue(buffer, header, valueSerializer); + generics.popGenericType(); + map.put(key, value); + size--; + } + } + } + private void javaKVTypesFinalRead( Fury fury, MemoryBuffer buffer, @@ -572,6 +1706,86 @@ public abstract class AbstractMapSerializer<T> extends Serializer<T> { } } + private void javaKeyTypeFinalChunkRead( + Fury fury, + MemoryBuffer buffer, + Map map, + GenericType keyGenericType, + GenericType valueGenericType, + Generics generics, + int size) { + ClassResolver classResolver = fury.getClassResolver(); + boolean trackingValueRef = classResolver.needToWriteRef(valueGenericType.getCls()); + Serializer keySerializer = keyGenericType.getSerializer(fury.getClassResolver()); + while (size > 0) { + byte chunkSize = buffer.readByte(); + Preconditions.checkArgument( + chunkSize >= 0, + "chunkSize < 0, which means serialization protocol is not same with deserialization protocol"); + byte header = buffer.readByte(); + Serializer valueReadSerializer = null; + while (chunkSize > 0) { + generics.pushGenericType(keyGenericType); + Object key = readFinalKey(buffer, header, keySerializer); + generics.popGenericType(); + generics.pushGenericType(valueGenericType); + Object value; + if (!trackingValueRef) { + if (!valueIsDifferentType(header)) { + if (valueHasNull(header)) { + byte flag = buffer.readByte(); + if (flag == Fury.NOT_NULL_VALUE_FLAG) { + if (valueReadSerializer == null) { + valueReadSerializer = + classResolver.readClassInfo(buffer, valueClassInfoReadCache).getSerializer(); + } + value 
= valueReadSerializer.read(buffer); + } else { + value = null; + } + } else { + if (valueReadSerializer == null) { + valueReadSerializer = + classResolver.readClassInfo(buffer, valueClassInfoReadCache).getSerializer(); + } + value = valueReadSerializer.read(buffer); + } + } else { + value = fury.readNullable(buffer, valueClassInfoReadCache); + } + + } else { + if (!valueIsDifferentType(header)) { + if (valueHasNull(header)) { + byte flag = buffer.readByte(); + if (flag == Fury.NOT_NULL_VALUE_FLAG) { + if (valueReadSerializer == null) { + valueReadSerializer = + classResolver.readClassInfo(buffer, valueClassInfoReadCache).getSerializer(); + } + value = fury.readRef(buffer, valueReadSerializer); + } else { + value = null; + } + } else { + if (valueReadSerializer == null) { + valueReadSerializer = + classResolver.readClassInfo(buffer, valueClassInfoReadCache).getSerializer(); + } + value = readNoNullRef(valueReadSerializer, buffer); + } + } else { + value = fury.readRef(buffer, valueClassInfoReadCache); + } + } + generics.popGenericType(); + chunkSize--; + size--; + map.put(key, value); + } + } + } + private void javaKeyTypeFinalRead( Fury fury, MemoryBuffer buffer, @@ -596,6 +1810,74 @@ public abstract class AbstractMapSerializer<T> extends Serializer<T> { } } + private void javaValueTypeFinalChunkRead( + Fury fury, + MemoryBuffer buffer, + Map map, + GenericType keyGenericType, + GenericType valueGenericType, + Generics generics, + int size) { + boolean trackingKeyRef = fury.getClassResolver().needToWriteRef(keyGenericType.getCls()); + Serializer valueSerializer = valueGenericType.getSerializer(fury.getClassResolver()); + while (size > 0) { + byte chunkSize = buffer.readByte(); + Preconditions.checkArgument( + chunkSize >= 0, + "chunkSize < 0, which means serialization protocol is not same with deserialization protocol"); + byte header = buffer.readByte(); + Serializer keyReadSerializer = null; + while (chunkSize > 0) { + generics.pushGenericType(keyGenericType); + 
Object key; + if (!trackingKeyRef) { + if (keyHasNull(header)) { + byte nullFlag = buffer.readByte(); + Preconditions.checkArgument(nullFlag == Fury.NULL_FLAG, "unexpected error"); + key = null; + } else { + if (!keyIsDifferentType(header)) { + if (keyReadSerializer == null) { + keyReadSerializer = + fury.getClassResolver() + .readClassInfo(buffer, keyClassInfoReadCache) + .getSerializer(); + } + key = keyReadSerializer.read(buffer); + } else { + key = fury.readNonRef(buffer, keyClassInfoReadCache); + } + } + } else { + if (keyHasNull(header)) { + byte nullFlag = buffer.readByte(); + Preconditions.checkArgument(nullFlag == Fury.NULL_FLAG, "unexpected error"); + key = null; + } else { + if (!keyIsDifferentType(header)) { + if (keyReadSerializer == null) { + keyReadSerializer = + fury.getClassResolver() + .readClassInfo(buffer, keyClassInfoReadCache) + .getSerializer(); + } + key = readNoNullRef(keyReadSerializer, buffer); + } else { + key = fury.readRef(buffer, keyClassInfoReadCache); + } + } + } + generics.popGenericType(); + generics.pushGenericType(valueGenericType); + Object value = readFinalValue(buffer, header, valueSerializer); + generics.popGenericType(); + chunkSize--; + size--; + map.put(key, value); + } + } + } + private void javaValueTypeFinalRead( Fury fury, MemoryBuffer buffer, @@ -619,6 +1901,142 @@ public abstract class AbstractMapSerializer<T> extends Serializer<T> { } } + private void javaKVTypesNonFinalChunkRead( + Fury fury, + MemoryBuffer buffer, + Map map, + GenericType keyGenericType, + GenericType valueGenericType, + Generics generics, + int size) { + ClassResolver classResolver = fury.getClassResolver(); + RefResolver refResolver = fury.getRefResolver(); + boolean trackingKeyRef = classResolver.needToWriteRef(keyGenericType.getCls()); + boolean trackingValueRef = classResolver.needToWriteRef(valueGenericType.getCls()); + while (size > 0) { + byte chunkSize = buffer.readByte(); + Preconditions.checkArgument( + chunkSize >= 0, + "chunkSize < 
0, which means serialization protocol is not same with deserialization protocol"); + if (chunkSize == 0) { + while (size > 0) { + generics.pushGenericType(keyGenericType); + Object key = + readJavaRefOptimized( + fury, refResolver, trackingKeyRef, buffer, keyClassInfoReadCache); + generics.popGenericType(); + generics.pushGenericType(valueGenericType); + Object value = + readJavaRefOptimized( + fury, refResolver, trackingValueRef, buffer, valueClassInfoReadCache); + generics.popGenericType(); + map.put(key, value); + size--; + } + } else { + byte header = buffer.readByte(); + Serializer keyReadSerializer = null; + Serializer valueReadSerializer = null; + while (chunkSize > 0) { + generics.pushGenericType(keyGenericType); + Object key; + if (!trackingKeyRef) { + if (keyHasNull(header)) { + byte nullFlag = buffer.readByte(); + Preconditions.checkArgument(nullFlag == Fury.NULL_FLAG, "unexpected error"); + key = null; + } else { + if (!keyIsDifferentType(header)) { + if (keyReadSerializer == null) { + keyReadSerializer = + classResolver.readClassInfo(buffer, keyClassInfoReadCache).getSerializer(); + } + key = keyReadSerializer.read(buffer); + } else { + key = fury.readNonRef(buffer, keyClassInfoReadCache); + } + } + } else { + if (keyHasNull(header)) { + byte nullFlag = buffer.readByte(); + Preconditions.checkArgument(nullFlag == Fury.NULL_FLAG, "unexpected error"); + key = null; + } else { + if (!keyIsDifferentType(header)) { + if (keyReadSerializer == null) { + keyReadSerializer = + classResolver.readClassInfo(buffer, keyClassInfoReadCache).getSerializer(); + } + key = readNoNullRef(keyReadSerializer, buffer); + } else { + key = fury.readRef(buffer, keyClassInfoReadCache); + } + } + } + generics.popGenericType(); + generics.pushGenericType(valueGenericType); + Object value; + if (!trackingValueRef) { + if (!valueIsDifferentType(header)) { + if (valueHasNull(header)) { + byte flag = buffer.readByte(); + if (flag == Fury.NOT_NULL_VALUE_FLAG) { + if (valueReadSerializer 
== null) { + valueReadSerializer = + classResolver + .readClassInfo(buffer, valueClassInfoReadCache) + .getSerializer(); + } + value = valueReadSerializer.read(buffer); + } else { + value = null; + } + } else { + if (valueReadSerializer == null) { + valueReadSerializer = + classResolver.readClassInfo(buffer, valueClassInfoReadCache).getSerializer(); + } + value = valueReadSerializer.read(buffer); + } + } else { + value = fury.readNullable(buffer, valueClassInfoReadCache); + } + + } else { + if (!valueIsDifferentType(header)) { + if (valueHasNull(header)) { + byte flag = buffer.readByte(); + if (flag == Fury.NOT_NULL_VALUE_FLAG) { + if (valueReadSerializer == null) { + valueReadSerializer = + classResolver + .readClassInfo(buffer, valueClassInfoReadCache) + .getSerializer(); + } + value = fury.readRef(buffer, valueReadSerializer); + } else { + value = null; + } + } else { + if (valueReadSerializer == null) { + valueReadSerializer = + classResolver.readClassInfo(buffer, valueClassInfoReadCache).getSerializer(); + } + value = readNoNullRef(valueReadSerializer, buffer); + } + } else { + value = fury.readRef(buffer, valueClassInfoReadCache); + } + } + generics.popGenericType(); + chunkSize--; + size--; + map.put(key, value); + } + } + } + } + private void javaKVTypesNonFinalRead( Fury fury, MemoryBuffer buffer, @@ -645,6 +2063,149 @@ public abstract class AbstractMapSerializer<T> extends Serializer<T> { } } + private void generalJavaChunkRead(Fury fury, MemoryBuffer buffer, Map map, int size) { + ClassResolver classResolver = fury.getClassResolver(); + boolean trackingRef = fury.trackingRef(); + while (size > 0) { + byte chunkSize = buffer.readByte(); + Preconditions.checkArgument( + chunkSize >= 0, + "chunkSize < 0, which means serialization protocol is not same with deserialization protocol"); + if (chunkSize == 0) { + while (size > 0) { + Object key = fury.readRef(buffer, keyClassInfoReadCache); + Object value = fury.readRef(buffer, keyClassInfoReadCache); + 
map.put(key, value); + size--; + } + } else { + byte header = buffer.readByte(); + Serializer keyReadSerializer = null; + Serializer valueReadSerializer = null; + while (chunkSize > 0) { + Object key; + if (!trackingRef) { + if (keyHasNull(header)) { + byte nullFlag = buffer.readByte(); + Preconditions.checkArgument(nullFlag == Fury.NULL_FLAG, "unexpected error"); + key = null; + } else { + if (!keyIsDifferentType(header)) { + if (keyReadSerializer == null) { + keyReadSerializer = + classResolver.readClassInfo(buffer, keyClassInfoReadCache).getSerializer(); + } + key = keyReadSerializer.read(buffer); + } else { + key = fury.readNonRef(buffer, keyClassInfoReadCache); + } + } + } else { + if (keyHasNull(header)) { + byte nullFlag = buffer.readByte(); + Preconditions.checkArgument(nullFlag == Fury.NULL_FLAG, "unexpected error"); + key = null; + } else { + if (!keyIsDifferentType(header)) { + if (keyReadSerializer == null) { + keyReadSerializer = + classResolver.readClassInfo(buffer, keyClassInfoReadCache).getSerializer(); + } + key = readNoNullRef(keyReadSerializer, buffer); + } else { + key = fury.readRef(buffer, keyClassInfoReadCache); + } + } + } + Object value; + if (!trackingRef) { + if (!valueIsDifferentType(header)) { + if (valueHasNull(header)) { + byte flag = buffer.readByte(); + if (flag == Fury.NOT_NULL_VALUE_FLAG) { + if (valueReadSerializer == null) { + valueReadSerializer = + classResolver + .readClassInfo(buffer, valueClassInfoReadCache) + .getSerializer(); + } + value = valueReadSerializer.read(buffer); + } else { + value = null; + } + } else { + if (valueReadSerializer == null) { + valueReadSerializer = + classResolver.readClassInfo(buffer, valueClassInfoReadCache).getSerializer(); + } + value = valueReadSerializer.read(buffer); + } + } else { + value = fury.readNullable(buffer, valueClassInfoReadCache); + } + + } else { + if (!valueIsDifferentType(header)) { + if (valueHasNull(header)) { + byte flag = buffer.readByte(); + if (flag == 
Fury.NOT_NULL_VALUE_FLAG) { + if (valueReadSerializer == null) { + valueReadSerializer = + classResolver + .readClassInfo(buffer, valueClassInfoReadCache) + .getSerializer(); + } + value = fury.readRef(buffer, valueReadSerializer); + } else { + value = null; + } + } else { + if (valueReadSerializer == null) { + valueReadSerializer = + classResolver.readClassInfo(buffer, valueClassInfoReadCache).getSerializer(); + } + value = readNoNullRef(valueReadSerializer, buffer); + } + } else { + value = fury.readRef(buffer, valueClassInfoReadCache); + } + } + chunkSize--; + size--; + map.put(key, value); + } + } + } + } + + private boolean keyHasNull(int header) { + return (header & MapFlags.KEY_HAS_NULL) == MapFlags.KEY_HAS_NULL; + } + + private boolean keyIsDifferentType(int header) { + return (header & MapFlags.KEY_NOT_SAME_TYPE) == MapFlags.KEY_NOT_SAME_TYPE; + } + + private boolean valueIsDifferentType(int header) { + return (header & MapFlags.VALUE_NOT_SAME_TYPE) == MapFlags.VALUE_NOT_SAME_TYPE; + } + + private Object readNoNullRef(Serializer serializer, MemoryBuffer memoryBuffer) { + if (serializer.needToWriteRef()) { + final RefResolver refResolver = fury.getRefResolver(); + int nextReadRefId = refResolver.tryPreserveRefId(memoryBuffer); + if (nextReadRefId >= Fury.NOT_NULL_VALUE_FLAG) { + Object obj = serializer.read(memoryBuffer); + refResolver.setReadObject(nextReadRefId, obj); + return obj; + } else { + return refResolver.getReadObject(); + } + } else { + return serializer.read(memoryBuffer); + } + } + private void generalJavaRead(Fury fury, MemoryBuffer buffer, Map map, int size) { for (int i = 0; i < size; i++) { Object key = fury.readRef(buffer, keyClassInfoReadCache); @@ -716,6 +2277,14 @@ public abstract class AbstractMapSerializer<T> extends Serializer<T> { return supportCodegenHook; } + public boolean isUseChunkSerialize() { + return useChunkSerialize; + } + + public void setUseChunkSerialize(boolean useChunkSerialize) { + this.useChunkSerialize = 
useChunkSerialize; + } + /** * Write data except size and elements. * diff --git a/java/fury-core/src/main/java/org/apache/fury/serializer/collection/MapFlags.java b/java/fury-core/src/main/java/org/apache/fury/serializer/collection/MapFlags.java new file mode 100644 index 00000000..52ca2d21 --- /dev/null +++ b/java/fury-core/src/main/java/org/apache/fury/serializer/collection/MapFlags.java @@ -0,0 +1,49 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.fury.serializer.collection; + +public class MapFlags { + + /** Whether track key ref. */ + public static int TRACKING_KEY_REF = 0b1; + + /** Whether key has null. */ + public static int KEY_HAS_NULL = 0b10; + + // /** + // * Whether key is not declare type. + // */ + // public static int KEY_NOT_DECL_TYPE = 0b100; + + /** Whether keys type are different. */ + public static int KEY_NOT_SAME_TYPE = 0b100; + + /** Whether track value ref. */ + public static int TRACKING_VALUE_REF = 0b1000; + + /** Whether value has null. */ + public static int VALUE_HAS_NULL = 0b10000; + + /** Whether value is not declare type. */ + // public static int VALUE_NOT_DECL_TYPE = 0b1000000; + + /** Whether values type are different. 
*/ + public static int VALUE_NOT_SAME_TYPE = 0b100000; +} diff --git a/java/fury-core/src/main/java/org/apache/fury/serializer/collection/MapSerializer.java b/java/fury-core/src/main/java/org/apache/fury/serializer/collection/MapSerializer.java index 046f0a7e..ca40de4c 100644 --- a/java/fury-core/src/main/java/org/apache/fury/serializer/collection/MapSerializer.java +++ b/java/fury-core/src/main/java/org/apache/fury/serializer/collection/MapSerializer.java @@ -65,7 +65,11 @@ public class MapSerializer<T extends Map> extends AbstractMapSerializer<T> { @Override public T read(MemoryBuffer buffer) { Map map = newMap(buffer); - readElements(buffer, getAndClearNumElements(), map); + if (useChunkSerialize) { + chunkReadElements(buffer, getAndClearNumElements(), map); + } else { + readElements(buffer, getAndClearNumElements(), map); + } return onMapRead(map); } diff --git a/java/fury-core/src/test/java/org/apache/fury/serializer/collection/MapSerializersTest.java b/java/fury-core/src/test/java/org/apache/fury/serializer/collection/MapSerializersTest.java index 74640c68..240fc8a3 100644 --- a/java/fury-core/src/test/java/org/apache/fury/serializer/collection/MapSerializersTest.java +++ b/java/fury-core/src/test/java/org/apache/fury/serializer/collection/MapSerializersTest.java @@ -51,6 +51,7 @@ import org.apache.fury.collection.LazyMap; import org.apache.fury.collection.MapEntry; import org.apache.fury.config.Language; import org.apache.fury.reflect.TypeRef; +import org.apache.fury.serializer.Serializer; import org.apache.fury.serializer.collection.CollectionSerializersTest.TestEnum; import org.apache.fury.test.bean.Cyclic; import org.apache.fury.test.bean.MapFields; @@ -605,4 +606,86 @@ public class MapSerializersTest extends FuryTestBase { map.put("k", 1); serDeCheck(fury, new LazyMapCollectionFieldStruct(ofArrayList(map), map)); } + + @Test(dataProvider = "referenceTrackingConfig") + public void testObjectKeyValueChunkSerializer(boolean referenceTrackingConfig) { + Fury 
fury = Fury.builder().withRefTracking(referenceTrackingConfig).build(); + final Map<Object, Object> differentKeyAndValueTypeMap = createDifferentKeyAndValueTypeMap(); + final Serializer<? extends Map> serializer = + fury.getSerializer(differentKeyAndValueTypeMap.getClass()); + MapSerializers.HashMapSerializer mapSerializer = (MapSerializers.HashMapSerializer) serializer; + mapSerializer.setUseChunkSerialize(true); + final byte[] serialize = fury.serialize(differentKeyAndValueTypeMap); + final Object deserialize = fury.deserialize(serialize); + assertEquals(deserialize, differentKeyAndValueTypeMap); + } + + @Test(dataProvider = "referenceTrackingConfig") + public void testMapFieldsChunkSerializer(boolean referenceTrackingConfig) { + Fury fury = + Fury.builder() + .withRefTracking(referenceTrackingConfig) + .requireClassRegistration(false) + .build(); + final MapFields mapFieldsObject = createMapFieldsObject(); + // hashmap + final Serializer<HashMap> serializer = fury.getSerializer(HashMap.class); + MapSerializers.HashMapSerializer mapSerializer = (MapSerializers.HashMapSerializer) serializer; + mapSerializer.setUseChunkSerialize(true); + + // LinkedHashMap + final Serializer<LinkedHashMap> serializer1 = fury.getSerializer(LinkedHashMap.class); + MapSerializers.LinkedHashMapSerializer linkedHashMapSerializer = + (MapSerializers.LinkedHashMapSerializer) serializer1; + linkedHashMapSerializer.setUseChunkSerialize(true); + + // TreeMap + final Serializer<TreeMap> serializer2 = fury.getSerializer(TreeMap.class); + MapSerializers.SortedMapSerializer sortedMapSerializer = + (MapSerializers.SortedMapSerializer) serializer2; + sortedMapSerializer.setUseChunkSerialize(true); + + // ConcurrentHashMap + final Serializer<ConcurrentHashMap> serializer3 = fury.getSerializer(ConcurrentHashMap.class); + MapSerializers.ConcurrentHashMapSerializer concurrentHashMapSerializer = + (MapSerializers.ConcurrentHashMapSerializer) serializer3; + 
concurrentHashMapSerializer.setUseChunkSerialize(true); + + // ConcurrentSkipListMap + final Serializer<ConcurrentSkipListMap> serializer4 = + fury.getSerializer(ConcurrentSkipListMap.class); + MapSerializers.ConcurrentSkipListMapSerializer concurrentSkipListMapSerializer = + (MapSerializers.ConcurrentSkipListMapSerializer) serializer4; + concurrentSkipListMapSerializer.setUseChunkSerialize(true); + + final Serializer<EnumMap> serializer5 = fury.getSerializer(EnumMap.class); + MapSerializers.EnumMapSerializer enumMapSerializer = + (MapSerializers.EnumMapSerializer) serializer5; + enumMapSerializer.setUseChunkSerialize(true); + + final byte[] serialize = fury.serialize(mapFieldsObject); + final Object deserialize = fury.deserialize(serialize); + assertEquals(deserialize, mapFieldsObject); + } + + private static Map<Object, Object> createDifferentKeyAndValueTypeMap() { + Map<Object, Object> map = new HashMap<>(); + map.put(null, "1"); + map.put(2, "1"); + map.put(4, "1"); + map.put(6, "1"); + map.put(7, "1"); + map.put(10, "1"); + map.put(12, "null"); + map.put(19, "null"); + map.put(11, null); + map.put(20, null); + map.put(21, 9); + map.put(22, 99); + map.put(291, 900); + map.put("292", 900); + map.put("293", 900); + map.put("23", 900); + return map; + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
