[ 
https://issues.apache.org/jira/browse/FLINK-5530?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15827895#comment-15827895
 ] 

ASF GitHub Bot commented on FLINK-5530:
---------------------------------------

Github user StephanEwen commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3143#discussion_r96613631
  
    --- Diff: 
flink-contrib/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/AbstractRocksDBState.java
 ---
    @@ -132,55 +132,95 @@ public void setCurrentNamespace(N namespace) {
                                namespaceSerializer);
     
                int keyGroup = KeyGroupRangeAssignment.assignToKeyGroup(des.f0, 
backend.getNumberOfKeyGroups());
    -           writeKeyWithGroupAndNamespace(keyGroup, des.f0, des.f1);
    -           return backend.db.get(columnFamily, 
keySerializationStream.toByteArray());
    +
    +           // we cannot reuse the keySerializationStream member since this 
method
    +           // is called concurrently to the other ones and it may thus 
contain garbage
    +           ByteArrayOutputStreamWithPos tmpKeySerializationStream =
    +                   new ByteArrayOutputStreamWithPos(128);
    +           DataOutputViewStreamWrapper 
tmpKeySerializationDateDataOutputView =
    +                   new 
DataOutputViewStreamWrapper(tmpKeySerializationStream);
    +
    +           writeKeyWithGroupAndNamespace(keyGroup, des.f0, des.f1,
    +                   tmpKeySerializationStream, 
tmpKeySerializationDateDataOutputView);
    +
    +           return backend.db.get(columnFamily, 
tmpKeySerializationStream.toByteArray());
     
        }
     
        protected void writeCurrentKeyWithGroupAndNamespace() throws 
IOException {
    -           
writeKeyWithGroupAndNamespace(backend.getCurrentKeyGroupIndex(), 
backend.getCurrentKey(), currentNamespace);
    +           writeKeyWithGroupAndNamespace(
    +                   backend.getCurrentKeyGroupIndex(),
    +                   backend.getCurrentKey(),
    +                   currentNamespace,
    +                   this.keySerializationStream,
    +                   this.keySerializationDateDataOutputView);
        }
     
    -   protected void writeKeyWithGroupAndNamespace(int keyGroup, K key, N 
namespace) throws IOException {
    +   protected void writeKeyWithGroupAndNamespace(int keyGroup, K key,
    +           N namespace,
    +           final ByteArrayOutputStreamWithPos keySerializationStream,
    +           final DataOutputView keySerializationDateDataOutputView) throws
    +           IOException {
    +
                keySerializationStream.reset();
    -           writeKeyGroup(keyGroup);
    -           writeKey(key);
    -           writeNameSpace(namespace);
    +           writeKeyGroup(keyGroup, keySerializationDateDataOutputView);
    +           writeKey(key, keySerializationStream, 
keySerializationDateDataOutputView);
    +           writeNameSpace(namespace, keySerializationStream, 
keySerializationDateDataOutputView);
        }
     
    -   private void writeKeyGroup(int keyGroup) throws IOException {
    +   private void writeKeyGroup(int keyGroup,
    +           final DataOutputView keySerializationDateDataOutputView) throws
    +           IOException {
    +
                for (int i = backend.getKeyGroupPrefixBytes(); --i >= 0;) {
                        keySerializationDateDataOutputView.writeByte(keyGroup 
>>> (i << 3));
                }
        }
     
    -   private void writeKey(K key) throws IOException {
    +   private void writeKey(K key,
    +           final ByteArrayOutputStreamWithPos keySerializationStream,
    +           final DataOutputView keySerializationDateDataOutputView) throws
    +           IOException {
    +
                //write key
                int beforeWrite = keySerializationStream.getPosition();
                backend.getKeySerializer().serialize(key, 
keySerializationDateDataOutputView);
     
                if (ambiguousKeyPossible) {
                        //write size of key
    -                   writeLengthFrom(beforeWrite);
    +                   writeLengthFrom(beforeWrite, keySerializationStream,
    +                           keySerializationDateDataOutputView);
                }
        }
     
    -   private void writeNameSpace(N namespace) throws IOException {
    +   private void writeNameSpace(N namespace,
    +           final ByteArrayOutputStreamWithPos keySerializationStream,
    +           final DataOutputView keySerializationDateDataOutputView) throws
    +           IOException {
    +
                int beforeWrite = keySerializationStream.getPosition();
                namespaceSerializer.serialize(namespace, 
keySerializationDateDataOutputView);
     
                if (ambiguousKeyPossible) {
                        //write length of namespace
    -                   writeLengthFrom(beforeWrite);
    +                   writeLengthFrom(beforeWrite, keySerializationStream,
    +                           keySerializationDateDataOutputView);
                }
        }
     
    -   private void writeLengthFrom(int fromPosition) throws IOException {
    +   private static void writeLengthFrom(int fromPosition,
    +           final ByteArrayOutputStreamWithPos keySerializationStream,
    +           final DataOutputView keySerializationDateDataOutputView) throws
    +           IOException {
    --- End diff --
    
    I think other parts of the Flink code do not break between `throws` and 
the exception list. It would be nice to keep the style consistent across the code.


> race condition in AbstractRocksDBState#getSerializedValue
> ---------------------------------------------------------
>
>                 Key: FLINK-5530
>                 URL: https://issues.apache.org/jira/browse/FLINK-5530
>             Project: Flink
>          Issue Type: Bug
>          Components: Queryable State
>    Affects Versions: 1.2.0
>            Reporter: Nico Kruber
>            Assignee: Nico Kruber
>            Priority: Blocker
>
> AbstractRocksDBState#getSerializedValue() uses the same key serialisation 
> stream as the ordinary state access methods but is called in parallel during 
> state queries, thus violating the assumption that only one thread accesses it. 
> This may lead to either wrong results in queries or corrupt data while 
> queries are executed.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to