[ 
https://issues.apache.org/jira/browse/FLINK-20201?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Flink Jira Bot updated FLINK-20201:
-----------------------------------
      Labels: auto-deprioritized-major auto-deprioritized-minor  (was: 
auto-deprioritized-major stale-minor)
    Priority: Not a Priority  (was: Minor)

This issue was labeled "stale-minor" 7 days ago and has not received any 
updates so it is being deprioritized. If this ticket is actually Minor, please 
raise the priority and ask a committer to assign you the issue or revive the 
public discussion.


> Support automatic adjustment of window parameters
> -------------------------------------------------
>
>                 Key: FLINK-20201
>                 URL: https://issues.apache.org/jira/browse/FLINK-20201
>             Project: Flink
>          Issue Type: Improvement
>          Components: API / DataStream
>    Affects Versions: 1.11.2
>            Reporter: lqjacklee
>            Priority: Not a Priority
>              Labels: auto-deprioritized-major, auto-deprioritized-minor
>
> {code:java}
> static Map<String, Field> fieldMap;
>     static  {
>         fieldMap = Stream.of(Entity.class.getDeclaredFields())
>                 .collect(Collectors.toMap(Field::getName, field -> field));
>     }
>     public static class Entity extends PojoTypeInfo<Entity> implements 
> Comparable<Entity> {
>         public String name;
>         public long currentDate;
>         public int purchaseVolume;
>         public Entity(String name, int purchaseVolume) {
>             super(Entity.class, Arrays.asList(new 
> PojoField(fieldMap.get("name"), BasicTypeInfo.STRING_TYPE_INFO),
>                     new PojoField(fieldMap.get("purchaseVolume"), 
> BasicTypeInfo.INT_TYPE_INFO),
>                     new PojoField(fieldMap.get("currentDate"), 
> BasicTypeInfo.DATE_TYPE_INFO)
>                     ));
>             this.name = name;
>             this.purchaseVolume = purchaseVolume;
>             this.currentDate = System.currentTimeMillis();
>         }
>         @Override
>         public int compareTo(Entity o) {
>             return Long.compare(currentDate, o.currentDate);
>         }
>         public String getName() {
>             return name;
>         }
>         public void setName(String name) {
>             this.name = name;
>         }
>         public long getCurrentDate() {
>             return currentDate;
>         }
>         public void setCurrentDate(long currentDate) {
>             this.currentDate = currentDate;
>         }
>         public int getPurchaseVolume() {
>             return purchaseVolume;
>         }
>         public void setPurchaseVolume(int purchaseVolume) {
>             this.purchaseVolume = purchaseVolume;
>         }
>     }
>     public static void main(String[] args) throws Exception {
>         StreamExecutionEnvironment env = 
> StreamExecutionEnvironment.getExecutionEnvironment();
>         env
>             .fromElements(new Entity("jack", 10), new Entity("tom", 20), new 
> Entity("jack", 30))
>             .keyBy((KeySelector<Entity, String>) value -> value.name, 
> BasicTypeInfo.STRING_TYPE_INFO)
>             .timeWindow(Time.milliseconds(1))
>             .process(new ProcessWindowFunction<Entity, Integer, String, 
> TimeWindow>() {
>                 @Override
>                 public void process(String s, Context context, 
> Iterable<Entity> elements, Collector<Integer> out) throws Exception {
>                          StreamSupport.stream(elements.spliterator(), false)
>                             .map(Entity::getPurchaseVolume)
>                             .iterator()
>                             .forEachRemaining(out::collect);
>                 }
>             })
>             .print();
>         env.execute("window aggregate");
>     }
> {code}
> I provide the sample code to show how to use the window parameter in the 
> Flink. we want to provide the function to adjust the size of the window 
> parameter. 
> Because the speed of data generation changes greatly, we hope to dynamically 
> adjust the size of the window according to different data volume.
> The window parameter should be generated by machine learning. in that case, 
> we can just provide the window time range to user to configure. 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)

Reply via email to