>>
>> Yes, it should be possible to register a timer for Long.MAX_WATERMARK if
>> you want to apply a transformation at the end of each key. You could
>> also use the reduce operation (DataStream#keyBy#reduce) in BATCH mode.
>
> According to [0], the timer's timestamp is irrelevant since timers
> will be triggered at the end of input, right? If that is the case, we
> can use the same code for both streaming and batch modes.

Yes, timers will fire regardless of their value. However, what I
believe Dawid meant is that if you pick a value that is not very far in
the future, you risk the timer firing while your job is still running.
Picking MAX_WATERMARK prevents that from happening.
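
For reference, here is a minimal sketch of what registering such a
timer could look like in a KeyedProcessFunction (MyEvent, MyState and
the accumulate/toJson helpers are placeholders, not something from
this thread):

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    public class EndOfKeyEmitter
            extends KeyedProcessFunction<String, MyEvent, String> {

        private transient ValueState<MyState> state;

        @Override
        public void open(Configuration parameters) {
            state = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("state", MyState.class));
        }

        @Override
        public void processElement(
                MyEvent event, Context ctx, Collector<String> out)
                throws Exception {
            state.update(accumulate(state.value(), event));
            // Long.MAX_VALUE is the timestamp of MAX_WATERMARK, so this
            // timer fires only once all input for the key is processed.
            // Flink keeps a single timer per key and timestamp, so
            // re-registering it for every element is cheap.
            ctx.timerService().registerEventTimeTimer(Long.MAX_VALUE);
        }

        @Override
        public void onTimer(
                long timestamp, OnTimerContext ctx, Collector<String> out)
                throws Exception {
            // Emit the final per-key result, e.g. the state as JSON.
            out.collect(toJson(ctx.getCurrentKey(), state.value()));
        }

        private MyState accumulate(MyState current, MyEvent event) {
            return current; // application-specific aggregation (stub)
        }

        private String toJson(String key, MyState s) {
            return ""; // application-specific serialization (stub)
        }
    }

The keyed reduce alternative Dawid mentioned would be
events.keyBy(MyEvent::getKey).reduce(...), which in BATCH mode emits
only the final aggregate per key.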

> Currently, we want to use batch execution mode [0] and historical data
> to build state for our streaming application
(...)
> We hope that in this way, we can rebuild our states with almost the
> same code as in streaming.

If that's your main purpose, you can also consider using the State
Processor API [1] to bootstrap the state of your job. That is, after
all, the main purpose of the State Processor API.
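
If it helps, here is a rough sketch of such a bootstrap job against the
1.13 (still DataSet-based) State Processor API; MyEvent, MyState, the
paths and the operator uid are made-up placeholders:

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.java.DataSet;
    import org.apache.flink.api.java.ExecutionEnvironment;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.state.memory.MemoryStateBackend;
    import org.apache.flink.state.api.BootstrapTransformation;
    import org.apache.flink.state.api.OperatorTransformation;
    import org.apache.flink.state.api.Savepoint;
    import org.apache.flink.state.api.functions.KeyedStateBootstrapFunction;

    public class BootstrapStateJob {
        public static void main(String[] args) throws Exception {
            ExecutionEnvironment env =
                    ExecutionEnvironment.getExecutionEnvironment();

            // Historical events, e.g. from GCS (parse is a placeholder).
            DataSet<MyEvent> history = env
                    .readTextFile("gs://bucket/events")
                    .map(MyEvent::parse);

            BootstrapTransformation<MyEvent> transformation =
                    OperatorTransformation
                            .bootstrapWith(history)
                            .keyBy(MyEvent::getKey)
                            .transform(new StateBootstrapper());

            Savepoint
                    .create(new MemoryStateBackend(), 128) // max parallelism
                    .withOperator("my-operator-uid", transformation)
                    .write("gs://bucket/bootstrap-savepoint");

            env.execute("bootstrap");
        }
    }

    class StateBootstrapper
            extends KeyedStateBootstrapFunction<String, MyEvent> {

        private transient ValueState<MyState> state;

        @Override
        public void open(Configuration parameters) {
            state = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("state", MyState.class));
        }

        @Override
        public void processElement(MyEvent event, Context ctx)
                throws Exception {
            // Write whatever keyed state the streaming job expects; the
            // state name, type and operator uid must match that job.
            state.update(null /* accumulate(state.value(), event) */);
        }
    }

The streaming job can then be started from the written savepoint
(e.g. flink run -s gs://bucket/bootstrap-savepoint ...) instead of
replaying the history through Kafka.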

Piotrek

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/libs/state_processor_api/

On Wed, 26 May 2021 at 14:04, ChangZhuo Chen (陳昌倬) <czc...@czchen.org>
wrote:

> On Wed, May 26, 2021 at 01:03:53PM +0200, Dawid Wysakowicz wrote:
> > Hi,
> >
> > No there is no API in the operator to know which mode it works in. We
> > aim to have separate operators for both modes if required. You can check
> > e.g. how we do it in KeyedBroadcastStateTransformationTranslator[1].
>
> Thanks for the information. We will implement this according to
> Piotrek's suggestion.
>
> >
> > Yes, it should be possible to register a timer for Long.MAX_WATERMARK if
> > you want to apply a transformation at the end of each key. You could
> > also use the reduce operation (DataStream#keyBy#reduce) in BATCH mode.
>
> According to [0], the timer's timestamp is irrelevant since timers
> will be triggered at the end of input, right? If that is the case, we
> can use the same code for both streaming and batch modes.
>
> [0]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/execution_mode/
>
>
> >
> > A side note: I don't fully get what you mean by "build state for our
> > streaming application". Bear in mind, though, that you cannot take a
> > savepoint from a job running in the BATCH execution mode. Moreover, it
> > uses a different kind of StateBackend; actually a dummy one, which
> > just imitates a real state backend.
>
> What we plan to do here is:
>
> 1. Load configuration from broadcast event (custom source backed by REST
>    API).
> 2. Load historical events as batch mode input (From GCS).
> 3. Use a timer to trigger output so that the following happens:
>    a. Serialize keyed states into JSON.
>    b. Output to Kafka.
>    c. Streaming application consumes data from Kafka, and update its
>       keyed states according to it.
>
> We hope that in this way, we can rebuild our states with almost the
> same code as in streaming.
>
>
> --
> ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
> http://czchen.info/
> Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B
>
