I tried to fix the small mistake of sample code in State-Processor-API doc[1], 
could someone do a doc review[2] for me, thank you.


1: 
https://ci.apache.org/projects/flink/flink-docs-master/dev/libs/state_processor_api.html#keyed-state
2: https://github.com/apache/flink/pull/13266





At 2020-01-21 15:54:56, "Tzu-Li (Gordon) Tai" <tzuli...@apache.org> wrote:
>Hi Izual,
>
>Thanks for reporting this! I'm also forwarding this to the user mailing
>list, as that is the more suitable place for this question.
>
>I think the usability of the State Processor API in Scala is indeed
>something that hasn’t been looked at closely yet.
>
>On Tue, Jan 21, 2020 at 8:12 AM izual <izual...@163.com> wrote:
>
>> Hi community,
>>
>> When I use state in Scala, something makes confused, I followed these
>> steps to generate and read states:
>>
>> a. implements the example[1] `CountWindowAverage` in Scala(exactly same),
>> and run jobA => that makes good.
>>
>> b. execute `flink cancel -s ${JobID}` => savepoints was generated as
>> expected.
>>
>> c. implements the example[2] `StatefulFunctionWithTime` in Scala(code
>> below), and run jobB => failed, exceptions shows that "Caused by:
>> org.apache.flink.util.StateMigrationException: The new key serializer must
>> be compatible."
>>
>>
>> ReaderFunction code as below:
>>
>> ```
>>
>>   class ReaderFunction extends KeyedStateReaderFunction[Long, (Long,
>> Long)] {
>>
>>     var countState: ValueState[(Long, Long)] = _
>>
>>     override def open(parameters: Configuration): Unit = {
>>
>>       val stateDescriptor = new ValueStateDescriptor("average",
>> createTypeInformation[(Long, Long)])
>>
>>       countState = getRuntimeContext().getState(stateDescriptor)
>>
>>     }
>>
>>     override def readKey(key: Long, ctx: KeyedStateReaderFunction.Context,
>> out: Collector[(Long, Long)]): Unit = {
>>
>>       out.collect(countState.value())
>>
>>     }
>>
>>   }
>>
>> ```
>>
>> d. then I try to use java.lang.Long instead of Long in key-type, and run
>> jobB => exception just disappeared and that makes good.
>>
>> This makes me confused. Did I miss some features in State-Processing-API,
>> such as `magic-implicits`?
>>
>
>This part is explainable. The "magic-implicits" actually happen in the
>DataStream Scala API.
>Any primitive Scala types will inferred and serialized as their Java
>counterparts.
>AFAIK, this would not happen in the State Processor API yet and therefore
>why you are getting the StateMigrationException.
>When using Scala types directly with the State Processor API, I would guess
>that Kryo (as a generic fallback) was being used to access state.
>This can probably be confirmed by looking at the exception stack trace. Can
>you post a full copy of that?
>
>This should be resolvable by properly supporting Scala for the State
>Processor API, but it's just that up to this point, we didn't have a plan
>for that yet.
>Can you open a JIRA for this? I think it'll be a reasonable extension to
>the API.
>
>
>>
>> And when I change `xxx.keyBy(_._1)` to `xxx.keyBy(0)`,same exception comes
>> again,this time I tried to use Tuple(java.lang.Long) or something else, but
>> does not work.
>>
>
>I'm not sure what you mean here. Where is this keyBy happening? In the
>Scala DataStream job, or the State Processor API?
>
>
>>
>> Hope.
>>
>> 1:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/state.html#using-managed-keyed-state
>>
>> 2:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/state.html#using-managed-keyed-state
>
>
>Cheers,
>Gordon

Reply via email to