Hi,

Is the partitioning function used by ".keyBy(Object)" of the form:

Object.hash % getNumberOfParallelSubtasks()

?
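To make the question concrete, the scheme I have in mind can be sketched in plain Java as below. This only illustrates the "hash modulo parallelism" form asked about; it is not a statement of what Flink actually does internally. The class and method names are hypothetical, and Math.floorMod is my own choice so that negative hash codes still map to a valid subtask index.

```java
public class KeyPartitionSketch {

    // Hypothetical helper (not a Flink API): maps a key to one of
    // `parallelism` subtasks using the "hash modulo parallelism" form.
    // Math.floorMod keeps the result in [0, parallelism) even when
    // hashCode() is negative, which a plain % would not guarantee.
    public static int subtaskFor(Object key, int parallelism) {
        return Math.floorMod(key.hashCode(), parallelism);
    }

    public static void main(String[] args) {
        int parallelism = 4; // assumed number of parallel subtasks
        for (Object key : new Object[] {"a", "b", 42, -7}) {
            System.out.println(key + " -> subtask " + subtaskFor(key, parallelism));
        }
    }
}
```

Under such a scheme, equal keys always land on the same subtask, which is the property I care about for preloading per-key data.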



Dr. Radu Tudoran
Research Engineer
IT R&D Division


HUAWEI TECHNOLOGIES Duesseldorf GmbH
European Research Center
Riesstrasse 25, 80992 München

E-mail: radu.tudo...@huawei.com
Mobile: +49 15209084330
Telephone: +49 891588344173

HUAWEI TECHNOLOGIES Duesseldorf GmbH
Hansaallee 205, 40549 Düsseldorf, Germany, www.huawei.com
Registered Office: Düsseldorf, Register Court Düsseldorf, HRB 56063,
Managing Director: Jingwen TAO, Wanzhou MENG, Lifang CHEN
Sitz der Gesellschaft: Düsseldorf, Amtsgericht Düsseldorf, HRB 56063,
Geschäftsführer: Jingwen TAO, Wanzhou MENG, Lifang CHEN
This e-mail and its attachments contain confidential information from HUAWEI, 
which is intended only for the person or entity whose address is listed above. 
Any use of the information contained herein in any way (including, but not 
limited to, total or partial disclosure, reproduction, or dissemination) by 
persons other than the intended recipient(s) is prohibited. If you receive this 
e-mail in error, please notify the sender by phone or email immediately and 
delete it!


-----Original Message-----
From: Aljoscha Krettek [mailto:aljos...@apache.org] 
Sent: Tuesday, December 08, 2015 5:00 PM
To: user@flink.apache.org
Subject: Re: Question about DataStream serialization

Hi,
it is not possible in an officially supported way. There is however a trick 
that you could use: You can cast the OperatorState to a KvState. This has a 
method setCurrentKey() that sets the key to be used when calling value() and 
update(). In this way you can trick the OperatorState into thinking that it has 
the key of an input element.

This is an internal API, however, and could change in the future, thereby 
breaking your program.
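
To illustrate the idea without depending on Flink internals, here is a toy model in plain Java. ToyKeyedState is a hypothetical stand-in and not Flink's OperatorState or KvState; only the method names setCurrentKey(), value() and update() are taken from the description above. It shows why updates made before any key is set are invisible once a real key is active, and how setting the key explicitly lets you preload values per key.

```java
import java.util.HashMap;
import java.util.Map;

public class KeyScopedStateSketch {

    // Toy stand-in for key-scoped state (hypothetical, not a Flink class).
    public static class ToyKeyedState<K, V> {
        private final Map<K, V> values = new HashMap<>();
        private final V defaultValue;
        private K currentKey; // stays null until a key is set

        public ToyKeyedState(V defaultValue) {
            this.defaultValue = defaultValue;
        }

        public void setCurrentKey(K key) {
            this.currentKey = key;
        }

        // Returns the value stored for the current key, or the default.
        public V value() {
            return values.getOrDefault(currentKey, defaultValue);
        }

        // Stores a value under the current key (HashMap permits a null key).
        public void update(V newValue) {
            values.put(currentKey, newValue);
        }
    }

    public static void main(String[] args) {
        ToyKeyedState<String, Integer> state = new ToyKeyedState<>(0);

        // Like open(): no element has arrived, so the current key is null
        // and all ten updates are stored under the null key.
        for (int i = 0; i < 10; i++) {
            state.update(state.value() + 1);
        }

        // Like flatMap(): the key of the incoming element is now active,
        // so the lookup falls back to the default value.
        state.setCurrentKey("hello");
        System.out.println("value for key hello: " + state.value());

        // The trick: set the key explicitly before preloading.
        state.setCurrentKey("world");
        state.update(7);
        System.out.println("value for key world: " + state.value());
    }
}
```

In a real job the cast and the setCurrentKey() call would go in open(), once per key you want to preload, with the same caveat about relying on an internal API.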

Cheers,
Aljoscha
> On 08 Dec 2015, at 16:31, Radu Tudoran <radu.tudo...@huawei.com> wrote:
> 
> Hi,
> 
> The state that is being loaded can very well be partitioned by keys. Assuming 
> this scenario, and that you know that the keys go from 0 to N, is there 
> some possibility to load and partition the initial data in the open 
> function?
> 
> 
> 
> 
> -----Original Message-----
> From: Aljoscha Krettek [mailto:aljos...@apache.org]
> Sent: Tuesday, December 08, 2015 4:20 PM
> To: user@flink.apache.org
> Subject: Re: Question about DataStream serialization
> 
> Ah, I see what the problem is. Operator state is scoped to the key of the 
> incoming element. In the open() method, no element has been received yet, so 
> the key of the incoming element is basically NULL. So the open() method 
> initializes state for key NULL. In flatMap() we actually have a key of 
> incoming elements so we access state for a specific key, which has default 
> value “0” (from the getKeyValueState() call).
> 
> OperatorState is only useful if the state needs to be partitioned by key, but 
> here it seems that the state is valid for all elements?
>> On 08 Dec 2015, at 15:30, Radu Tudoran <radu.tudo...@huawei.com> wrote:
>> 
>> final StreamExecutionEnvironment env = StreamExecutionEnvironment
>>         .getExecutionEnvironment();
>> 
>> DataStream<String> stream = env
>>         .socketTextStream("localhost", 16333, '\n')
>>         .map(new MapFunction<String, Tuple1<String>>() {
>>             @Override
>>             public Tuple1<String> map(String arg0) throws Exception {
>>                 return new Tuple1<String>(arg0);
>>             }
>>         }).keyBy(0)
>>         .flatMap(new RichFlatMapFunction<Tuple1<String>, String>() {
>> 
>>             private OperatorState<Integer> dataset;
>> 
>>             @Override
>>             public void flatMap(Tuple1<String> arg0,
>>                     Collector<String> arg1) throws Exception {
>> 
>>                 if (dataset.value() > 0)
>>                     arg1.collect("Test OK " + arg0);
>>             }
>> 
>>             @Override
>>             public void open(Configuration parameters) throws Exception {
>> 
>>                 dataset = getRuntimeContext().getKeyValueState(
>>                         "loadeddata", Integer.class, 0);
>> 
>>                 /*
>>                  * Simulate loading data.
>>                  * It looks like if this part is commented out and the
>>                  * dataset is initialized with 1, for example, then the
>>                  * non-zero value is available in the flatMap function.
>>                  */
>>                 for (int i = 0; i < 10; i++) {
>>                     dataset.update(dataset.value() + 1);
>>                 }
>> 
>>                 // System.out.println("dataset value " + dataset.value());
>>             }
>>         });
>> 
>> stream.print();
>> 
>> env.execute("test open function");
> 
