I tried to fix a small mistake in the sample code of the State Processor API doc [1]; could someone do a doc review [2] for me? Thank you.
1: https://ci.apache.org/projects/flink/flink-docs-master/dev/libs/state_processor_api.html#keyed-state
2: https://github.com/apache/flink/pull/13266

At 2020-01-21 15:54:56, "Tzu-Li (Gordon) Tai" <tzuli...@apache.org> wrote:
>Hi Izual,
>
>Thanks for reporting this! I'm also forwarding this to the user mailing
>list, as that is the more suitable place for this question.
>
>I think the usability of the State Processor API in Scala is indeed
>something that hasn't been looked at closely yet.
>
>On Tue, Jan 21, 2020 at 8:12 AM izual <izual...@163.com> wrote:
>
>> Hi community,
>>
>> When I used state in Scala, something confused me. I followed these
>> steps to generate and read states:
>>
>> a. Implemented the example [1] `CountWindowAverage` in Scala (exactly
>> the same) and ran jobA => that worked fine.
>>
>> b. Executed `flink cancel -s ${JobID}` => a savepoint was generated as
>> expected.
>>
>> c. Implemented the example [2] `StatefulFunctionWithTime` in Scala (code
>> below) and ran jobB => it failed with the exception "Caused by:
>> org.apache.flink.util.StateMigrationException: The new key serializer
>> must be compatible."
>>
>> ReaderFunction code as below:
>>
>> ```
>> class ReaderFunction extends KeyedStateReaderFunction[Long, (Long, Long)] {
>>
>>   var countState: ValueState[(Long, Long)] = _
>>
>>   override def open(parameters: Configuration): Unit = {
>>     val stateDescriptor = new ValueStateDescriptor("average",
>>       createTypeInformation[(Long, Long)])
>>     countState = getRuntimeContext().getState(stateDescriptor)
>>   }
>>
>>   override def readKey(key: Long, ctx: KeyedStateReaderFunction.Context,
>>       out: Collector[(Long, Long)]): Unit = {
>>     out.collect(countState.value())
>>   }
>> }
>> ```
>>
>> d. Then I tried to use java.lang.Long instead of Long as the key type,
>> and ran jobB => the exception disappeared and everything worked.
>>
>> This made me confused. Did I miss some feature of the State Processor
>> API, such as `magic-implicits`?
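For reference, the variant that worked in step (d) only swaps the key type to `java.lang.Long`; everything else stays as in the quoted code. This is a reconstruction of that fix, not the exact code that was run (the import paths are my assumption for Flink 1.9):

```scala
// Sketch only: same as the quoted ReaderFunction, but declaring the key
// as java.lang.Long so the extracted key serializer matches the Java Long
// serializer that the Scala DataStream job wrote into the savepoint.
import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
import org.apache.flink.api.scala.createTypeInformation
import org.apache.flink.configuration.Configuration
import org.apache.flink.state.api.functions.KeyedStateReaderFunction
import org.apache.flink.util.Collector

class WorkingReaderFunction
    extends KeyedStateReaderFunction[java.lang.Long, (Long, Long)] {

  var countState: ValueState[(Long, Long)] = _

  override def open(parameters: Configuration): Unit = {
    val stateDescriptor = new ValueStateDescriptor("average",
      createTypeInformation[(Long, Long)])
    countState = getRuntimeContext.getState(stateDescriptor)
  }

  override def readKey(key: java.lang.Long,
      ctx: KeyedStateReaderFunction.Context,
      out: Collector[(Long, Long)]): Unit = {
    out.collect(countState.value())
  }
}
```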
>>
>
>This part is explainable. The "magic-implicits" actually happen in the
>DataStream Scala API.
>Any primitive Scala types will be inferred and serialized as their Java
>counterparts.
>AFAIK, this does not happen in the State Processor API yet, and that is
>why you are getting the StateMigrationException.
>When using Scala types directly with the State Processor API, I would guess
>that Kryo (as a generic fallback) was being used to access state.
>This can probably be confirmed by looking at the exception stack trace. Can
>you post a full copy of that?
>
>This should be resolvable by properly supporting Scala in the State
>Processor API; it's just that up to this point, we didn't have a plan
>for that yet.
>Can you open a JIRA for this? I think it'll be a reasonable extension to
>the API.
>
>
>> And when I change `xxx.keyBy(_._1)` to `xxx.keyBy(0)`, the same
>> exception comes up again. This time I tried to use Tuple(java.lang.Long)
>> and some other types, but nothing worked.
>>
>
>I'm not sure what you mean here. Where is this keyBy happening? In the
>Scala DataStream job, or the State Processor API?
>
>
>> Hope.
>>
>> 1:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/state.html#using-managed-keyed-state
>>
>> 2:
>> https://ci.apache.org/projects/flink/flink-docs-release-1.9/dev/stream/state/state.html#using-managed-keyed-state
>
>
>Cheers,
>Gordon
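Gordon's point about primitive Scala types and their Java counterparts can be seen without Flink at all: whenever a `scala.Long` is used in a generic position (as a key type is), the runtime value is a boxed `java.lang.Long`. A minimal, Flink-free sketch of this (my own illustration, not from the thread):

```scala
// Demonstrates that scala.Long boxes to java.lang.Long at runtime,
// which is why declaring the reader's key type as java.lang.Long lines
// up with what the Scala DataStream job actually stored.
object ScalaJavaLongDemo {
  def main(args: Array[String]): Unit = {
    val k: Long = 42L                    // scala.Long, a JVM primitive here

    val boxed: AnyRef = Long.box(k)      // explicit boxing
    println(boxed.getClass.getName)      // prints "java.lang.Long"

    val keys: List[Any] = List(k)        // generic position forces boxing
    println(keys.head.getClass.getName)  // prints "java.lang.Long"
  }
}
```

So the stored key values themselves are Java `Long`s either way; the incompatibility is only between the serializers the two APIs pick for the declared key type.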