[ https://issues.apache.org/jira/browse/FLINK-4101?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15346204#comment-15346204 ]
Vasia Kalavri commented on FLINK-4101: -------------------------------------- Hi [~mrakki3110], JIRA is used for reporting bugs or proposing new features. Your question should be posted either in the user mailing list or SO, as you've already done. I'm closing this. > Calculating average in Flink DataStream on window time > ------------------------------------------------------ > > Key: FLINK-4101 > URL: https://issues.apache.org/jira/browse/FLINK-4101 > Project: Flink > Issue Type: Task > Components: DataStream API > Affects Versions: 1.0.2 > Reporter: Akshay Shingote > > I am using Flink DataStream API where there where racks are available & I > want to calculate "average"of temperature group by rack IDs. My window > duration is of 40 seconds & my window is sliding every 10 seconds...Following > is my code where I am calculating sum of temperatures every 10 seconds for > every rackID,but now I want to calculate average temperatures:: > static Properties properties=new Properties(); > public static Properties getProperties() > { > properties.setProperty("bootstrap.servers", "54.164.200.104:9092"); > properties.setProperty("zookeeper.connect", "54.164.200.104:2181"); > //properties.setProperty("deserializer.class", > "kafka.serializer.StringEncoder"); > //properties.setProperty("group.id", "akshay"); > properties.setProperty("auto.offset.reset", "earliest"); > return properties; > } > @SuppressWarnings("rawtypes") > public static void main(String[] args) throws Exception > { > StreamExecutionEnvironment > env=StreamExecutionEnvironment.getExecutionEnvironment(); > env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); > Properties props=Program.getProperties(); > DataStream<TemperatureEvent> dstream=env.addSource(new > FlinkKafkaConsumer09<TemperatureEvent>("TemperatureEvent", new > TemperatureEventSchema(), props)).assignTimestampsAndWatermarks(new > IngestionTimeExtractor<>()); > DataStream<TemperatureEvent> > ds1=dstream.keyBy("rackId").timeWindow(Time.seconds(40), > Time.seconds(10)).sum("temperature"); > env.execute("Temperature Consumer"); > } > How can I calcluate average temperature for the above example ?? -- This message was sent by Atlassian JIRA (v6.3.4#6332)