Depends on how complex the DoFn is, but you should be able to share parts
of the implementation in a static method that both implementations invoke.

On Mon, Nov 12, 2018 at 4:59 PM Dmitry Minaev <mina...@gmail.com> wrote:

> Yes, sure, that'll work, I will just have to support 2 different
> implementations. I was hoping there is something more elegant.
> Thank you Lukasz, I appreciate the response!
>
> --
> Dmitry
>
> On Mon, Nov 12, 2018 at 2:02 PM Lukasz Cwik <lc...@google.com> wrote:
>
>> Could you write two different implementations of the DoFn and put your
>> processing logic in another function that both DoFn's would invoke after
>> doing any accessing of the state?
>>
>> Then during pipeline construction you could choose to apply the Map one
>> or the Value one based upon which runner your using.
>>
>>
>>
>> On Mon, Nov 12, 2018 at 10:43 AM Dmitry Minaev <mina...@gmail.com> wrote:
>>
>>> Hi everyone,
>>>
>>> Since Dataflow doesn't support MapState (
>>> https://issues.apache.org/jira/browse/BEAM-1474) I'm thinking of using
>>> ValueState with a Map<> inside it. Is it a good idea? Here is an example
>>> code:
>>> ```
>>> @StateId("myValueStore")
>>> private final StateSpec<ValueState<Map<String, String>>> valueStoreSpec
>>> = StateSpecs.value(MapCoder.of(StringUtf8Coder.of(), StringUtf8Coder.of()));
>>>
>>> @ProcessElement
>>> public void processElement( ProcessContext
>>> context, @StateId("myValueStore") MapState<String, String> valueStore) {
>>>     ...
>>> }
>>> ```
>>>
>>> I'd like to support other runners as well (e.g. FlinkRunner) and it
>>> seems to be more efficient to use MapState in cases where I need to store
>>> a map of values. So I'm thinking of the way to use MapState and ValueState
>>> for different runners.
>>>
>>> I understand how to get a runner in runtime via pipeline options but I'm
>>> not sure how to approach defining (and using) different StateSpec for
>>> different runners.
>>>
>>> Here is a sample code for MapState:
>>> ```
>>> @StateId("myMapStore")
>>> private final StateSpec<MapState<String, String>> mapStoreSpec =
>>> StateSpecs.map(StringUtf8Coder.of(), StringUtf8Coder.of());
>>>
>>> @ProcessElement
>>> public void processElement( ProcessContext context,
>>>         @StateId("myMapStore") MapState<String, String> mapStore) {
>>>     ...
>>> }
>>> ```
>>>
>>> Any ideas?
>>>
>>> Thank you,
>>> Dmitry
>>>
>>

Reply via email to