Hai Zhou created FLINK-9233:
-------------------------------

             Summary: Merging state may cause runtime exception when windows trigger onMerge
                 Key: FLINK-9233
                 URL: https://issues.apache.org/jira/browse/FLINK-9233
             Project: Flink
          Issue Type: Bug
          Components: State Backends, Checkpointing
    Affects Versions: 1.4.0
            Reporter: Hai Zhou


The main logic of my Flink job is as follows:
{code:java}
clickStream.coGroup(exposureStream).where(...).equalTo(...)
.window(EventTimeSessionWindows.withGap())
.trigger(new SessionMatchTrigger())
.evictor()
.apply();
{code}
{code:java}
SessionMatchTrigger{

    ReducingStateDescriptor stateDesc = new ReducingStateDescriptor();
...
    public boolean canMerge() {
        return true;
    }


    public void onMerge(TimeWindow window, OnMergeContext ctx) {
        ctx.mergePartitionedState(this.stateDesc);
        ctx.registerEventTimeTimer(window.maxTimestamp());
    }
....
}
{code}
{panel:title=detailed error logs}
java.lang.RuntimeException: Error while merging state.
 at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.mergePartitionedState(WindowOperator.java:895)
 at com.package.trigger.SessionMatchTrigger.onMerge(SessionMatchTrigger.java:56)
 at com.package.trigger.SessionMatchTrigger.onMerge(SessionMatchTrigger.java:14)
 at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.onMerge(WindowOperator.java:939)
 at org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator$1.merge(EvictingWindowOperator.java:141)
 at org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator$1.merge(EvictingWindowOperator.java:120)
 at org.apache.flink.streaming.runtime.operators.windowing.MergingWindowSet.addWindow(MergingWindowSet.java:212)
 at org.apache.flink.streaming.runtime.operators.windowing.EvictingWindowOperator.processElement(EvictingWindowOperator.java:119)
 at org.apache.flink.streaming.runtime.io.StreamInputProcessor.processInput(StreamInputProcessor.java:207)
 at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask.run(OneInputStreamTask.java:69)
 at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:264)
 at org.apache.flink.runtime.taskmanager.Task.run(Task.java:718)
 at java.lang.Thread.run(Thread.java:745)
 Caused by: java.lang.Exception: Error while merging state in RocksDB
 at org.apache.flink.contrib.streaming.state.RocksDBReducingState.mergeNamespaces(RocksDBReducingState.java:186)
 at org.apache.flink.streaming.runtime.operators.windowing.WindowOperator$Context.mergePartitionedState(WindowOperator.java:887)
 ... 12 more
 Caused by: java.lang.IllegalArgumentException: Illegal value provided for SubCode.
 at org.rocksdb.Status$SubCode.getSubCode(Status.java:109)
 at org.rocksdb.Status.<init>(Status.java:30)
 at org.rocksdb.RocksDB.delete(Native Method)
 at org.rocksdb.RocksDB.delete(RocksDB.java:1110)
 at org.apache.flink.contrib.streaming.state.RocksDBReducingState.mergeNamespaces(RocksDBReducingState.java:143)
 ... 13 more
{panel}
 

I found the cause of this error.
 Java's {RocksDB.Status.SubCode} was out of sync with {include/rocksdb/status.h:SubCode}.
 When the disk ran out of space, the native code returned a sub-code that the Java enum did not know, so an {IllegalArgumentException} was thrown for an invalid status code instead of the corresponding status code being returned without an exception.
 More details: [https://github.com/facebook/rocksdb/pull/3050]
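
To illustrate the failure mode, here is a minimal, self-contained sketch of a Java enum that is looked up by a numeric value coming from native code. It is not the RocksDB source: the class name SubCodeSketch, the constants, and their byte values are hypothetical and chosen only for illustration. The point is that a strict lookup throws as soon as the native side reports a value the Java enum was never taught about.

```java
// Hypothetical stand-in for an enum that mirrors a native (C++) status sub-code.
// Names and numeric values are assumptions for illustration, not RocksDB's actual ones.
final class SubCodeSketch {

    enum SubCode {
        NONE((byte) 0),
        MUTEX_TIMEOUT((byte) 1),
        LOCK_TIMEOUT((byte) 2);

        private final byte value;

        SubCode(byte value) {
            this.value = value;
        }

        // Strict lookup: any value missing from the Java enum is rejected
        // with an exception rather than mapped to a status.
        static SubCode getSubCode(byte value) {
            for (SubCode subCode : values()) {
                if (subCode.value == value) {
                    return subCode;
                }
            }
            throw new IllegalArgumentException("Illegal value provided for SubCode.");
        }
    }

    public static void main(String[] args) {
        // A value known on both sides resolves normally.
        System.out.println(SubCode.getSubCode((byte) 1));

        // A value that exists only on the native side (assumed here to be 4)
        // cannot be mapped, so the caller sees an IllegalArgumentException
        // instead of the real underlying status.
        try {
            SubCode.getSubCode((byte) 4);
        } catch (IllegalArgumentException e) {
            System.out.println("lookup failed: " + e.getMessage());
        }
    }
}
```

In this sketch the original error condition is hidden behind the lookup failure, which matches the shape of the stack trace above: the exception is raised while constructing the status object, before the caller can inspect what actually went wrong.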



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
