Hi Qihua,

Which execution mode are you using?

On Thu, Jun 17, 2021 at 6:48 PM Qihua Yang <yang...@gmail.com> wrote:

> Hi,
>
> Thank you for your reply. What I want is flink app has multiple jobs, each
> job manage a stream. Currently our flink app has only 1 job that manage
> multiple streams.
> I did try env.executeAsyc(), but it still doesn't work. From the log, when
> the second executeAsync() was called, it shows " *Job
> 00000000000000000000000000000000 was recovered successfully.*"
> Looks like the second executeAsync() recover the first job. Not start a
> second job.
>
> Thanks,
> Qihua
>
>
> On Thu, Jun 17, 2021 at 7:43 AM Arvid Heise <ar...@apache.org> wrote:
>
>> Hi,
>>
>> env.execute("Job 1"); is a blocking call. You either have to use
>> executeAsync or use a separate thread to submit the second job. If Job 1
>> finishes then this would also work by having sequential execution.
>>
>> However, I think what you actually want to do is to use the same env with
>> 2 topologies and 1 single execute like this.
>>
>> StreamExecutionEnvironment env =
>> StreamExecutionEnvironment.getExecutionEnvironment();
>> DataStream<String> stream1 = env.addSource(new SourceFunction<String>());
>> stream1.addSink(new FlinkKafkaProducer());
>> DataStream<String> stream2 = env.addSource(new SourceFunction<String>());
>> stream2.addSink(new DiscardingSink<>());
>> env.execute("Job 1+2");
>>
>> On Wed, Jun 16, 2021 at 6:04 PM Qihua Yang <yang...@gmail.com> wrote:
>>
>>> Hi,
>>> Does anyone know how to run multiple jobs in same flink application?
>>> I did a simple test.  First job was started. I did see the log message,
>>> but I didn't see the second job was started, even I saw the log message.
>>>
>>> public void testJobs() throws Exception {
>>> StreamExecutionEnvironment env =
>>> StreamExecutionEnvironment.getExecutionEnvironment();
>>> DataStream<String> stream1 = env.addSource(new SourceFunction<String>());
>>> stream1.addSink(new FlinkKafkaProducer());
>>> printf("### first job");
>>> env.execute("Job 1");
>>>
>>> env = StreamExecutionEnvironment.getExecutionEnvironment();
>>> DataStream<String> stream2 = env.addSource(new SourceFunction<String>());
>>> stream2.addSink(new DiscardingSink<>());
>>> printf("### second job");
>>>     env.execute("Job 2");
>>> }
>>>
>>> Here is the log:
>>> ### first job
>>> INFO  o.a.f.c.deployment.application.executors.EmbeddedExecutor  - Job
>>> 00000000000000000000000000000000 is submitted.
>>> INFO  o.a.f.c.deployment.application.executors.EmbeddedExecutor  -
>>> Submitting Job with JobId=00000000000000000000000000000000.
>>> INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher  -
>>> Received JobGraph submission 00000000000000000000000000000000 (job1).
>>> INFO  org.apache.flink.runtime.dispatcher.StandaloneDispatcher  -
>>> Submitting job 00000000000000000000000000000000 (job1).
>>>
>>> INFO  o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService  -
>>> Starting ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
>>> INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Starting execution
>>> of job job1 (00000000000000000000000000000000) under job master id
>>> b03cde9dc2aebdb39c46cda4c2a94c07.
>>> INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Starting
>>> scheduling with scheduling strategy
>>> [org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy]
>>> INFO  org.apache.flink.runtime.executiongraph.ExecutionGraph  - Job job1
>>> (00000000000000000000000000000000) switched from state CREATED to RUNNING.
>>>
>>> ### second job
>>> WARN  com.doordash.flink.common.utils.LoggerUtil  - Class - IndexWriter
>>> : ### second job
>>> INFO  org.apache.flink.runtime.jobmaster.JobMaster  - Resolved
>>> ResourceManager address, beginning registration
>>> INFO  o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService  -
>>> Starting ZooKeeperLeaderRetrievalService
>>> /leader/00000000000000000000000000000000/job_manager_lock.
>>> INFO  o.a.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>> Registering job manager b03cde9dc2aebdb39c46cda4c2a94...@akka.tcp://
>>> flink@10.4.139.110:6123/user/rpc/jobmanager_2 for job
>>> 00000000000000000000000000000000.
>>> INFO  o.a.f.c.deployment.application.executors.EmbeddedExecutor  - Job
>>> 00000000000000000000000000000000 was recovered successfully.
>>> INFO  o.a.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>> Registered job manager b03cde9dc2aebdb39c46cda4c2a94...@akka.tcp://
>>> flink@10.4.139.110:6123/user/rpc/jobmanager_2 for job
>>> 00000000000000000000000000000000.
>>> INFO  org.apache.flink.runtime.jobmaster.JobMaster  - JobManager
>>> successfully registered at ResourceManager, leader id:
>>> 956d4431ca90d45d92c027046cd0404e.
>>> INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl  -
>>> Requesting new slot [SlotRequestId{8980dce0c0ef6a933d73051c58534489}] and
>>> profile ResourceProfile{UNKNOWN} from resource manager.
>>> INFO  o.a.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>> Request slot with profile ResourceProfile{UNKNOWN} for job
>>> 00000000000000000000000000000000 with allocation id
>>> 21134414fc60d4ef3e940609cef960b6.
>>> INFO  org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl  -
>>> Requesting new slot [SlotRequestId{e37cc684758e49f3cce76c4687b1d1a3}] and
>>> profile ResourceProfile{UNKNOWN} from resource manager.
>>> INFO  o.a.flink.runtime.resourcemanager.StandaloneResourceManager  -
>>> Request slot with profile ResourceProfile{UNKNOWN} for job
>>> 00000000000000000000000000000000 with allocation id
>>> 650bd9100a35ef5086fd4614f5253b55.
>>>
>>

Reply via email to