[ https://issues.apache.org/jira/browse/FLINK-20201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Flink Jira Bot updated FLINK-20201:
-----------------------------------
      Labels: auto-deprioritized-major auto-deprioritized-minor  (was: auto-deprioritized-major stale-minor)
    Priority: Not a Priority  (was: Minor)

This issue was labeled "stale-minor" 7 days ago and has not received any updates so it is being deprioritized. If this ticket is actually Minor, please raise the priority and ask a committer to assign you the issue or revive the public discussion.

> Support automatic adjustment of window parameters
> -------------------------------------------------
>
>                 Key: FLINK-20201
>                 URL: https://issues.apache.org/jira/browse/FLINK-20201
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / DataStream
>    Affects Versions: 1.11.2
>            Reporter: lqjacklee
>            Priority: Not a Priority
>              Labels: auto-deprioritized-major, auto-deprioritized-minor
>
> {code:java}
> static Map<String, Field> fieldMap;
> static {
>     fieldMap = Stream.of(Entity.class.getDeclaredFields())
>         .collect(Collectors.toMap(Field::getName, field -> field));
> }
>
> public static class Entity extends PojoTypeInfo<Entity> implements Comparable<Entity> {
>     public String name;
>     public long currentDate;
>     public int purchaseVolume;
>
>     public Entity(String name, int purchaseVolume) {
>         super(Entity.class, Arrays.asList(
>             new PojoField(fieldMap.get("name"), BasicTypeInfo.STRING_TYPE_INFO),
>             new PojoField(fieldMap.get("purchaseVolume"), BasicTypeInfo.INT_TYPE_INFO),
>             new PojoField(fieldMap.get("currentDate"), BasicTypeInfo.DATE_TYPE_INFO)
>         ));
>         this.name = name;
>         this.purchaseVolume = purchaseVolume;
>         this.currentDate = System.currentTimeMillis();
>     }
>
>     @Override
>     public int compareTo(Entity o) {
>         return Long.compare(currentDate, o.currentDate);
>     }
>
>     public String getName() {
>         return name;
>     }
>
>     public void setName(String name) {
>         this.name = name;
>     }
>
>     public long getCurrentDate() {
>         return currentDate;
>     }
>
>     public void setCurrentDate(long currentDate) {
>         this.currentDate = currentDate;
>     }
>
>     public int getPurchaseVolume() {
>         return purchaseVolume;
>     }
>
>     public void setPurchaseVolume(int purchaseVolume) {
>         this.purchaseVolume = purchaseVolume;
>     }
> }
>
> public static void main(String[] args) throws Exception {
>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>     env
>         .fromElements(new Entity("jack", 10), new Entity("tom", 20), new Entity("jack", 30))
>         .keyBy((KeySelector<Entity, String>) value -> value.name, BasicTypeInfo.STRING_TYPE_INFO)
>         .timeWindow(Time.milliseconds(1))
>         .process(new ProcessWindowFunction<Entity, Integer, String, TimeWindow>() {
>             @Override
>             public void process(String s, Context context,
>                     Iterable<Entity> elements, Collector<Integer> out) throws Exception {
>                 StreamSupport.stream(elements.spliterator(), false)
>                     .map(Entity::getPurchaseVolume)
>                     .iterator()
>                     .forEachRemaining(out::collect);
>             }
>         })
>         .print();
>     env.execute("window aggregate");
> }
> {code}
> I provide this sample code to show how the window parameter is used in
> Flink. We want to provide a function to adjust the size of the window
> parameter.
> Because the speed of data generation changes greatly, we hope to dynamically
> adjust the size of the window according to the data volume.
> The window parameter should be generated by machine learning. In that case,
> the user only needs to configure the window time range.

--
This message was sent by Atlassian Jira
(v8.20.1#820001)