fanrui created FLINK-18503:
------------------------------

             Summary: bug occurs when `HeapReducingState#add` method handles null
                 Key: FLINK-18503
                 URL: https://issues.apache.org/jira/browse/FLINK-18503
             Project: Flink
          Issue Type: Bug
          Components: Runtime / State Backends
    Affects Versions: 1.12.0
            Reporter: fanrui
             Fix For: 1.12.0
          Attachments: image-2020-07-07-02-20-03-420.png, image-2020-07-07-02-20-57-299.png

In our production environment, we run advertising billing jobs that keyBy advertiserId and calculate each advertiser's cost within a specified window, i.e. a ReduceFunction sums the prices per advertiser after keyBy.

However, we found that FsStateBackend and RocksDBStateBackend produce different results: the result computed with FsStateBackend is wrong, while the result computed with RocksDBStateBackend is correct.

The source code of HeapReducingState#add ([code link|https://github.com/apache/flink/blob/f730e16fb47b0fcace7d3a1a8c8e3cb2c837ceec/flink-runtime/src/main/java/org/apache/flink/runtime/state/heap/HeapReducingState.java#L93]) is:

{code:java}
public void add(V value) throws IOException {

   if (value == null) {
      clear();
      return;
   }

   try {
      stateTable.transform(currentNamespace, value, reduceTransformation);
   } catch (Exception e) {
      throw new IOException("Exception while applying ReduceFunction in reducing state", e);
   }
}
{code}
If value == null, the clear() method deletes the data of the current <key, namespace> from the StateTable. The ReduceFunction is executed only if value != null.
h2. Why is there a bug?

For a job that calculates cost, if price != null, the price is added to the result; if price == null, the result is unchanged.

Our ReduceFunction handles the price == null case itself:

{code:java}
ReduceFunction<Long> sumFunction = new ReduceFunction<Long>() {
   @Override
   public Long reduce(Long previousState, Long newValue) throws Exception {
      // If newValue == null, treat it as 0 and return previousState unchanged.
      if (newValue == null) {
         return previousState;
      }
      return previousState + newValue;
   }
};
{code}
 

However, when HeapReducingState#add receives value == null, it calls clear() directly and never invokes the user-defined ReduceFunction.

For example, if the input prices are 17, null, and 11: after 17 the state holds 17, the null input clears the state, and after 11 the state holds 11. The final result is 11 instead of the expected 28.
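The trace above can be replayed with a small, self-contained Java sketch. It is not Flink code: the class and method names (NullHandlingDemo, heapStyle, rocksDbStyle) are hypothetical, and a single local variable stands in for the state of one <key, namespace>. heapStyle models HeapReducingState#add (a null input clears the state and skips the reduce function), while rocksDbStyle models RocksDBReducingState#add (no special case for null).

{code:java}
import java.util.Arrays;
import java.util.List;
import java.util.function.BinaryOperator;

public class NullHandlingDemo {

    // Same logic as the sumFunction above: a null newValue is treated as 0.
    static final BinaryOperator<Long> SUM = (previousState, newValue) ->
            newValue == null ? previousState : previousState + newValue;

    // Hypothetical model of HeapReducingState#add: a null input clears the
    // state and the reduce function is never called.
    static Long heapStyle(List<Long> inputs) {
        Long state = null;
        for (Long value : inputs) {
            if (value == null) {
                state = null; // corresponds to clear()
                continue;
            }
            state = (state == null) ? value : SUM.apply(state, value);
        }
        return state;
    }

    // Hypothetical model of RocksDBReducingState#add: no special case for null;
    // the first value is stored as-is, later values go through reduce.
    static Long rocksDbStyle(List<Long> inputs) {
        Long state = null;
        for (Long value : inputs) {
            state = (state == null) ? value : SUM.apply(state, value);
        }
        return state;
    }

    public static void main(String[] args) {
        List<Long> prices = Arrays.asList(17L, null, 11L);
        System.out.println("heap-style:    " + heapStyle(prices));    // 11
        System.out.println("rocksdb-style: " + rocksDbStyle(prices)); // 28
    }
}
{code}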

Fortunately, RocksDBStateBackend computes the correct result, because RocksDBReducingState#add does not treat null specially ([code link|https://github.com/apache/flink/blob/f730e16fb47b0fcace7d3a1a8c8e3cb2c837ceec/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBReducingState.java#L92]):

{code:java}
public void add(V value) throws Exception {
   byte[] key = getKeyBytes();
   V oldValue = getInternal(key);
   V newValue = oldValue == null ? value : reduceFunction.reduce(oldValue, value);
   updateInternal(key, newValue);
}
{code}
h2. A Flink UT can reproduce this bug

StateBackendTestBase#testReducingStateAddAndGet can reproduce this bug.

The test needs to be modified as follows:
# UDF:
{code:java}
ReduceFunction<Long> sumFunction = new ReduceFunction<Long>() {
   @Override
   public Long reduce(Long previousState, Long newValue) throws Exception {
      // If newValue == null, treat it as 0 and return previousState unchanged.
      if (newValue == null) {
         return previousState;
      }
      return previousState + newValue;
   }
};

final ReducingStateDescriptor<Long> stateDescr =
   new ReducingStateDescriptor<>("my-state", sumFunction, Long.class);{code}

# Add elements:

{code:java}
keyedBackend.setCurrentKey("def");
assertNull(state.get());
state.add(17L);
state.add(null);//new code
state.add(11L);
assertEquals(28L, state.get().longValue());{code}

The change is in this commit in my fork: [link|https://github.com/1996fanrui/flink/commit/645118dd2f95de88580d07e00d88e8783a0f9680]

The UT output of RocksDBStateBackendTest is as follows:

!image-2020-07-07-02-20-03-420.png!

 

The UT output of FileStateBackendTest and MemoryStateBackendTest is as follows:

!image-2020-07-07-02-20-57-299.png!
{code:java}
java.lang.AssertionError: 
Expected :28
Actual   :11{code}
This shows that HeapReducingState#add has a bug. Whichever state backend is chosen, the Flink engine should provide consistent semantics and must not produce different results.
h2. My solution

Remove the special handling of value == null in HeapReducingState#add. With this change, all UTs of FileStateBackendTest pass.
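A minimal sketch of the proposed change, under stated assumptions: Reducer is a hypothetical stand-in for Flink's ReduceFunction, and SketchReducingState is a hypothetical, heavily simplified heap state in which a HashMap keyed by namespace replaces Flink's StateTable. The only point it illustrates is that add no longer short-circuits on null, so the first value is stored as-is and every later value, including null, goes through the user's reduce function, mirroring RocksDBReducingState#add.

{code:java}
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for Flink's ReduceFunction.
interface Reducer<T> {
    T reduce(T previous, T value) throws Exception;
}

// Hypothetical, simplified heap state for a single key: namespace -> value.
class SketchReducingState<N, V> {
    private final Map<N, V> table = new HashMap<>();
    private final Reducer<V> reducer;
    private final N currentNamespace;

    SketchReducingState(Reducer<V> reducer, N namespace) {
        this.reducer = reducer;
        this.currentNamespace = namespace;
    }

    V get() {
        return table.get(currentNamespace);
    }

    void clear() {
        table.remove(currentNamespace);
    }

    // Proposed behavior: no "if (value == null) { clear(); return; }" branch.
    void add(V value) throws IOException {
        V previous = table.get(currentNamespace);
        try {
            V next = (previous == null) ? value : reducer.reduce(previous, value);
            table.put(currentNamespace, next);
        } catch (Exception e) {
            throw new IOException("Exception while applying ReduceFunction in reducing state", e);
        }
    }
}

public class ProposedFixDemo {
    public static void main(String[] args) throws IOException {
        // Same semantics as sumFunction: a null value is treated as 0.
        Reducer<Long> sum = (previous, value) -> value == null ? previous : previous + value;
        SketchReducingState<String, Long> state = new SketchReducingState<>(sum, "window-1");
        state.add(17L);
        state.add(null);
        state.add(11L);
        System.out.println(state.get()); // 28
    }
}
{code}

With this behavior the sequence 17, null, 11 yields 28, matching the RocksDB result; whether null means "ignore" or "reset" is left to the user's ReduceFunction.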
h2. Similar bug

HeapFoldingState#add and HeapAggregatingState#add have a similar bug.
h2. Question

Why was HeapReducingState#add designed to handle the null case specially? I think null values should be handled by the user-defined ReduceFunction.


--
This message was sent by Atlassian Jira
(v8.3.4#803005)
