Hello,

[Apologies if this group does not answer questions related to the AIFlow
project; happy to learn if there are other email handles I should send
my questions to]

I am new to AIFlow and exploring some demo projects for a simple workflow I
want to try with two Flink jobs: a batch (bounded) processor and a stream
(unbounded) job. The processors are written in Java.

I wrote a simple workflow that uses the ai_flow.action_on_job_status API to
chain the stream job to run after the batch job finishes. What I found,
however, is that the stream job gets submitted immediately after the batch
job is successfully submitted, not after the batch job finishes. From a
cursory look at the code in flink_job_plugin.py, it is not obvious where, or
whether, the job_id generated at submission
(https://github.com/flink-extended/ai-flow/blob/master/ai_flow_plugins/job_plugins/flink/flink_job_plugin.py#L196)
gets used to track the job status at all.
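
To make my expectation concrete: I assumed the plugin would use that job_id
to poll the Flink cluster for the job's state before triggering downstream
actions, along the lines of the purely hypothetical sketch below (this is
not actual plugin code, and localhost:8081 is just my local JobManager
address):

    import requests

    # Hypothetical status check against Flink's REST API, not AIFlow code:
    # GET /jobs/<job_id> returns job details including its current state.
    resp = requests.get(f"http://localhost:8081/jobs/{job_id}")
    state = resp.json()["state"]  # e.g. RUNNING, FINISHED, FAILED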

Core parts of my workflow (imports included for context):

    import ai_flow as af
    from ai_flow_plugins.job_plugins.flink import FlinkJavaProcessor

    af.init_ai_flow_context()

    # Batch (bounded) job.
    with af.job_config('flink_batch'):
        af.user_define_operation(
            processor=FlinkJavaProcessor(
                entry_class="com.linkedin.flink.example.TableApiExample",
                main_jar_file=jar_filename,
                args=["true"]))

    # Stream (unbounded) job, meant to start after the batch job finishes.
    with af.job_config('flink_stream'):
        af.user_define_operation(
            processor=FlinkJavaProcessor(
                entry_class="com.linkedin.flink.example.TableApiExample",
                main_jar_file=jar_filename,
                args=["false"]))

    # Chain the stream job to run after the batch job.
    af.action_on_job_status('flink_stream', 'flink_batch')
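
For reference, the fully spelled-out form of that last call, as I read it
from the source, would be the following; the Status/JobAction import paths
and keyword names are my own reading of the code, so please correct me if
this is not the intended usage:

    from ai_flow.workflow.status import Status
    from ai_flow.workflow.control_edge import JobAction

    # Start 'flink_stream' only once 'flink_batch' reaches FINISHED;
    # this is the behavior I expected but am not observing.
    af.action_on_job_status(job_name='flink_stream',
                            upstream_job_name='flink_batch',
                            upstream_job_status=Status.FINISHED,
                            action=JobAction.START)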


Is this the right way to go about working with the Flink Java processor? I
could not find much documentation on this and would appreciate any inputs
on the right APIs to use.


-- 
Regards,
Deepthi
