chaokunyang commented on code in PR #1722:
URL: https://github.com/apache/fury/pull/1722#discussion_r1922641363
##########
java/fury-core/src/main/java/org/apache/fury/serializer/collection/AbstractMapSerializer.java:
##########
@@ -140,6 +147,378 @@ protected final void writeElements(Fury fury,
MemoryBuffer buffer, Map map) {
}
}
+ protected final void chunkWriteElements(Fury fury, MemoryBuffer buffer, Map
map) {
+ Serializer keySerializer = this.keySerializer;
+ Serializer valueSerializer = this.valueSerializer;
+ // clear the elemSerializer to avoid conflict if the nested
+ // serialization has collection field.
+ // TODO use generics for compatible serializer.
+ this.keySerializer = null;
+ this.valueSerializer = null;
+ if (keySerializer != null && valueSerializer != null) {
+ javaChunkWriteWithKVSerializers(buffer, map, keySerializer,
valueSerializer);
+ } else if (keySerializer != null) {
+ javaChunkWriteWithKeySerializers(map, buffer, keySerializer);
+ } else if (valueSerializer != null) {
+ javaChunkWriteWithValueSerializers(map, buffer, valueSerializer);
+ } else {
+ genericJavaChunkWrite(fury, buffer, map);
+ }
+ }
+
+ private void javaChunkWriteWithKeySerializers(
+ Map map, MemoryBuffer buffer, Serializer keySerializer) {
+ ClassResolver classResolver = fury.getClassResolver();
+ RefResolver refResolver = fury.getRefResolver();
+ boolean prevKeyIsNull = false;
+ int header = 0;
+ int chunkSize = 0;
+ int startOffset = 0;
+ boolean hasPreservedByte = false;
+ boolean valueIsNotSameType = false;
+ Class valueClass = null;
+ Serializer valueWriteSerializer = null;
+ boolean writeValueClassInfo = false;
+ boolean needReset = false;
+ for (Object object : map.entrySet()) {
+ Map.Entry entry = (Map.Entry) object;
+ Object key = entry.getKey();
+ final Object value = entry.getValue();
+ if (key == null) {
+ prevKeyIsNull = true;
+ }
+ if (!valueIsNotSameType) {
+ if (value != null) {
+ if (valueClass == null) {
+ valueClass = value.getClass();
+ }
+ valueIsNotSameType = valueClass != value.getClass();
+ if (valueIsNotSameType) {
+ needReset = true;
+ }
+ }
+ }
+ if ((key == null && chunkSize > 0)
+ || (prevKeyIsNull && key != null)
+ || (value == null && chunkSize > 0 && !valueHasNull(header))
+ || needReset
+ || (chunkSize >= MAX_CHUNK_SIZE)) {
+ writeHeader(buffer, chunkSize, header, startOffset);
+ prevKeyIsNull = false;
+ header = 0;
+ chunkSize = 0;
+ startOffset = 0;
+ hasPreservedByte = false;
+ valueClass = value == null ? null : value.getClass();
+ valueWriteSerializer = null;
+ writeValueClassInfo = false;
+ needReset = false;
+ }
+ if (!hasPreservedByte) {
+ int writerIndex = buffer.writerIndex();
+ // preserve two byte for header and chunk size
+ buffer.writerIndex(writerIndex + 2);
+ startOffset = writerIndex;
+ hasPreservedByte = true;
+ }
+ // write final key
+ boolean trackingKeyRef = keySerializer.needToWriteRef();
+ boolean trackingValueRef = fury.trackingRef();
+ header =
+ updateKVHeader(
+ key, trackingKeyRef, value, trackingValueRef, header, false,
valueIsNotSameType);
+ writeFinalKey(key, buffer, keySerializer, trackingKeyRef);
+ if (!trackingValueRef) {
+ if (value == null) {
+ buffer.writeByte(Fury.NULL_FLAG);
+ } else {
+ if (!valueIsNotSameType) {
+ if (valueHasNull(header)) {
+ buffer.writeByte(Fury.NOT_NULL_VALUE_FLAG);
+ }
+ ClassInfo classInfo =
classResolver.getClassInfo(value.getClass(), valueClassInfoWriteCache);
+ if (!writeValueClassInfo) {
+ classResolver.writeClass(buffer, classInfo);
+ writeValueClassInfo = true;
+ }
+ if (valueWriteSerializer == null) {
+ valueWriteSerializer = classInfo.getSerializer();
+ }
+ valueWriteSerializer.write(buffer, value);
+ } else {
+ fury.writeNullable(
+ buffer,
+ value,
+ classResolver.getClassInfo(value.getClass(),
valueClassInfoWriteCache));
+ }
+ }
+ } else {
+ if (value == null) {
+ buffer.writeByte(Fury.NULL_FLAG);
+ } else {
+ if (!valueIsNotSameType) {
+ ClassInfo classInfo =
classResolver.getClassInfo(value.getClass(), valueClassInfoWriteCache);
+ if (!writeValueClassInfo) {
+ classResolver.writeClass(buffer, classInfo);
+ writeValueClassInfo = true;
+ }
+ if (valueWriteSerializer == null) {
+ valueWriteSerializer = classInfo.getSerializer();
+ }
+ valueWriteSerializer.write(buffer, value);
+ if (!valueHasNull(header)) {
+ writeNoNullRef(valueWriteSerializer, value, buffer,
refResolver);
+ } else {
+ fury.writeRef(buffer, value, valueWriteSerializer);
+ }
+ } else {
+ if (!refResolver.writeNullFlag(buffer, value)) {
+ fury.writeRef(
+ buffer,
+ value,
+ classResolver.getClassInfo(value.getClass(),
valueClassInfoWriteCache));
+ }
+ }
+ }
+ }
+ chunkSize++;
+ }
+ writeHeader(buffer, chunkSize, header, startOffset);
+ }
+
+ private void javaChunkWriteWithValueSerializers(
+ Map map, MemoryBuffer buffer, Serializer valueSerializer) {
+ ClassResolver classResolver = fury.getClassResolver();
+ RefResolver refResolver = fury.getRefResolver();
+ boolean prevKeyIsNull = false;
+ int header = 0;
+ int chunkSize = 0;
+ int startOffset = 0;
+ boolean hasPreservedByte = false;
+ boolean keyIsNotSameType = false;
+ Class keyClass = null;
+ Serializer keyWriteSerializer = null;
+ boolean writeKeyClassInfo = false;
+ boolean needReset = false;
+ for (Object object : map.entrySet()) {
+ Map.Entry entry = (Map.Entry) object;
+ Object key = entry.getKey();
+ final Object value = entry.getValue();
+ if (key == null) {
+ prevKeyIsNull = true;
+ }
+ if (!keyIsNotSameType) {
+ if (key != null) {
+ if (keyClass == null) {
+ keyClass = key.getClass();
+ }
+ keyIsNotSameType = keyClass != key.getClass();
+ if (keyIsNotSameType) {
+ needReset = true;
+ }
+ }
+ }
+ if ((key == null && chunkSize > 0)
+ || (prevKeyIsNull && key != null)
+ || (value == null && chunkSize > 0 && !valueHasNull(header))
+ || needReset
+ || (chunkSize >= MAX_CHUNK_SIZE)) {
+ writeHeader(buffer, chunkSize, header, startOffset);
+ prevKeyIsNull = false;
+ header = 0;
+ chunkSize = 0;
+ startOffset = 0;
+ hasPreservedByte = false;
+ keyClass = key == null ? null : key.getClass();
+ keyWriteSerializer = null;
+ writeKeyClassInfo = false;
+ }
+ if (!hasPreservedByte) {
+ int writerIndex = buffer.writerIndex();
+ // preserve two byte for header and chunk size
+ buffer.writerIndex(writerIndex + 2);
+ startOffset = writerIndex;
+ hasPreservedByte = true;
+ }
+ boolean trackingKeyRef = fury.trackingRef();
+ boolean trackingValueRef = valueSerializer.needToWriteRef();
+ header =
+ updateKVHeader(
+ key, trackingKeyRef, value, trackingValueRef, header,
keyIsNotSameType, false);
+ if (!trackingKeyRef) {
+ if (key == null) {
+ buffer.writeByte(Fury.NULL_FLAG);
+ } else {
+ if (!keyIsNotSameType) {
+ ClassInfo classInfo =
classResolver.getClassInfo(key.getClass(), keyClassInfoWriteCache);
+ if (!writeKeyClassInfo) {
+ classResolver.writeClass(buffer, classInfo);
+ writeKeyClassInfo = true;
+ }
+ if (keyWriteSerializer == null) {
+ keyWriteSerializer = classInfo.getSerializer();
+ }
+ keyWriteSerializer.write(buffer, key);
+ } else {
+ fury.writeNonRef(
+ buffer, key,
classResolver.getClassInfo(key.getClass(), keyClassInfoWriteCache));
+ }
+ }
+ } else {
+ // todo 提到外面
Review Comment:
please using english commens
##########
java/fury-core/src/main/java/org/apache/fury/serializer/collection/AbstractMapSerializer.java:
##########
@@ -140,6 +147,378 @@ protected final void writeElements(Fury fury,
MemoryBuffer buffer, Map map) {
}
}
+ protected final void chunkWriteElements(Fury fury, MemoryBuffer buffer, Map
map) {
+ Serializer keySerializer = this.keySerializer;
+ Serializer valueSerializer = this.valueSerializer;
+ // clear the elemSerializer to avoid conflict if the nested
+ // serialization has collection field.
+ // TODO use generics for compatible serializer.
+ this.keySerializer = null;
+ this.valueSerializer = null;
+ if (keySerializer != null && valueSerializer != null) {
+ javaChunkWriteWithKVSerializers(buffer, map, keySerializer,
valueSerializer);
+ } else if (keySerializer != null) {
+ javaChunkWriteWithKeySerializers(map, buffer, keySerializer);
+ } else if (valueSerializer != null) {
+ javaChunkWriteWithValueSerializers(map, buffer, valueSerializer);
+ } else {
+ genericJavaChunkWrite(fury, buffer, map);
+ }
+ }
+
+ private void javaChunkWriteWithKeySerializers(
+ Map map, MemoryBuffer buffer, Serializer keySerializer) {
+ ClassResolver classResolver = fury.getClassResolver();
+ RefResolver refResolver = fury.getRefResolver();
+ boolean prevKeyIsNull = false;
+ int header = 0;
+ int chunkSize = 0;
+ int startOffset = 0;
+ boolean hasPreservedByte = false;
+ boolean valueIsNotSameType = false;
+ Class valueClass = null;
+ Serializer valueWriteSerializer = null;
+ boolean writeValueClassInfo = false;
+ boolean needReset = false;
+ for (Object object : map.entrySet()) {
+ Map.Entry entry = (Map.Entry) object;
+ Object key = entry.getKey();
+ final Object value = entry.getValue();
+ if (key == null) {
+ prevKeyIsNull = true;
+ }
+ if (!valueIsNotSameType) {
+ if (value != null) {
+ if (valueClass == null) {
+ valueClass = value.getClass();
+ }
+ valueIsNotSameType = valueClass != value.getClass();
+ if (valueIsNotSameType) {
+ needReset = true;
+ }
+ }
+ }
+ if ((key == null && chunkSize > 0)
+ || (prevKeyIsNull && key != null)
+ || (value == null && chunkSize > 0 && !valueHasNull(header))
+ || needReset
+ || (chunkSize >= MAX_CHUNK_SIZE)) {
+ writeHeader(buffer, chunkSize, header, startOffset);
+ prevKeyIsNull = false;
+ header = 0;
+ chunkSize = 0;
+ startOffset = 0;
+ hasPreservedByte = false;
+ valueClass = value == null ? null : value.getClass();
+ valueWriteSerializer = null;
+ writeValueClassInfo = false;
+ needReset = false;
+ }
+ if (!hasPreservedByte) {
+ int writerIndex = buffer.writerIndex();
+ // preserve two byte for header and chunk size
+ buffer.writerIndex(writerIndex + 2);
+ startOffset = writerIndex;
+ hasPreservedByte = true;
+ }
+ // write final key
+ boolean trackingKeyRef = keySerializer.needToWriteRef();
+ boolean trackingValueRef = fury.trackingRef();
+ header =
+ updateKVHeader(
+ key, trackingKeyRef, value, trackingValueRef, header, false,
valueIsNotSameType);
+ writeFinalKey(key, buffer, keySerializer, trackingKeyRef);
+ if (!trackingValueRef) {
+ if (value == null) {
+ buffer.writeByte(Fury.NULL_FLAG);
+ } else {
+ if (!valueIsNotSameType) {
+ if (valueHasNull(header)) {
+ buffer.writeByte(Fury.NOT_NULL_VALUE_FLAG);
+ }
+ ClassInfo classInfo =
classResolver.getClassInfo(value.getClass(), valueClassInfoWriteCache);
+ if (!writeValueClassInfo) {
+ classResolver.writeClass(buffer, classInfo);
+ writeValueClassInfo = true;
+ }
+ if (valueWriteSerializer == null) {
+ valueWriteSerializer = classInfo.getSerializer();
+ }
+ valueWriteSerializer.write(buffer, value);
+ } else {
+ fury.writeNullable(
+ buffer,
+ value,
+ classResolver.getClassInfo(value.getClass(),
valueClassInfoWriteCache));
+ }
+ }
+ } else {
+ if (value == null) {
+ buffer.writeByte(Fury.NULL_FLAG);
+ } else {
+ if (!valueIsNotSameType) {
+ ClassInfo classInfo =
classResolver.getClassInfo(value.getClass(), valueClassInfoWriteCache);
+ if (!writeValueClassInfo) {
+ classResolver.writeClass(buffer, classInfo);
+ writeValueClassInfo = true;
+ }
+ if (valueWriteSerializer == null) {
+ valueWriteSerializer = classInfo.getSerializer();
+ }
+ valueWriteSerializer.write(buffer, value);
+ if (!valueHasNull(header)) {
+ writeNoNullRef(valueWriteSerializer, value, buffer,
refResolver);
+ } else {
+ fury.writeRef(buffer, value, valueWriteSerializer);
+ }
+ } else {
+ if (!refResolver.writeNullFlag(buffer, value)) {
+ fury.writeRef(
+ buffer,
+ value,
+ classResolver.getClassInfo(value.getClass(),
valueClassInfoWriteCache));
+ }
+ }
+ }
+ }
+ chunkSize++;
+ }
+ writeHeader(buffer, chunkSize, header, startOffset);
+ }
+
+ private void javaChunkWriteWithValueSerializers(
+ Map map, MemoryBuffer buffer, Serializer valueSerializer) {
+ ClassResolver classResolver = fury.getClassResolver();
+ RefResolver refResolver = fury.getRefResolver();
+ boolean prevKeyIsNull = false;
+ int header = 0;
+ int chunkSize = 0;
+ int startOffset = 0;
+ boolean hasPreservedByte = false;
+ boolean keyIsNotSameType = false;
+ Class keyClass = null;
+ Serializer keyWriteSerializer = null;
+ boolean writeKeyClassInfo = false;
+ boolean needReset = false;
+ for (Object object : map.entrySet()) {
+ Map.Entry entry = (Map.Entry) object;
+ Object key = entry.getKey();
+ final Object value = entry.getValue();
+ if (key == null) {
+ prevKeyIsNull = true;
+ }
+ if (!keyIsNotSameType) {
+ if (key != null) {
+ if (keyClass == null) {
+ keyClass = key.getClass();
+ }
+ keyIsNotSameType = keyClass != key.getClass();
+ if (keyIsNotSameType) {
+ needReset = true;
+ }
+ }
+ }
+ if ((key == null && chunkSize > 0)
+ || (prevKeyIsNull && key != null)
+ || (value == null && chunkSize > 0 && !valueHasNull(header))
+ || needReset
+ || (chunkSize >= MAX_CHUNK_SIZE)) {
+ writeHeader(buffer, chunkSize, header, startOffset);
+ prevKeyIsNull = false;
+ header = 0;
+ chunkSize = 0;
+ startOffset = 0;
+ hasPreservedByte = false;
+ keyClass = key == null ? null : key.getClass();
+ keyWriteSerializer = null;
+ writeKeyClassInfo = false;
+ }
+ if (!hasPreservedByte) {
+ int writerIndex = buffer.writerIndex();
+ // preserve two byte for header and chunk size
+ buffer.writerIndex(writerIndex + 2);
+ startOffset = writerIndex;
+ hasPreservedByte = true;
+ }
+ boolean trackingKeyRef = fury.trackingRef();
+ boolean trackingValueRef = valueSerializer.needToWriteRef();
+ header =
+ updateKVHeader(
+ key, trackingKeyRef, value, trackingValueRef, header,
keyIsNotSameType, false);
+ if (!trackingKeyRef) {
+ if (key == null) {
+ buffer.writeByte(Fury.NULL_FLAG);
+ } else {
+ if (!keyIsNotSameType) {
+ ClassInfo classInfo =
classResolver.getClassInfo(key.getClass(), keyClassInfoWriteCache);
+ if (!writeKeyClassInfo) {
+ classResolver.writeClass(buffer, classInfo);
+ writeKeyClassInfo = true;
+ }
+ if (keyWriteSerializer == null) {
+ keyWriteSerializer = classInfo.getSerializer();
+ }
+ keyWriteSerializer.write(buffer, key);
+ } else {
+ fury.writeNonRef(
+ buffer, key,
classResolver.getClassInfo(key.getClass(), keyClassInfoWriteCache));
+ }
+ }
+ } else {
+ // todo 提到外面
Review Comment:
please use English comments
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]