Hi Robert,

But I saw Flink doc shows application mode can run multiple jobs? Or I
misunderstand it?
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/



*Compared to the Per-Job mode, the Application Mode allows the
submission of applications consisting of multiple jobs. The order of
job execution is not affected by the deployment mode but by the call
used to launch the job. Using execute(), which is blocking,
establishes an order and it will lead to the execution of the "next"
job being postponed until "this" job finishes. Using executeAsync(),
which is non-blocking, will lead to the "next" job starting before
"this" job finishes.*


On Tue, Jun 22, 2021 at 11:13 PM Robert Metzger <rmetz...@apache.org> wrote:

> Hi Qihua,
>
> Application Mode is meant for executing one job at a time, not multiple
> jobs on the same JobManager.
> If you want to do that, you need to use session mode, which allows
> managing multiple jobs on the same JobManager.
>
> On Tue, Jun 22, 2021 at 10:43 PM Qihua Yang <yang...@gmail.com> wrote:
>
>> Hi Arvid,
>>
>> Do you know if I can start multiple jobs for a single flink application?
>>
>> Thanks,
>> Qihua
>>
>>
>> On Thu, Jun 17, 2021 at 12:11 PM Qihua Yang <yang...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I am using application mode.
>>>
>>> Thanks,
>>> Qihua
>>>
>>> On Thu, Jun 17, 2021 at 12:09 PM Arvid Heise <ar...@apache.org> wrote:
>>>
>>>> Hi Qihua,
>>>>
>>>> Which execution mode are you using?
>>>>
>>>> On Thu, Jun 17, 2021 at 6:48 PM Qihua Yang <yang...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>>
>>>>> Thank you for your reply. What I want is flink app has multiple jobs,
>>>>> each job manage a stream. Currently our flink app has only 1 job that
>>>>> manage multiple streams.
>>>>> I did try env.executeAsyc(), but it still doesn't work. From the log,
>>>>> when the second executeAsync() was called, it shows " *Job
>>>>> 00000000000000000000000000000000 was recovered successfully.*"
>>>>> Looks like the second executeAsync() recover the first job. Not start
>>>>> a second job.
>>>>>
>>>>> Thanks,
>>>>> Qihua
>>>>>
>>>>>
>>>>> On Thu, Jun 17, 2021 at 7:43 AM Arvid Heise <ar...@apache.org> wrote:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> env.execute("Job 1"); is a blocking call. You either have to use
>>>>>> executeAsync or use a separate thread to submit the second job. If Job 1
>>>>>> finishes then this would also work by having sequential execution.
>>>>>>
>>>>>> However, I think what you actually want to do is to use the same env
>>>>>> with 2 topologies and 1 single execute like this.
>>>>>>
>>>>>> StreamExecutionEnvironment env =
>>>>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>> DataStream<String> stream1 = env.addSource(new
>>>>>> SourceFunction<String>());
>>>>>> stream1.addSink(new FlinkKafkaProducer());
>>>>>> DataStream<String> stream2 = env.addSource(new
>>>>>> SourceFunction<String>());
>>>>>> stream2.addSink(new DiscardingSink<>());
>>>>>> env.execute("Job 1+2");
>>>>>>
>>>>>> On Wed, Jun 16, 2021 at 6:04 PM Qihua Yang <yang...@gmail.com> wrote:
>>>>>>
>>>>>>> Hi,
>>>>>>> Does anyone know how to run multiple jobs in same flink application?
>>>>>>> I did a simple test.  First job was started. I did see the log
>>>>>>> message, but I didn't see the second job was started, even I saw the log
>>>>>>> message.
>>>>>>>
>>>>>>> public void testJobs() throws Exception {
>>>>>>> StreamExecutionEnvironment env =
>>>>>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>> DataStream<String> stream1 = env.addSource(new
>>>>>>> SourceFunction<String>());
>>>>>>> stream1.addSink(new FlinkKafkaProducer());
>>>>>>> printf("### first job");
>>>>>>> env.execute("Job 1");
>>>>>>>
>>>>>>> env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>>> DataStream<String> stream2 = env.addSource(new
>>>>>>> SourceFunction<String>());
>>>>>>> stream2.addSink(new DiscardingSink<>());
>>>>>>> printf("### second job");
>>>>>>>     env.execute("Job 2");
>>>>>>> }
>>>>>>>
>>>>>>> Here is the log:
>>>>>>> ### first job
>>>>>>> INFO  o.a.f.c.deployment.application.executors.EmbeddedExecutor  -
>>>>>>> Job 00000000000000000000000000000000 is submitted.
>>>>>>> INFO  o.a.f.c.deployment.application.executors.EmbeddedExecutor  -
>>>>>>> Submitting Job with JobId=00000000000000000000000000000000.
>>>>>>> INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher  -
>>>>>>> Received JobGraph submission 00000000000000000000000000000000 (job1).
>>>>>>> INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher  -
>>>>>>> Submitting job 00000000000000000000000000000000 (job1).
>>>>>>>
>>>>>>> INFO  o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService  -
>>>>>>> Starting ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
>>>>>>> INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Starting
>>>>>>> execution of job job1 (00000000000000000000000000000000) under job 
>>>>>>> master
>>>>>>> id b03cde9dc2aebdb39c46cda4c2a94c07.
>>>>>>> INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Starting
>>>>>>> scheduling with scheduling strategy
>>>>>>> [org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy]
>>>>>>> INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Job
>>>>>>> job1 (00000000000000000000000000000000) switched from state CREATED to
>>>>>>> RUNNING.
>>>>>>>
>>>>>>> ### second job
>>>>>>> WARN  com.doordash.flink.common.utils.LoggerUtil  - Class -
>>>>>>> IndexWriter : ### second job
>>>>>>> INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Resolved
>>>>>>> ResourceManager address, beginning registration
>>>>>>> INFO  o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService  -
>>>>>>> Starting ZooKeeperLeaderRetrievalService
>>>>>>> /leader/00000000000000000000000000000000/job_manager_lock.
>>>>>>> INFO  o.a.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>>>>>> Registering job manager b03cde9dc2aebdb39c46cda4c2a94...@akka.tcp://
>>>>>>> flink@10.4.139.110:6123/user/rpc/jobmanager_2 for job
>>>>>>> 00000000000000000000000000000000.
>>>>>>> INFO  o.a.f.c.deployment.application.executors.EmbeddedExecutor  -
>>>>>>> Job 00000000000000000000000000000000 was recovered successfully.
>>>>>>> INFO  o.a.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>>>>>> Registered job manager b03cde9dc2aebdb39c46cda4c2a94...@akka.tcp://
>>>>>>> flink@10.4.139.110:6123/user/rpc/jobmanager_2 for job
>>>>>>> 00000000000000000000000000000000.
>>>>>>> INFO  org.apache.flink.runtime.jobmaster.JobMaster  - JobManager
>>>>>>> successfully registered at ResourceManager, leader id:
>>>>>>> 956d4431ca90d45d92c027046cd0404e.
>>>>>>> INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl  -
>>>>>>> Requesting new slot [SlotRequestId{8980dce0c0ef6a933d73051c58534489}] 
>>>>>>> and
>>>>>>> profile ResourceProfile{UNKNOWN} from resource manager.
>>>>>>> INFO  o.a.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>>>>>> Request slot with profile ResourceProfile{UNKNOWN} for job
>>>>>>> 00000000000000000000000000000000 with allocation id
>>>>>>> 21134414fc60d4ef3e940609cef960b6.
>>>>>>> INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl  -
>>>>>>> Requesting new slot [SlotRequestId{e37cc684758e49f3cce76c4687b1d1a3}] 
>>>>>>> and
>>>>>>> profile ResourceProfile{UNKNOWN} from resource manager.
>>>>>>> INFO  o.a.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>>>>>> Request slot with profile ResourceProfile{UNKNOWN} for job
>>>>>>> 00000000000000000000000000000000 with allocation id
>>>>>>> 650bd9100a35ef5086fd4614f5253b55.
>>>>>>>
>>>>>>

Reply via email to