Depends on how complex the DoFn is, but you should be able to share parts of the implementation in a static method that both implementations invoke.
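A rough sketch of that idea, in plain Java so it runs without Beam on the classpath. Everything here is hypothetical: `KeyValueStore`, `WholeMapStore`, and `PerKeyStore` are stand-ins I made up for illustration (the first models a `ValueState<Map<String, String>>` that rewrites the whole map, the second models `MapState<String, String>`), and the body of `processElement` is a placeholder for your real logic. In an actual pipeline each DoFn would read its own state type and then call the shared static method.

```java
import java.util.HashMap;
import java.util.Map;

public class SharedStateLogic {

    /** Hypothetical minimal view of the state that the shared logic needs. */
    interface KeyValueStore {
        String get(String key);
        void put(String key, String value);
    }

    /** Stand-in for ValueState<Map<String, String>>: the whole map is rewritten on each update. */
    static final class WholeMapStore implements KeyValueStore {
        private Map<String, String> stored = null; // null models an empty ValueState

        @Override
        public String get(String key) {
            return stored == null ? null : stored.get(key);
        }

        @Override
        public void put(String key, String value) {
            // Copy, modify, and replace the map, as a read() followed by write() would.
            Map<String, String> copy = stored == null ? new HashMap<>() : new HashMap<>(stored);
            copy.put(key, value);
            stored = copy;
        }
    }

    /** Stand-in for MapState<String, String>: entries are read and written one key at a time. */
    static final class PerKeyStore implements KeyValueStore {
        private final Map<String, String> entries = new HashMap<>();

        @Override
        public String get(String key) {
            return entries.get(key);
        }

        @Override
        public void put(String key, String value) {
            entries.put(key, value);
        }
    }

    /** Shared static method: both implementations delegate here after obtaining their state. */
    static String processElement(String key, String value, KeyValueStore store) {
        String previous = store.get(key);
        store.put(key, value);
        return previous == null ? key + ": new" : key + ": " + previous + " -> " + value;
    }

    public static void main(String[] args) {
        // The same logic runs unchanged against either kind of store.
        KeyValueStore[] stores = {new WholeMapStore(), new PerKeyStore()};
        for (KeyValueStore store : stores) {
            System.out.println(processElement("a", "1", store));
            System.out.println(processElement("a", "2", store));
        }
    }
}
```

With this shape the two DoFns stay thin: each one only adapts its state type, and the part you actually have to maintain lives in one place.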

On Mon, Nov 12, 2018 at 4:59 PM Dmitry Minaev <mina...@gmail.com> wrote:

> Yes, sure, that'll work, I will just have to support 2 different
> implementations. I was hoping there is something more elegant.
> Thank you Lukasz, I appreciate the response!
>
> --
> Dmitry
>
> On Mon, Nov 12, 2018 at 2:02 PM Lukasz Cwik <lc...@google.com> wrote:
>
>> Could you write two different implementations of the DoFn and put your
>> processing logic in another function that both DoFns would invoke after
>> accessing the state?
>>
>> Then during pipeline construction you could choose to apply the Map one
>> or the Value one based upon which runner you're using.
>>
>> On Mon, Nov 12, 2018 at 10:43 AM Dmitry Minaev <mina...@gmail.com> wrote:
>>
>>> Hi everyone,
>>>
>>> Since Dataflow doesn't support MapState (
>>> https://issues.apache.org/jira/browse/BEAM-1474) I'm thinking of using
>>> ValueState with a Map<> inside it. Is it a good idea? Here is some
>>> example code:
>>> ```
>>> @StateId("myValueStore")
>>> private final StateSpec<ValueState<Map<String, String>>> valueStoreSpec =
>>>     StateSpecs.value(MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
>>>
>>> @ProcessElement
>>> public void processElement(
>>>     ProcessContext context,
>>>     @StateId("myValueStore") ValueState<Map<String, String>> valueStore) {
>>>   ...
>>> }
>>> ```
>>>
>>> I'd like to support other runners as well (e.g. FlinkRunner) and it
>>> seems to be more efficient to use MapState in cases where I need to store
>>> a map of values. So I'm thinking of a way to use MapState and ValueState
>>> for different runners.
>>>
>>> I understand how to get the runner at runtime via pipeline options but I'm
>>> not sure how to approach defining (and using) different StateSpecs for
>>> different runners.
>>>
>>> Here is sample code for MapState:
>>> ```
>>> @StateId("myMapStore")
>>> private final StateSpec<MapState<String, String>> mapStoreSpec =
>>>     StateSpecs.map(StringUtf8Coder.of(), StringUtf8Coder.of());
>>>
>>> @ProcessElement
>>> public void processElement(
>>>     ProcessContext context,
>>>     @StateId("myMapStore") MapState<String, String> mapStore) {
>>>   ...
>>> }
>>> ```
>>>
>>> Any ideas?
>>>
>>> Thank you,
>>> Dmitry