Github user kaibozhou commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4355#discussion_r135163096
  
    --- Diff: 
flink-libraries/flink-table/src/test/java/org/apache/flink/table/runtime/utils/JavaUserDefinedAggFunctions.java
 ---
    @@ -135,4 +138,177 @@ public void retract(WeightedAvgAccum accumulator, int 
iValue, int iWeight) {
                        accumulator.count -= iWeight;
                }
        }
    +
    +   /**
    +    * CountDistinct accumulator.
    +    */
    +   public static class CountDistinctAccum {
    +           public MapView<String, Integer> map;
    +           public long count;
    +   }
    +
    +   /**
    +    * CountDistinct aggregate.
    +    */
    +   public static class CountDistinct extends AggregateFunction<Long, 
CountDistinctAccum> {
    +
    +           @Override
    +           public CountDistinctAccum createAccumulator() {
    +                   CountDistinctAccum accum = new CountDistinctAccum();
    +                   accum.map = new MapView<>(Types.STRING, Types.INT);
    +                   accum.count = 0L;
    +                   return accum;
    +           }
    +
    +           //Overloaded accumulate method
    +           public void accumulate(CountDistinctAccum accumulator, String 
id) {
    +                   try {
    +                           if (!accumulator.map.contains(id)) {
    +                                   accumulator.map.put(id, 1);
    +                                   accumulator.count += 1;
    +                           }
    +                   } catch (Exception e) {
    +                           e.printStackTrace();
    +                   }
    +           }
    +
    +           //Overloaded accumulate method
    +           public void accumulate(CountDistinctAccum accumulator, long id) 
{
    +                   try {
    +                           if 
(!accumulator.map.contains(String.valueOf(id))) {
    +                                   accumulator.map.put(String.valueOf(id), 
1);
    +                                   accumulator.count += 1;
    +                           }
    +                   } catch (Exception e) {
    +                           e.printStackTrace();
    +                   }
    +           }
    +
    +           @Override
    +           public Long getValue(CountDistinctAccum accumulator) {
    +                   return accumulator.count;
    +           }
    +   }
    +
    +   /**
    +    * CountDistinct aggregate with merge.
    +    */
    +   public static class CountDistinctWithMerge extends CountDistinct {
    +
    +           //Overloaded merge method
    +           public void merge(CountDistinctAccum acc, 
Iterable<CountDistinctAccum> it) {
    +                   Iterator<CountDistinctAccum> iter = it.iterator();
    +                   while (iter.hasNext()) {
    +                           CountDistinctAccum mergeAcc = iter.next();
    +                           acc.count += mergeAcc.count;
    +
    +                           try {
    +                                   Iterator<String> mapItr = 
mergeAcc.map.keys().iterator();
    +                                   while (mapItr.hasNext()) {
    +                                           String key = mapItr.next();
    +                                           if (!acc.map.contains(key)) {
    +                                                   acc.map.put(key, 1);
    +                                           }
    +                                   }
    +                           } catch (Exception e) {
    +                                   e.printStackTrace();
    +                           }
    +                   }
    +           }
    +   }
    +
    +   /**
    +    * CountDistinct aggregate with merge and reset.
    +    */
    +   public static class CountDistinctWithMergeAndReset extends 
CountDistinctWithMerge {
    +
    +           //Overloaded retract method
    +           public void resetAccumulator(CountDistinctAccum acc) {
    +                   acc.map.clear();
    +                   acc.count = 0;
    +           }
    +   }
    +
    +   /**
    +    * CountDistinct aggregate with retract.
    +    */
    +   public static class CountDistinctWithRetractAndReset extends 
CountDistinct {
    +
    +           //Overloaded retract method
    +           public void retract(CountDistinctAccum accumulator, long id) {
    +                   try {
    +                           if 
(!accumulator.map.contains(String.valueOf(id))) {
    +                                   
accumulator.map.remove(String.valueOf(id));
    +                                   accumulator.count -= 1;
    +                           }
    +                   } catch (Exception e) {
    +                           e.printStackTrace();
    +                   }
    +           }
    +
    +           //Overloaded retract method
    +           public void resetAccumulator(CountDistinctAccum acc) {
    +                   acc.map.clear();
    +                   acc.count = 0;
    +           }
    +   }
    +
    +   /**
    +    * Accumulator for test DataView.
    +    */
    +   public static class DataViewTestAccum {
    +           public MapView<String, Integer> map;
    +           public MapView<String, Integer> map2;
    +           public long count;
    +           private ListView<Long> list = new ListView<>(Types.LONG);
    +
    +           public ListView<Long> getList() {
    +                   return list;
    +           }
    +
    +           public void setList(ListView<Long> list) {
    +                   this.list = list;
    +           }
    +   }
    +
    +   /**
    +    * Aggregate for test DataView.
    +    */
    +   public static class DataViewTestAgg extends AggregateFunction<Long, 
DataViewTestAccum> {
    +
    +           @Override
    +           public DataViewTestAccum createAccumulator() {
    +                   DataViewTestAccum accum = new DataViewTestAccum();
    +                   accum.map = new MapView<>(Types.STRING, Types.INT);
    --- End diff --
    
    yes, it is used to test the case of non-initialization,i will add some 
comments to it.


---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at infrastruct...@apache.org or file a JIRA ticket
with INFRA.
---

Reply via email to