Hi Arvid,

Do you know if I can start multiple jobs for a single Flink application?

Thanks,
Qihua


On Thu, Jun 17, 2021 at 12:11 PM Qihua Yang <yang...@gmail.com> wrote:

> Hi,
>
> I am using application mode.
>
> Thanks,
> Qihua
>
> On Thu, Jun 17, 2021 at 12:09 PM Arvid Heise <ar...@apache.org> wrote:
>
>> Hi Qihua,
>>
>> Which execution mode are you using?
>>
>> On Thu, Jun 17, 2021 at 6:48 PM Qihua Yang <yang...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> Thank you for your reply. What I want is for the Flink app to have multiple
>>> jobs, each managing one stream. Currently our Flink app has only one job,
>>> which manages multiple streams.
>>> I did try env.executeAsync(), but it still doesn't work. From the log,
>>> when the second executeAsync() was called, it shows " *Job
>>> 00000000000000000000000000000000 was recovered successfully.*"
>>> It looks like the second executeAsync() recovered the first job instead of
>>> starting a second job.
>>>
>>> Thanks,
>>> Qihua
>>>
>>>
>>> On Thu, Jun 17, 2021 at 7:43 AM Arvid Heise <ar...@apache.org> wrote:
>>>
>>>> Hi,
>>>>
>>>> env.execute("Job 1"); is a blocking call. You either have to use
>>>> executeAsync or use a separate thread to submit the second job. If Job 1
>>>> finishes, your code would also work, with the two jobs running
>>>> sequentially.
>>>>
>>>> However, I think what you actually want is to use the same env with two
>>>> topologies and a single execute call, like this:
>>>>
>>>> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>> DataStream<String> stream1 = env.addSource(new SourceFunction<String>());
>>>> stream1.addSink(new FlinkKafkaProducer());
>>>> DataStream<String> stream2 = env.addSource(new SourceFunction<String>());
>>>> stream2.addSink(new DiscardingSink<>());
>>>> env.execute("Job 1+2");
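
To make the blocking versus non-blocking point concrete, here is a minimal sketch in plain Java. It deliberately does not use Flink: unboundedJob, submitAsync and submitBoth are hypothetical names that only stand in for a never-ending streaming job and a non-blocking submit call. It shows why the second submission is reached only when the first one returns immediately. It says nothing about how application mode treats a second submission.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

public class SubmitTwoJobs {

    /** Hypothetical stand-in for a streaming job: it runs until the latch is released. */
    static Runnable unboundedJob(CountDownLatch stop) {
        return () -> {
            try {
                stop.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
    }

    /** Mimics a non-blocking submit: start the job on its own thread and return at once. */
    static Thread submitAsync(String name, Runnable job, List<String> submitted) {
        Thread t = new Thread(job, name);
        t.start();
        submitted.add(name); // recorded on the caller's thread, so the order is deterministic
        return t;
    }

    /** Submits two never-ending jobs back to back and returns the names that were submitted. */
    static List<String> submitBoth() {
        CountDownLatch stop = new CountDownLatch(1);
        List<String> submitted = new ArrayList<>();
        Thread job1 = submitAsync("Job 1", unboundedJob(stop), submitted);
        // A blocking submit would never reach this line while Job 1 is still running.
        Thread job2 = submitAsync("Job 2", unboundedJob(stop), submitted);
        stop.countDown(); // stop both jobs so the example terminates
        try {
            job1.join();
            job2.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for the jobs", e);
        }
        return submitted;
    }

    public static void main(String[] args) {
        System.out.println(submitBoth()); // prints [Job 1, Job 2]
    }
}
```

If submitAsync called job.run() directly instead of starting a thread, the first call would block on the latch forever and "Job 2" would never be submitted, which is the situation described above for env.execute("Job 1").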
>>>>
>>>> On Wed, Jun 16, 2021 at 6:04 PM Qihua Yang <yang...@gmail.com> wrote:
>>>>
>>>>> Hi,
>>>>> Does anyone know how to run multiple jobs in the same Flink application?
>>>>> I did a simple test. The first job was started, and I saw its log
>>>>> message. The second job was not started, even though I saw its log
>>>>> message.
>>>>>
>>>>> public void testJobs() throws Exception {
>>>>>     StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>     DataStream<String> stream1 = env.addSource(new SourceFunction<String>());
>>>>>     stream1.addSink(new FlinkKafkaProducer());
>>>>>     printf("### first job");
>>>>>     env.execute("Job 1");
>>>>>
>>>>>     env = StreamExecutionEnvironment.getExecutionEnvironment();
>>>>>     DataStream<String> stream2 = env.addSource(new SourceFunction<String>());
>>>>>     stream2.addSink(new DiscardingSink<>());
>>>>>     printf("### second job");
>>>>>     env.execute("Job 2");
>>>>> }
>>>>>
>>>>> Here is the log:
>>>>> ### first job
>>>>> INFO  o.a.f.c.deployment.application.executors.EmbeddedExecutor  - Job
>>>>> 00000000000000000000000000000000 is submitted.
>>>>> INFO  o.a.f.c.deployment.application.executors.EmbeddedExecutor  -
>>>>> Submitting Job with JobId=00000000000000000000000000000000.
>>>>> INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher  -
>>>>> Received JobGraph submission 00000000000000000000000000000000 (job1).
>>>>> INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher  -
>>>>> Submitting job 00000000000000000000000000000000 (job1).
>>>>>
>>>>> INFO  o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService  -
>>>>> Starting ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
>>>>> INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Starting
>>>>> execution of job job1 (00000000000000000000000000000000) under job master
>>>>> id b03cde9dc2aebdb39c46cda4c2a94c07.
>>>>> INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Starting
>>>>> scheduling with scheduling strategy
>>>>> [org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy]
>>>>> INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Job
>>>>> job1 (00000000000000000000000000000000) switched from state CREATED to
>>>>> RUNNING.
>>>>>
>>>>> ### second job
>>>>> WARN  com.doordash.flink.common.utils.LoggerUtil  - Class -
>>>>> IndexWriter : ### second job
>>>>> INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Resolved
>>>>> ResourceManager address, beginning registration
>>>>> INFO  o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService  -
>>>>> Starting ZooKeeperLeaderRetrievalService
>>>>> /leader/00000000000000000000000000000000/job_manager_lock.
>>>>> INFO  o.a.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>>>> Registering job manager b03cde9dc2aebdb39c46cda4c2a94...@akka.tcp://
>>>>> flink@10.4.139.110:6123/user/rpc/jobmanager_2 for job
>>>>> 00000000000000000000000000000000.
>>>>> INFO  o.a.f.c.deployment.application.executors.EmbeddedExecutor  - Job
>>>>> 00000000000000000000000000000000 was recovered successfully.
>>>>> INFO  o.a.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>>>> Registered job manager b03cde9dc2aebdb39c46cda4c2a94...@akka.tcp://
>>>>> flink@10.4.139.110:6123/user/rpc/jobmanager_2 for job
>>>>> 00000000000000000000000000000000.
>>>>> INFO  org.apache.flink.runtime.jobmaster.JobMaster  - JobManager
>>>>> successfully registered at ResourceManager, leader id:
>>>>> 956d4431ca90d45d92c027046cd0404e.
>>>>> INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl  -
>>>>> Requesting new slot [SlotRequestId{8980dce0c0ef6a933d73051c58534489}] and
>>>>> profile ResourceProfile{UNKNOWN} from resource manager.
>>>>> INFO  o.a.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>>>> Request slot with profile ResourceProfile{UNKNOWN} for job
>>>>> 00000000000000000000000000000000 with allocation id
>>>>> 21134414fc60d4ef3e940609cef960b6.
>>>>> INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl  -
>>>>> Requesting new slot [SlotRequestId{e37cc684758e49f3cce76c4687b1d1a3}] and
>>>>> profile ResourceProfile{UNKNOWN} from resource manager.
>>>>> INFO  o.a.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>>>> Request slot with profile ResourceProfile{UNKNOWN} for job
>>>>> 00000000000000000000000000000000 with allocation id
>>>>> 650bd9100a35ef5086fd4614f5253b55.
>>>>>
>>>>
