// NOTE(review): Java Flink pipeline fragment quoted in a mailing-list message;
// `streamOperator`, `textLongSink`, `EventItem` and `AccumulatorAggregateFunction`
// are defined outside this excerpt.
streamOperator
    // Event-time timestamps are taken from getWindowEnd(); AscendingTimestampExtractor
    // assumes timestamps arrive monotonically increasing -- TODO confirm for this source.
    .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<EventItem>() {
        @Override
        public long extractAscendingTimestamp(EventItem eventItem) {
            return eventItem.getWindowEnd();
        }
    })
    // Map each event to (itemId, 1L).
    .map(eventItem -> Tuple2.of(eventItem.getItemId(), 1L))
    // NOTE(review): keyBy(1) keys on tuple field 1, which is the constant 1L --
    // every record gets the same key. Keying by itemId would be keyBy(0); verify intent.
    .keyBy(1)
    .timeWindow(Time.minutes(5))
    // Pre-aggregate per-key counts, then wrap (key, windowEnd, count) into an EventItem.
    // NOTE(review): the key is cast to Tuple1<String> although the field keyed on above
    // is the Long at position 1 -- the key types look inconsistent; confirm.
    .aggregate(new AccumulatorAggregateFunction<>(), (WindowFunction<Long, EventItem, Tuple, TimeWindow>) (key, timeWindow, iterable, collector) -> {
        String newId = ((Tuple1<String>) key).f0;
        Long count = iterable.iterator().next();
        collector.collect(EventItem.of(newId, timeWindow.getEnd(), count));
    })
    // NOTE(review): position-based keyBy on a non-tuple type (EventItem) is not valid
    // in Flink; a field-name key (e.g. keyBy("itemId")) is presumably intended -- confirm.
    .keyBy(1)
    .process(new KeyedProcessFunction<Tuple, EventItem, Tuple2<String, Long>>() {
        // Per-key running view count, indexed by item id.
        private MapState<String, Long> itemState;
        // Day-of-year of the last rollover, used to reset counts once per day.
        private ValueState<Long> dayState;
        @Override
        public void open(Configuration parameters) throws Exception {
            MapStateDescriptor<String, Long> mapStateDescriptor = new MapStateDescriptor<>("ei_pv", TypeInformation.of(String.class), TypeInformation.of(Long.class));
            itemState = getRuntimeContext().getMapState(mapStateDescriptor);
            dayState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("day_state", TypeInformation.of(Long.class)));
            // NOTE(review): keyed state is written here, but open() runs with no
            // current key -- in Flink, ValueState.update() outside a keyed context
            // fails at runtime. This initialization belongs in processElement/onTimer.
            dayState.update((long) currentDay(System.currentTimeMillis()));
        }
        // NOTE(review): LocalDate.ofEpochDay expects DAYS since the epoch, but both
        // call sites pass epoch MILLISECONDS -- the result is not the current day of
        // year (and overflows the valid LocalDate range).
        private int currentDay(long epochDay) {
            return LocalDate.ofEpochDay(epochDay).getDayOfYear();
        }
        @Override
        public void processElement(EventItem input, Context context, Collector<Tuple2<String, Long>> collector) throws Exception {
            String ei = input.getItemId();
            Long cnt = itemState.get(ei);
            long viewCount = input.getViewCount();
            // Accumulate; MapState.get returns null for an absent key.
            cnt = cnt != null ? cnt + viewCount : viewCount;
            itemState.put(ei, cnt);
            // Fire an event-time timer 5s after the element's window end.
            context.timerService().registerEventTimeTimer(input.getWindowEnd() + 5000);
        }
        @Override
        public void onTimer(long time, OnTimerContext ctx, Collector<Tuple2<String, Long>> out) throws Exception {
            int currentDay = currentDay(time);
            boolean isCurrentDay = currentDay == dayState.value();
            // On day rollover, drop the previous day's counts before emitting.
            if (!isCurrentDay) {
                itemState.clear();
                dayState.update((long) currentDay);
            }
            // Emit the full per-item count map every time a timer fires.
            for (Map.Entry<String, Long> entry : itemState.entries()) {
                out.collect(Tuple2.of(entry.getKey(), entry.getValue()));
            }
        }
    })
    .addSink(textLongSink);
???????????????? ------------------ Original ------------------ From: "?? ??"<thinktothi...@yahoo.com.INVALID>; Date: Tue, Mar 5, 2019 01:32 PM To: "user-zh"<user-zh@flink.apache.org>; Subject: Re: ???????????????????????????????????????? ???????????????????????????? ).Flink Stream???????????????????????????????????????????????????????? ).?????????? ProcessAllWindowFunction,????????????????Window??????????????????????????????????Window??????operator?????????????? ).??????????????????????????????????????????????????????????????(??Redis????) 
).??????????ProcessAllWIndowFunction??????????????(????????: WordCount ????(??????????????????) )
package com.opensourceteams.module.bigdata.flink.example.stream.worldcount.nc.sort

import java.time.ZoneId

import org.apache.flink.configuration.Configuration
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.function.ProcessAllWindowFunction
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.streaming.connectors.fs.bucketing.{BucketingSink, DateTimeBucketer}
import org.apache.flink.util.Collector

import scala.collection.mutable

/**
 * nc -lk 1234 ????????
 * Word-count example: reads lines from a local socket (start a server with
 * `nc -lk 1234`), counts words in 5-second processing-time tumbling windows,
 * sorts each window's output alphabetically, and writes results to a
 * BucketingSink (rolling files on the local filesystem).
 */
object SocketWindowWordCountLocalSinkHDFSAndWindowAllAndSorted {

  /**
   * Builds the configuration for the local environment. With isDebug = true,
   * akka and heartbeat timeouts are stretched so the job survives long pauses
   * at debugger breakpoints.
   */
  def getConfiguration(isDebug:Boolean = false):Configuration={
    val configuration : Configuration = new Configuration()
    if(isDebug){
      val timeout = "100000 s"
      val timeoutHeartbeatPause = "1000000 s"
      configuration.setString("akka.ask.timeout",timeout)
      configuration.setString("akka.lookup.timeout",timeout)
      configuration.setString("akka.tcp.timeout",timeout)
      configuration.setString("akka.transport.heartbeat.interval",timeout)
      configuration.setString("akka.transport.heartbeat.pause",timeoutHeartbeatPause)
      configuration.setString("akka.watch.heartbeat.pause",timeout)
      configuration.setInteger("heartbeat.interval",10000000)
      configuration.setInteger("heartbeat.timeout",50000000)
    }
    configuration
  }

  def main(args: Array[String]): Unit = {
    val port = 1234
    // get the execution environment
    // val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    val configuration : Configuration = getConfiguration(true)
    // Single-parallelism local environment using the debug-friendly configuration above.
    val env:StreamExecutionEnvironment = StreamExecutionEnvironment.createLocalEnvironment(1,configuration)
    // get input data by connecting to the socket
    val dataStream = env.socketTextStream("localhost", port, '\n')

    import org.apache.flink.streaming.api.scala._

    val dataStreamDeal = dataStream.flatMap( w => w.split("\\s") ).map( w => WordWithCount(w,1))
      // NOTE(review): this keyBy has no effect -- windowAll below discards the
      // keying and runs the window with parallelism 1.
      .keyBy("word")
      // (original comments, garbled encoding:)
      //??????window????????????????????????ProcessAllWindowFunction????????????(????????????????????key????????)
      //??????window????????????????????????
      .windowAll(TumblingProcessingTimeWindows.of(Time.seconds(5)))
      .process(new ProcessAllWindowFunction[WordWithCount,WordWithCount,TimeWindow] {
        // Merge duplicate words within the window, then emit in alphabetical order.
        override def process(context: Context, elements: Iterable[WordWithCount], out: Collector[WordWithCount]): Unit = {
          val set = new mutable.HashSet[WordWithCount]{}
          for(wordCount <- elements){
            // NOTE(review): WordWithCount is a case class, so set equality includes
            // `count`. This merge only collapses entries with IDENTICAL counts and
            // bumps by a fixed +1: three occurrences of the same word leave two
            // distinct entries in the set. A Map[String, Long] keyed by word would
            // aggregate correctly -- confirm intent.
            if(set.contains(wordCount)){
              set.remove(wordCount)
              set.add(new WordWithCount(wordCount.word,wordCount.count + 1))
            }else{
              set.add(wordCount)
            }
          }
          // Sort alphabetically by word.
          val sortSet = set.toList.sortWith( (a,b) => a.word.compareTo(b.word) < 0 )
          for(wordCount <- sortSet) out.collect(wordCount)
        }
      })
    //.countWindow(3)
    //.countWindow(3,1)
    //.countWindowAll(3)

    //textResult.print().setParallelism(1)

    val bucketingSink = new BucketingSink[WordWithCount]("file:/opt/n_001_workspaces/bigdata/flink/flink-maven-scala-2/sink-data")
    // One output bucket per minute, Shanghai time zone.
    bucketingSink.setBucketer(new DateTimeBucketer[WordWithCount]("yyyy-MM-dd--HHmm", ZoneId.of("Asia/Shanghai")))
    //bucketingSink.setWriter(new SequenceFileWriter[IntWritable, Text]())
    //bucketingSink.setWriter(new SequenceFileWriter[WordWithCount]())
    //bucketingSink.setBatchSize(1024 * 1024 * 400) // this is 400 MB,
    //bucketingSink.setBatchSize(100 ) // this is 400 MB,
    bucketingSink.setBatchSize(1024 * 1024 * 400 ) // this is 400 MB,
    //bucketingSink.setBatchRolloverInterval(20 * 60 * 1000); // this is 20 mins
    // NOTE(review): actually rolls every 2 seconds; the trailing "20 mins" comment is stale.
    bucketingSink.setBatchRolloverInterval( 2 * 1000); // this is 20 mins
    //setInactiveBucketCheckInterval
    //setInactiveBucketThreshold
    // (original comment, garbled encoding:)
    //??????????????????Sink??????????????????????????????????????
    bucketingSink.setInactiveBucketThreshold(2 * 1000)
    bucketingSink.setAsyncTimeout(1 * 1000)

    dataStreamDeal.setParallelism(1)
      .addSink(bucketingSink)

    if(args == null || args.size ==0){
      env.execute("????????")
      //????????
      //println(env.getExecutionPlan) //StreamGraph
      //println(env.getStreamGraph.getStreamingPlanAsJSON) //JsonPlanGenerator.generatePlan(jobGraph)
    }else{
      env.execute(args(0))
    }
    println("????")
  }

  // Data type for words with count
  case class WordWithCount(word: String, count: Long)

  /*
  abstract private class OrderWindowFunction extends ProcessWindowFunction<WordWithCount,WordWithCount,WordWithCount,TimeWindow> {
  }*/
}
--------------------------------------------------------------------------------------------------------------------------------------- > ?? 2019??3??5????????1:16???????? <m...@zhangzuofeng.cn> ?????? > > ???????? > ????????????????????????stream api?????????????????????????????????????? > ????! ---------------------------------------------------------------------------------------------------------------------------------------