Akshay Shingote created FLINK-4101:
--------------------------------------

             Summary: Calculating average in Flink DataStream on window time
                 Key: FLINK-4101
                 URL: https://issues.apache.org/jira/browse/FLINK-4101
             Project: Flink
          Issue Type: Task
          Components: DataStream API
    Affects Versions: 1.0.2
            Reporter: Akshay Shingote

I am using the Flink DataStream API in a setup where racks are available, and I want to calculate the average temperature grouped by rack ID. My window is 40 seconds long and slides every 10 seconds. Below is my code, which currently calculates the sum of temperatures over that window every 10 seconds for each rack ID; I now want to calculate the average temperature instead:

    static Properties properties = new Properties();

    // Kafka consumer settings for the TemperatureEvent topic.
    public static Properties getProperties() {
        properties.setProperty("bootstrap.servers", "54.164.200.104:9092");
        properties.setProperty("zookeeper.connect", "54.164.200.104:2181");
        //properties.setProperty("deserializer.class", "kafka.serializer.StringEncoder");
        //properties.setProperty("group.id", "akshay");
        properties.setProperty("auto.offset.reset", "earliest");
        return properties;
    }

    @SuppressWarnings("rawtypes")
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        Properties props = Program.getProperties();

        // Read TemperatureEvent records from Kafka and stamp them with ingestion time.
        DataStream<TemperatureEvent> dstream = env
            .addSource(new FlinkKafkaConsumer09<TemperatureEvent>(
                "TemperatureEvent", new TemperatureEventSchema(), props))
            .assignTimestampsAndWatermarks(new IngestionTimeExtractor<>());

        // 40-second window sliding every 10 seconds, summed per rack ID.
        DataStream<TemperatureEvent> ds1 = dstream
            .keyBy("rackId")
            .timeWindow(Time.seconds(40), Time.seconds(10))
            .sum("temperature");

        env.execute("Temperature Consumer");
    }

How can I calculate the average temperature for the above example?

--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
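
For reference, the arithmetic behind the per-rack average asked about above can be sketched in plain Java, independent of Flink. The idea is to keep a (sum, count) pair per rack ID for the readings that fall into one window and divide at the end. Every name here (RackAverageSketch, SumCount, averageByRack) is hypothetical and not part of the Flink API or of the reporter's code. In Flink 1.0.x this accumulation would presumably sit inside a window function passed to apply(...) on the windowed stream; that wiring is an assumption and is not shown.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-alone sketch; none of these names come from Flink
// or from the code in the issue.
public class RackAverageSketch {

    // Running (sum, count) pair for one rack within one window.
    static final class SumCount {
        double sum;
        long count;

        void add(double temperature) {
            sum += temperature;
            count++;
        }

        double average() {
            // An empty accumulator has no meaningful average.
            return count == 0 ? Double.NaN : sum / count;
        }
    }

    // Averages the readings of a single window, grouped by rack ID.
    // rackIds[i] and temperatures[i] together describe one reading.
    static Map<String, Double> averageByRack(String[] rackIds, double[] temperatures) {
        if (rackIds.length != temperatures.length) {
            throw new IllegalArgumentException("rackIds and temperatures must have equal length");
        }
        Map<String, SumCount> accumulators = new LinkedHashMap<>();
        for (int i = 0; i < rackIds.length; i++) {
            accumulators.computeIfAbsent(rackIds[i], k -> new SumCount()).add(temperatures[i]);
        }
        Map<String, Double> averages = new LinkedHashMap<>();
        for (Map.Entry<String, SumCount> entry : accumulators.entrySet()) {
            averages.put(entry.getKey(), entry.getValue().average());
        }
        return averages;
    }

    public static void main(String[] args) {
        // Made-up readings standing in for the contents of one 40-second window.
        String[] racks = {"rack-1", "rack-2", "rack-1", "rack-2", "rack-1"};
        double[] temps = {20.0, 30.0, 22.0, 34.0, 24.0};
        System.out.println(averageByRack(racks, temps)); // prints {rack-1=22.0, rack-2=32.0}
    }
}
```

Carrying the count alongside the sum is what makes the average recoverable; the sum("temperature") aggregation in the issue keeps only the sum, so the number of readings per window is lost.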