Hmm... the problem is in this part of your code:

    KeyedStream<ShareRealTimeData, String> keyByStream =
            signoutTimeAndWM.keyBy(new KeySelector<ShareRealTimeData, String>() {
                @Override
                public String getKey(ShareRealTimeData value) throws Exception {
                    // Do not use the current time (new Date) here.
                    return DateUtilMinutes.timeStampToDate(new Date().getTime());
                }
            });

The fix: if you really need this effect, first use a flatMap that calls new Date for each element and stores that date in a field of the ShareRealTimeData class (call it key, for example). Then do keyBy(e->e.getKey()) on that key field. The effect is the same, but you will not run into this problem.

The underlying reason is fairly involved and has to do with Flink's key distribution mechanism. Written your way, the key of an element is not stable, because it is effectively a <random> key.

On Tue, Jan 5, 2021 at 8:11 PM, lp <973182...@qq.com> wrote:

> The operator: the code of the ProcessWindowFunction is as follows:
>
> class MyProcessWindowFuncation extends ProcessWindowFunction<ShareRealTimeData, TreeMap<Double, Tuple2<String, String>>, String, TimeWindow> {
>     private transient MapState<String, Tuple2<String, Double>> eveShareNoMaxPrice;
>     private transient ValueState<TreeMap<Double, Tuple2<String, String>>> shareAndMaxPrice;
>
>     @Override
>     public void process(String s, Context context, Iterable<ShareRealTimeData> elements,
>             Collector<TreeMap<Double, Tuple2<String, String>>> out) throws Exception {
>         Iterator<ShareRealTimeData> iterator = elements.iterator();
>
>         // Get the maximum value of each shareNo within each trigger period
>         while (iterator.hasNext()) {
>             ShareRealTimeData next = iterator.next();
>             Tuple2<String, Double> t2 = eveShareNoMaxPrice.get(next.getShareNo());
>             if (t2 == null || t2.f1 < next.getCurrentPrice()) {
>                 eveShareNoMaxPrice.put(next.getShareNo(), Tuple2.of(next.getShareName(), next.getCurrentPrice()));
>             }
>         }
>
>         TreeMap<Double, Tuple2<String, String>> shareAndMaxPriceV = shareAndMaxPrice.value();
>         if (shareAndMaxPriceV == null) {
>             shareAndMaxPriceV = new TreeMap(new Comparator<Double>() {
>                 @Override
>                 public int compare(Double o1, Double o2) {
>                     return Double.compare(o2, o1);
>                 }
>             });
>         }
>         Iterator<Map.Entry<String, Tuple2<String, Double>>> keysAndMaxPrice = eveShareNoMaxPrice.entries().iterator();
>         while (keysAndMaxPrice.hasNext()) {
>             Map.Entry<String, Tuple2<String, Double>> next = keysAndMaxPrice.next();
>
>             shareAndMaxPriceV.put(next.getValue().f1, Tuple2.of(next.getKey(), next.getValue().f0));
>             if (shareAndMaxPriceV.size() > 20) {
>                 shareAndMaxPriceV.pollLastEntry();
>             }
>         }
>
>         eveShareNoMaxPrice.clear();
>         shareAndMaxPrice.clear();
>
>         out.collect(shareAndMaxPriceV);
>     }
>
>     @Override
>     public void open(Configuration parameters) throws Exception {
>         super.open(parameters);
>         eveShareNoMaxPrice = getRuntimeContext().getMapState(new MapStateDescriptor<String, Tuple2<String, Double>>(
>                 "eveShareNoMaxPrice",
>                 TypeInformation.of(new TypeHint<String>() {}),
>                 TypeInformation.of(new TypeHint<Tuple2<String, Double>>() {})));
>         shareAndMaxPrice = getRuntimeContext().getState(new ValueStateDescriptor<TreeMap<Double, Tuple2<String, String>>>(
>                 "shareAndMaxPrice",
>                 TypeInformation.of(new TypeHint<TreeMap<Double, Tuple2<String, String>>>() {})));
>     }
> }
>
> --
> Sent from: http://apache-flink.147419.n8.nabble.com/
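
P.S. A rough sketch of the idea behind the fix at the top of this reply, written in plain Java with no Flink dependency so that it can be run on its own. As far as I understand, Flink may evaluate the key selector more than once for the same record (when routing it, and again inside the keyed operator), so the selector has to return the same value every time. Everything named here is an assumption for illustration: ShareTick is a hypothetical stand-in for ShareRealTimeData, minuteBucket is only a guess at what DateUtilMinutes.timeStampToDate does, and stampKey plays the role of the flatMap step.

```java
import java.time.Instant;
import java.time.ZoneOffset;
import java.time.format.DateTimeFormatter;
import java.util.function.Function;

final class StableKeySketch {

    /** Hypothetical stand-in for ShareRealTimeData, reduced to what the sketch needs. */
    static final class ShareTick {
        final String shareNo;
        String key; // stamped once, before the keyBy step

        ShareTick(String shareNo) {
            this.shareNo = shareNo;
        }

        String getKey() {
            return key;
        }
    }

    // Assumed behaviour of DateUtilMinutes.timeStampToDate: a minute-level bucket (UTC here).
    private static final DateTimeFormatter MINUTE =
            DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm").withZone(ZoneOffset.UTC);

    static String minuteBucket(long epochMillis) {
        return MINUTE.format(Instant.ofEpochMilli(epochMillis));
    }

    /** Step 1 (the flatMap step): the clock value is applied exactly once per element. */
    static ShareTick stampKey(ShareTick tick, long nowMillis) {
        tick.key = minuteBucket(nowMillis);
        return tick;
    }

    /** Step 2 (the key selector): a pure function of the element, it never reads the clock. */
    static final Function<ShareTick, String> KEY_SELECTOR = ShareTick::getKey;

    public static void main(String[] args) {
        ShareTick tick = stampKey(new ShareTick("600000"), System.currentTimeMillis());
        String first = KEY_SELECTOR.apply(tick);
        String second = KEY_SELECTOR.apply(tick);
        // The selector can be called any number of times and still agrees with itself.
        System.out.println(first.equals(second)); // prints true
    }
}
```

In the actual job the same shape would be a map or flatMap that sets the key field on each ShareRealTimeData, followed by keyBy(e->e.getKey()). The point is only that the clock is read in the upstream step and never inside getKey.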