[
https://issues.apache.org/jira/browse/FLINK-2800?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14978888#comment-14978888
]
ASF GitHub Bot commented on FLINK-2800:
---------------------------------------
GitHub user tillrohrmann opened a pull request:
https://github.com/apache/flink/pull/1308
[FLINK-2800] [kryo] Fix Kryo serialization to clear buffered data
The Kryo serializer uses Kryo's Output class to buffer individual write
operations before they are written to the underlying output stream. Flink's
KryoSerializer flushes this Output at the end of each serialize call.
However, if flushing the Output throws an exception, the buffered data stays
in the buffer. Flink uses EOFExceptions to signal, for example, that an
underlying buffer is full and has to be spilled. As a result, the record
that triggered the spilling can be written twice when it is rewritten:
Kryo's Output buffer still contains the serialized data of the failed
attempt, and that data is flushed to the emptied output stream along with
the new attempt.
This duplication of records can corrupt the data, which eventually lets the
Flink program crash. The problem is solved by clearing Kryo's Output when
the flush operation is not successful.
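The failure mode described above can be reproduced in isolation. The
following is only a minimal sketch under stated assumptions, not the code of
this pull request: BoundedTarget and BufferingOutput are hypothetical
stand-ins for Flink's spillable buffer and for Kryo's Output, and the record
contents and the capacity of 8 bytes are made up for illustration. It uses
only the Java standard library.

```java
import java.io.ByteArrayOutputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-ins for illustration; these are not Flink or Kryo classes.
class KryoFlushSketch {

    /** Fixed-capacity target that throws EOFException when a write does not fit. */
    static final class BoundedTarget extends OutputStream {
        private final int capacity;
        private final ByteArrayOutputStream data = new ByteArrayOutputStream();
        final StringBuilder spilled = new StringBuilder();

        BoundedTarget(int capacity) { this.capacity = capacity; }

        @Override public void write(int b) throws IOException {
            write(new byte[] {(byte) b}, 0, 1);
        }

        @Override public void write(byte[] b, int off, int len) throws IOException {
            if (data.size() + len > capacity) {
                throw new EOFException("target full"); // signals "spill me"
            }
            data.write(b, off, len);
        }

        /** Moves the current contents to the spilled area and empties the target. */
        void spill() {
            spilled.append(new String(data.toByteArray(), StandardCharsets.UTF_8));
            data.reset();
        }
    }

    /** Minimal buffer playing the role that Kryo's Output plays in the description. */
    static final class BufferingOutput {
        private final OutputStream target;
        private final ByteArrayOutputStream buffer = new ByteArrayOutputStream();

        BufferingOutput(OutputStream target) { this.target = target; }

        void write(byte[] bytes) { buffer.write(bytes, 0, bytes.length); }

        void flush() throws IOException {
            target.write(buffer.toByteArray(), 0, buffer.size());
            buffer.reset(); // only reached if the target accepted the data
        }

        void clear() { buffer.reset(); }
    }

    static void serialize(String record, BufferingOutput out, boolean clearOnFailure)
            throws IOException {
        out.write(record.getBytes(StandardCharsets.UTF_8));
        try {
            out.flush();
        } catch (IOException e) {
            if (clearOnFailure) {
                out.clear(); // drop the bytes of the failed attempt
            }
            throw e;
        }
    }

    /** Writes three records, spilling and rewriting on EOFException. */
    static String run(boolean clearOnFailure) {
        BoundedTarget target = new BoundedTarget(8);
        BufferingOutput out = new BufferingOutput(target);
        try {
            for (String record : new String[] {"aaaa", "bbbb", "cccc"}) {
                try {
                    serialize(record, out, clearOnFailure);
                } catch (EOFException full) {
                    target.spill();
                    serialize(record, out, clearOnFailure); // rewrite the record
                }
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        target.spill();
        return target.spilled.toString();
    }

    public static void main(String[] args) {
        System.out.println("without clear: " + run(false));
        System.out.println("with clear:    " + run(true));
    }
}
```

Without the clear, the third record appears twice in the spilled output,
because the retry appends to the bytes left over from the failed flush. With
the clear, each record appears exactly once.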
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/tillrohrmann/flink fixKryoSerialization
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/flink/pull/1308.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #1308
----
commit 5618cf7ff65c972dc33c77ba953966224e8c2a1e
Author: Till Rohrmann <[email protected]>
Date: 2015-10-28T17:40:41Z
[FLINK-2800] [kryo] Fix Kryo serialization to clear buffered data
----
> kryo serialization problem
> --------------------------
>
> Key: FLINK-2800
> URL: https://issues.apache.org/jira/browse/FLINK-2800
> Project: Flink
> Issue Type: Bug
> Components: Type Serialization System
> Affects Versions: 0.10
> Environment: linux ubuntu 12.04 LTS, Java 7
> Reporter: Stefano Bortoli
> Assignee: Till Rohrmann
>
> While performing a cross of two datasets of POJOs, I got the exception
> below. The first time I ran the process, there was no problem. When I ran
> it a second time, I got the exception. My guess is that it could be a race
> condition related to the reuse of the Kryo serializer object. However, it
> could also be "a bug where type registrations are not properly forwarded to
> all Serializers", as suggested by Stephan.
> ------------------------------------------------------------------------
> 2015-10-01 18:18:21 INFO JobClient:161 - 10/01/2015 18:18:21 Cross(Cross at main(FlinkMongoHadoop2LinkPOI2CDA.java:160))(3/4) switched to FAILED
> com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID: 114
> at com.esotericsoftware.kryo.util.DefaultClassResolver.readClass(DefaultClassResolver.java:119)
> at com.esotericsoftware.kryo.Kryo.readClass(Kryo.java:641)
> at com.esotericsoftware.kryo.Kryo.readClassAndObject(Kryo.java:752)
> at org.apache.flink.api.java.typeutils.runtime.kryo.KryoSerializer.deserialize(KryoSerializer.java:210)
> at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:127)
> at org.apache.flink.api.java.typeutils.runtime.TupleSerializer.deserialize(TupleSerializer.java:30)
> at org.apache.flink.runtime.operators.resettable.AbstractBlockResettableIterator.getNextRecord(AbstractBlockResettableIterator.java:180)
> at org.apache.flink.runtime.operators.resettable.BlockResettableMutableObjectIterator.next(BlockResettableMutableObjectIterator.java:111)
> at org.apache.flink.runtime.operators.CrossDriver.runBlockedOuterSecond(CrossDriver.java:309)
> at org.apache.flink.runtime.operators.CrossDriver.run(CrossDriver.java:162)
> at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:489)
> at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:354)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:581)
> at java.lang.Thread.run(Thread.java:745)