Bo Cui created FLINK-32017:
------------------------------

             Summary: DISTINCT COUNT result is incorrect with ttl
                 Key: FLINK-32017
                 URL: https://issues.apache.org/jira/browse/FLINK-32017
             Project: Flink
          Issue Type: Bug
    Affects Versions: 1.17.0, 1.15.0, 1.12.0, 1.18.0
            Reporter: Bo Cui


SQL: SELECT COUNT(DISTINCT `c`) FROM Table1, with the state TTL set to 10 s.

Flink generates the following code:
{code:java}
/**
 * Planner-generated {@code AggsHandleFunction} for {@code SELECT COUNT(DISTINCT `c`) FROM Table1}.
 *
 * <p>The COUNT accumulator lives in the fields {@code agg0_count} / {@code agg0_countIsNull} and is
 * loaded from the caller's accumulator row in {@link #setAccumulators}. The distinct values of
 * {@code c} live in the state-backed map view {@code "distinctAcc_0"}. Each key is a value of
 * {@code c}, and each map value is a {@code long} bitmask whose bit 0 records that this COUNT has
 * already seen the key.
 *
 * <p>NOTE(review): the two pieces of state are written on different schedules. {@link #accumulate}
 * writes the map entry only when the key is new ({@code is_distinct_value_changed_0}), so a repeated
 * value never rewrites its entry. According to the report, the count row is rewritten by the caller
 * ({@code GroupAggFunction#accState}) on every record. Under state TTL, the distinct entry can
 * therefore expire while the count survives, after which the same value is counted again.
 *
 * <p>NOTE(review): this excerpt appears to show only some methods of the generated class. Confirm
 * against the full generated source.
 */
public final class GroupAggsHandler$15 implements org.apache.flink.table.runtime.generated.AggsHandleFunction {

    // COUNT(DISTINCT c) accumulator value and its null flag.
    long agg0_count;
    boolean agg0_countIsNull;

    // Serializers taken from references[0] / references[1]. Both are passed to getStateMapView in
    // open(), presumably as the key and value serializers of the distinct map view. TODO confirm.
    private transient org.apache.flink.table.runtime.typeutils.ExternalSerializer externalSerializer$0;
    private transient org.apache.flink.table.runtime.typeutils.ExternalSerializer externalSerializer$1;

    // State-backed map view of distinct values, its raw-value wrapper, and the MapView alias that
    // accumulate() reads and writes.
    private org.apache.flink.table.runtime.dataview.StateMapView distinctAcc_0_dataview;
    private org.apache.flink.table.data.binary.BinaryRawValueData distinctAcc_0_dataview_raw_value;
    private org.apache.flink.table.api.dataview.MapView distinct_view_0;

    // Not referenced by the methods shown in this excerpt.
    org.apache.flink.table.data.GenericRowData acc$3 = new org.apache.flink.table.data.GenericRowData(2);
    org.apache.flink.table.data.GenericRowData acc$5 = new org.apache.flink.table.data.GenericRowData(2);
    org.apache.flink.table.data.GenericRowData aggValue$14 = new org.apache.flink.table.data.GenericRowData(1);

    // Assigned in open(). Provides the runtime context and the state map view.
    private org.apache.flink.table.runtime.dataview.StateDataViewStore store;

    /**
     * Captures the two serializers supplied by the planner.
     *
     * @param references planner-supplied objects; indices 0 and 1 must be {@code ExternalSerializer}s
     */
    public GroupAggsHandler$15(java.lang.Object[] references) throws Exception {
        externalSerializer$0 = (((org.apache.flink.table.runtime.typeutils.ExternalSerializer) references[0]));
        externalSerializer$1 = (((org.apache.flink.table.runtime.typeutils.ExternalSerializer) references[1]));
    }

    /** Returns the runtime context of the store passed to {@link #open}. */
    private org.apache.flink.api.common.functions.RuntimeContext getRuntimeContext() {
        return store.getRuntimeContext();
    }

    /**
     * Obtains the state-backed map view {@code "distinctAcc_0"} from {@code store} and makes it the
     * view used by {@link #accumulate}.
     *
     * @param store data-view store supplying state-backed views
     * @throws Exception if the view cannot be created
     */
    @Override
    public void open(org.apache.flink.table.runtime.dataview.StateDataViewStore store) throws Exception {
        this.store = store;

        // NOTE(review): the meaning of the boolean argument is not visible here. Check
        // StateDataViewStore#getStateMapView.
        distinctAcc_0_dataview = (org.apache.flink.table.runtime.dataview.StateMapView)
                store.getStateMapView("distinctAcc_0", true, externalSerializer$0, externalSerializer$1);
        distinctAcc_0_dataview_raw_value =
                org.apache.flink.table.data.binary.BinaryRawValueData.fromObject(distinctAcc_0_dataview);
        distinct_view_0 = distinctAcc_0_dataview;
    }

    /**
     * Adds one input row to COUNT(DISTINCT c).
     *
     * <p>Field 0 of {@code accInput} (an int, possibly null) is the distinct key. If bit 0 of the key's
     * bitmask in {@code distinct_view_0} is not yet set, the bit is set and the count is incremented.
     * A null key is still recorded in the view but leaves the count and its null flag unchanged. The
     * map entry is written back only when the bit was newly set.
     *
     * @param accInput input row; field 0 holds the value of {@code c}
     * @throws Exception if reading or writing the map view fails
     */
    @Override
    public void accumulate(org.apache.flink.table.data.RowData accInput) throws Exception {
        int field$7;
        boolean isNull$7;
        boolean isNull$9;
        long result$10;
        isNull$7 = accInput.isNullAt(0);
        field$7 = -1;
        if (!isNull$7) {
            field$7 = accInput.getInt(0);
        }
        java.lang.Integer distinctKey$8 = (java.lang.Integer) field$7;
        if (isNull$7) {
            distinctKey$8 = null;
        }

        // Bitmask of the COUNTs that have already seen this key. An absent entry is treated as 0.
        java.lang.Long value$12 = (java.lang.Long) distinct_view_0.get(distinctKey$8);
        if (value$12 == null) {
            value$12 = 0L;
        }

        boolean is_distinct_value_changed_0 = false;

        long existed$13 = ((long) value$12) & (1L << 0);
        if (existed$13 == 0) { // not existed
            value$12 = ((long) value$12) | (1L << 0);
            is_distinct_value_changed_0 = true;

            long result$11 = -1L;
            boolean isNull$11;
            if (isNull$7) {
                // --- Cast section generated by org.apache.flink.table.planner.functions.casting.IdentityCastRule
                // --- End cast section

                // A null key does not contribute to the count, so keep the current value.
                isNull$11 = agg0_countIsNull;
                if (!isNull$11) {
                    result$11 = agg0_count;
                }
            } else {
                // count + 1; a null count stays null.
                isNull$9 = agg0_countIsNull || false;
                result$10 = -1L;
                if (!isNull$9) {
                    result$10 = (long) (agg0_count + ((long) 1L));
                }

                // --- Cast section generated by org.apache.flink.table.planner.functions.casting.IdentityCastRule
                // --- End cast section

                isNull$11 = isNull$9;
                if (!isNull$11) {
                    result$11 = result$10;
                }
            }
            agg0_count = result$11;
            agg0_countIsNull = isNull$11;
        }

        // NOTE(review): only a newly seen key is written back. A repeated value never touches its
        // entry, which (per FLINK-32017) lets the entry expire under TTL while the count row survives.
        if (is_distinct_value_changed_0) {
            distinct_view_0.put(distinctKey$8, value$12);
        }
    }

    /**
     * Loads the COUNT accumulator from field 0 of {@code acc} (a nullable long) and re-points
     * {@code distinct_view_0} at the state-backed distinct map view.
     *
     * @param acc accumulator row restored by the caller
     * @throws Exception propagated from the generated signature
     */
    @Override
    public void setAccumulators(org.apache.flink.table.data.RowData acc) throws Exception {
        long field$6;
        boolean isNull$6;
        isNull$6 = acc.isNullAt(0);
        field$6 = -1L;
        if (!isNull$6) {
            field$6 = acc.getLong(0);
        }

        distinct_view_0 = distinctAcc_0_dataview;

        agg0_count = field$6;
        agg0_countIsNull = isNull$6;
    }
}
{code}
Both distinctAcc_0_dataview and GroupAggFunction#accState are subject to the 10 s TTL. 
GroupAggFunction#accState stores the count, and distinctAcc_0_dataview stores the 
distinct values.

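Reproduction (times measured from the first record):
1) At t = 0 s, input one record. distinctAcc_0_dataview is written with the value 
(bitmask 1), and accState is updated to 1.
2) At t = 5 s, input the same record. The value is already present, so 
distinctAcc_0_dataview is not written and its TTL is not refreshed. accState is 
rewritten with 1, which refreshes its TTL.
3) At t = 11 s, the distinctAcc_0_dataview entry has expired (more than 10 s after its 
last write at t = 0 s). accState, last written at t = 5 s, is still alive.
4) At t = 12 s, input the same record. The value is no longer found, so 
distinctAcc_0_dataview is written again and accState is updated to 2.

Expected COUNT(DISTINCT `c`) = 1; actual result = 2.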



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to