Hi Kurt,

Thanks! Your link helps me a lot.

I still have some questions after reading through the document. As you can see 
from my first email, I tried to implement a MapFunction class in Flink. I 
actually have three ArrayLists to maintain in this map operator. I think the 
"Using Managed Keyed State" part of the document fits my requirement, but I am 
still confused about how to maintain my ListStates:

1. Does each ArrayList need its own ListState?
2. I am not clear about the open function in the example given by Flink. I 
wonder how I can initialize my ArrayLists with a ListStateDescriptor.
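
To make question 1 concrete, here is the behaviour I am hoping for, sketched in plain Java without any Flink API. Each list is registered under its own name, and each one is kept separately per key. The class KeyedLists, the list names ("queue", "residuals") and the keys ("host-a", "host-b") are made up for illustration only; this is a model of what I want, not how Flink implements it.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for per-key list state; NOT part of the Flink API.
class KeyedLists {
    // list name -> (key -> values)
    private final Map<String, Map<String, List<Double>>> lists = new HashMap<>();

    // Returns the list registered under `name` for `key`,
    // creating an empty one on first access.
    List<Double> get(String name, String key) {
        return lists
            .computeIfAbsent(name, n -> new HashMap<>())
            .computeIfAbsent(key, k -> new ArrayList<>());
    }
}

class KeyedListsDemo {
    public static void main(String[] args) {
        KeyedLists state = new KeyedLists();

        // Two differently named lists, updated for two different keys.
        state.get("queue", "host-a").add(1.0);
        state.get("queue", "host-a").add(2.0);
        state.get("queue", "host-b").add(5.0);
        state.get("residuals", "host-a").add(0.1);

        System.out.println(state.get("queue", "host-a"));     // [1.0, 2.0]
        System.out.println(state.get("queue", "host-b"));     // [5.0]
        System.out.println(state.get("residuals", "host-a")); // [0.1]
        System.out.println(state.get("residuals", "host-b")); // []
    }
}
```

In other words, I would expect to need one state per list, with the values isolated by key, but I do not know whether that is how ListState is meant to be used.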



Desheng Zhang
E-mail: gzzhangdesh...@corp.netease.com;

> On Jul 13, 2017, at 13:23, Kurt Young <ykt...@gmail.com> wrote:
> 
> Hi,
> 
> I think you can use State to achieve your goal: 
> https://ci.apache.org/projects/flink/flink-docs-release-1.3/dev/stream/state.html
> 
> Best,
> Kurt
> 
> On Thu, Jul 13, 2017 at 1:14 PM, ZalaCheung <gzzhangdesh...@corp.netease.com> wrote:
> Hi all,
> 
> I am stuck on a problem. I have a stream that I want to keyBy and then apply 
> a map function to. For each map operator, I want to maintain a variable. Is 
> that possible? I tried a naive version locally in the IntelliJ IDE and it 
> works, but I get a NullPointerException when I run it on a cluster.
> 
> Here is the pseudocode for my naive version; I hope it helps you understand 
> my question.
> 
> public class AnomalyDetection {
>     // List that the map function reads and updates.
>     private static List<TimeSeries> queue;
> 
>     public static void main(String[] args) throws Exception {
>         initialize();
>         getStreamExecutionEnvironment();
>         DataStream input = ...
>         input.keyBy("some key").map(
>             MapFunction() {
>                 if (queue.size() < some_num) {
>                     queue.add()
>                     // do something
>                 }
>                 else {
>                     // do something
>                 }
>             }
>         )
>     }
> 
>     // Called once from main() before the job is built.
>     public static void initialize() {
>         queue = new ArrayList<>();
>     }
> }
> 
> When I try to get the size of the ArrayList, I get a NullPointerException. 
> Besides that, I want to maintain a list for each map operator after I group 
> the stream by some key.
> 
> 
> Is it possible to do what I want in Flink?
> 
> Desheng Zhang
> 
> E-mail: gzzhangdesh...@corp.netease.com
> 
> 
