[ https://issues.apache.org/jira/browse/FLINK-27193?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Akshay Hazari closed FLINK-27193. --------------------------------- Resolution: Fixed > Kyro Serialisation and Deserialisation returns a different object > ------------------------------------------------------------------ > > Key: FLINK-27193 > URL: https://issues.apache.org/jira/browse/FLINK-27193 > Project: Flink > Issue Type: Bug > Reporter: Akshay Hazari > Priority: Minor > > We have a unit test to check if Kyro serialisation and deserialisation > results in the same value but it fails > The KyroSerializer and Deserializer is used like this > {code:java} > import kotlin.reflect.KClass > import org.apache.flink.api.common.ExecutionConfig > import org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer > import org.apache.flink.core.memory.DataInputDeserializer > import org.apache.flink.core.memory.DataOutputSerializer > class KryoSerializerExtension { > fun <T : Any> serde(t: T): T > { val bytes = serialize(t) return deserialize(bytes, t::class) } > fun serialize(any: Any): ByteArray { > val config = ExecutionConfig() > config.registerKryoType(any.javaClass) > val serializer = KryoSerializer(any.javaClass, config) > val output = DataOutputSerializer(1) > serializer.serialize(any, output) > return output.sharedBuffer > } > fun <T : Any> deserialize(bytes: ByteArray, kClass: KClass<T>): T { > val config = ExecutionConfig() > config.registerKryoType(kClass.java) > val serializer = KryoSerializer(kClass.java, config) > val input = DataInputDeserializer(bytes) > return serializer.deserialize(input) > } > } > {code} > > The Unit test simply looks like this > {code:java} > @Test > fun fieldRecord() { > val record = getFieldRecord() > val result = kryo.serde(record) > assertThat(result).isEqualTo(record) > }{code} > This is the actual vs expected assertion error. > The record is huge all the components hash, result in a different value. I am > not sure hot the record is modified. > {code:java} > org.opentest4j.AssertionFailedError: > expected: "FieldRecord(name=foo, type=string, > fieldCounter=FieldCounter(populated=1, missing=3), > countDistinctCalculator=### CPD SKETCH - PREAMBLE: > Flavor : SPARSE > LgK : 10 > Merge Flag : false > Error Const : 0.5887050112577373 > RSE : 0.01839703160180429 > Seed Hash : 93cc | 37836 > Num Coupons : 2 > Num Pairs (SV) : 2 > First Inter Col: 0 > Valid Window : false > Valid PairTable: true > Window Offset : 0 > KxP : 1023.375 > HIP Accum : 2.00012208521548 > ### END CPC SKETCH, categoryHistogramCalculator=maxNumberOfStoredValues: 20, > frequencies: {test=1}, numericalRangeHistogramCalculator= > ### Quantiles HeapUpdateDoublesSketch SUMMARY: > Empty : false > Direct, Capacity bytes : false, > Estimation Mode : false > K : 128 > N : 2 > Levels (Needed, Total, Valid): 0, 0, 0 > Level Bit Pattern : 0 > BaseBufferCount : 2 > Combined Buffer Capacity : 4 > Retained Items : 2 > Compact Storage Bytes : 48 > Updatable Storage Bytes : 64 > Normalized Rank Error : 1.406% > Normalized Rank Error (PMF) : 1.711% > Min Value : 1.000000e+00 > Max Value : 3.000000e+00 > ### END SKETCH SUMMARY > , > numberOfElementsCalculator=NumberOfElementsCalculator(statsList=[SummaryStatistics: > n: 1 > min: 3.0 > max: 3.0 > sum: 3.0 > mean: 3.0 > geometric mean: 3.0000000000000004 > variance: 0.0 > population variance: 0.0 > second moment: 0.0 > sum of squares: 9.0 > standard deviation: 0.0 > sum of logs: 1.0986122886681098 > ]), decimalDataCalculator=DoubleDistributionForCollections(min=1.0, max=3.0, > mean=2.0, stdDev=1.0, sum=6.0)) (FieldRecord@3249a1ce)" > but was : "FieldRecord(name=foo, type=string, > fieldCounter=FieldCounter(populated=1, missing=3), > countDistinctCalculator=### CPD SKETCH - PREAMBLE: > Flavor : SPARSE > LgK : 10 > Merge Flag : false > Error Const : 0.5887050112577373 > RSE : 0.01839703160180429 > Seed Hash : 93cc | 37836 > Num Coupons : 2 > Num Pairs (SV) : 2 > First Inter Col: 0 > Valid Window : false > Valid PairTable: true > Window Offset : 0 > KxP : 1023.375 > HIP Accum : 2.00012208521548 > ### END CPC SKETCH, categoryHistogramCalculator=maxNumberOfStoredValues: 20, > frequencies: {test=1}, numericalRangeHistogramCalculator= > ### Quantiles HeapUpdateDoublesSketch SUMMARY: > Empty : false > Direct, Capacity bytes : false, > Estimation Mode : false > K : 128 > N : 2 > Levels (Needed, Total, Valid): 0, 0, 0 > Level Bit Pattern : 0 > BaseBufferCount : 2 > Combined Buffer Capacity : 4 > Retained Items : 2 > Compact Storage Bytes : 48 > Updatable Storage Bytes : 64 > Normalized Rank Error : 1.406% > Normalized Rank Error (PMF) : 1.711% > Min Value : 1.000000e+00 > Max Value : 3.000000e+00 > ### END SKETCH SUMMARY > , > numberOfElementsCalculator=NumberOfElementsCalculator(statsList=[SummaryStatistics: > n: 1 > min: 3.0 > max: 3.0 > sum: 3.0 > mean: 3.0 > geometric mean: 3.0000000000000004 > variance: 0.0 > population variance: 0.0 > second moment: 0.0 > sum of squares: 9.0 > standard deviation: 0.0 > sum of logs: 1.0986122886681098 > ]), decimalDataCalculator=DoubleDistributionForCollections(min=1.0, max=3.0, > mean=2.0, stdDev=1.0, sum=6.0)) (FieldRecord@769a58e5)" > > {code} > > Whether there is any issue with the way we are serialising deserialising this > ? > Any help is appreciated -- This message was sent by Atlassian Jira (v8.20.1#820001)