ES sink never receives error code
Hello,

We are using flink-connector-elasticsearch6_2.11 to ingest stream data into ES with bulk requests. From the ES metrics we observed some bulk thread pool rejections. We contacted the AWS team; their explanation is that part of the bulk request was rejected, and the response body should include a status for each item. For a bulk thread pool rejection, the error code is 429.

Our Flink app overrides the FailureHandler to process error cases. I checked the Flink code; it has an afterBulk() method to handle item errors, but our FailureHandler never received any 429 error.

Is that a Flink issue, or do we need to configure something to make it work?

Thanks,
Qihua
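For readers hitting the same symptom: the connector exposes this hook through the ActionRequestFailureHandler interface. A minimal sketch of a handler that re-queues only the rejection case (an HTTP 429 surfaces as EsRejectedExecutionException) might look like the following. The class name is made up for illustration; whether onFailure is actually reached depends on the bulk retry settings discussed in the replies.

```java
// Sketch of a custom failure handler for the Elasticsearch 6 connector.
// "Retry429FailureHandler" is a made-up name; the interface and helper
// methods below follow the connector API.
import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.util.ExceptionUtils;
import org.elasticsearch.action.ActionRequest;
import org.elasticsearch.common.util.concurrent.EsRejectedExecutionException;

public class Retry429FailureHandler implements ActionRequestFailureHandler {
    @Override
    public void onFailure(ActionRequest action, Throwable failure,
                          int restStatusCode, RequestIndexer indexer) throws Throwable {
        if (ExceptionUtils.findThrowable(failure, EsRejectedExecutionException.class).isPresent()) {
            // Bulk thread pool rejection (HTTP 429): re-queue the request so it
            // is retried with the next bulk flush.
            indexer.add(action);
        } else {
            // Anything else: rethrow, failing the sink (and the job).
            throw failure;
        }
    }
}
```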
Re: ES sink never receives error code
Thank you for the reply!

Yes, we did config bulk.flush.backoff.enable. So the ES BulkProcessor retried after the bulk request was partially rejected, and eventually the request was sent successfully? That is why the failure handler was not called?

Thanks,
Qihua

On Thu, May 20, 2021 at 2:22 PM Roman Khachatryan wrote:

> Hi,
>
> Have you tried to change bulk.flush.backoff.enable?
> According to the docs [1], the underlying ES BulkProcessor will retry
> (by default), so the provided failure handler might not be called.
>
> [1] https://ci.apache.org/projects/flink/flink-docs-stable/docs/connectors/datastream/elasticsearch/#configuring-the-internal-bulk-processor
>
> Regards,
> Roman
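As a concrete illustration of the settings Roman refers to, here is a hedged sketch of the sink builder configuration. The host, port, index name and element type are placeholders, not taken from the thread, and the setter names follow the connector's builder API as documented.

```java
// Hedged sketch: enabling the BulkProcessor's retry/backoff on the sink
// builder. Host, index and the element type are illustrative placeholders.
import java.util.Collections;
import java.util.List;
import org.apache.http.HttpHost;
import org.apache.flink.api.common.functions.RuntimeContext;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer;
import org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.elasticsearch.client.Requests;

public class SinkConfigSketch {
    public static ElasticsearchSink<String> build() {
        List<HttpHost> hosts = Collections.singletonList(new HttpHost("es-host", 9200, "https"));
        ElasticsearchSink.Builder<String> builder = new ElasticsearchSink.Builder<>(
                hosts,
                (String element, RuntimeContext ctx, RequestIndexer indexer) ->
                        indexer.add(Requests.indexRequest()
                                .index("my-index")
                                .type("_doc")
                                .source(Collections.singletonMap("data", element))));

        // Retry a (partially) rejected bulk request before giving up.
        builder.setBulkFlushBackoff(true);
        builder.setBulkFlushBackoffType(ElasticsearchSinkBase.FlushBackoffType.EXPONENTIAL);
        builder.setBulkFlushBackoffRetries(3);
        builder.setBulkFlushBackoffDelay(1000); // milliseconds
        // Invoked only once the retries are exhausted; the connector ships
        // this handler, which re-queues rejected requests.
        builder.setFailureHandler(new RetryRejectedExecutionFailureHandler());
        return builder.build();
    }
}
```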
Re: ES sink never receives error code
Got it! Thanks for helping.

On Thu, May 20, 2021 at 7:15 PM Yangze Guo wrote:

> > So, ES BulkProcessor retried after bulk request was partially rejected.
> > And eventually that request was sent successfully? That is why failure
> > handler was not called?
>
> If the bulk request fails after the max number of retries
> (bulk.flush.backoff.retries), the failure handler will still be
> called.
>
> Best,
> Yangze Guo
Flink stream processing issue
Hi,

I have a question. We have two data streams that may contain duplicate data, and we are using a KeyedCoProcessFunction to process them. I defined the same keySelector for both streams. Our Flink application runs with multiple replicas. We want the same data to be processed by the same replica. Can Flink ensure that?

Thanks,
Qihua
Re: Flink stream processing issue
Sorry for the confusion. Yes, I meant multiple parallelism. Really, thanks for your help.

Thanks,
Qihua

On Thu, Jun 3, 2021 at 12:03 AM JING ZHANG wrote:

> Hi Qihua,
>
> I'm sorry, I didn't understand what you mean by 'replica'. Would you please
> explain a little more?
> If you mean your job has multiple parallelism, and you are asking whether the
> same data from the two inputs would be sent to the same downstream subtask
> after the `keyedCoProcessFunction`:
> Yes, Flink can do this, if you keyBy the same field for both inputs.
>
> Best regards,
> JING ZHANG
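A minimal sketch of JING ZHANG's point: keying both inputs of a connected stream by the same field routes records that share a key to the same parallel subtask. The key format and the function bodies below are invented for illustration.

```java
// Sketch: records with equal keys from either input land on the same
// parallel subtask of the co-process function.
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

public class SameKeyRouting {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> streamA = env.fromElements("k1:a", "k2:b");
        DataStream<String> streamB = env.fromElements("k1:c", "k3:d");

        // Same key selector on both inputs ("key:payload" format is made up).
        streamA.connect(streamB)
                .keyBy(v -> v.split(":")[0], v -> v.split(":")[0])
                .process(new KeyedCoProcessFunction<String, String, String, String>() {
                    @Override
                    public void processElement1(String value, Context ctx, Collector<String> out) {
                        out.collect("A:" + value);
                    }

                    @Override
                    public void processElement2(String value, Context ctx, Collector<String> out) {
                        out.collect("B:" + value);
                    }
                })
                .print();

        env.execute("same-key routing sketch");
    }
}
```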
multiple jobs in same flink app
Hi,

Does anyone know how to run multiple jobs in the same Flink application? I did a simple test. The first job started, and I saw its log message. But the second job never started, even though I saw its log message.

public void testJobs() throws Exception {
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream stream1 = env.addSource(new SourceFunction());
    stream1.addSink(new FlinkKafkaProducer());
    printf("### first job");
    env.execute("Job 1");

    env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream stream2 = env.addSource(new SourceFunction());
    stream2.addSink(new DiscardingSink<>());
    printf("### second job");
    env.execute("Job 2");
}

Here is the log:

### first job
INFO o.a.f.c.deployment.application.executors.EmbeddedExecutor - Job is submitted.
INFO o.a.f.c.deployment.application.executors.EmbeddedExecutor - Submitting Job with JobId=.
INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Received JobGraph submission (job1).
INFO org.apache.flink.runtime.dispatcher.StandaloneDispatcher - Submitting job (job1).
INFO o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService - Starting ZooKeeperLeaderRetrievalService /leader/resource_manager_lock.
INFO org.apache.flink.runtime.jobmaster.JobMaster - Starting execution of job job1 () under job master id b03cde9dc2aebdb39c46cda4c2a94c07.
INFO org.apache.flink.runtime.jobmaster.JobMaster - Starting scheduling with scheduling strategy [org.apache.flink.runtime.scheduler.strategy.EagerSchedulingStrategy]
INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Job job1 () switched from state CREATED to RUNNING.

### second job
WARN com.doordash.flink.common.utils.LoggerUtil - Class - IndexWriter : ### second job
INFO org.apache.flink.runtime.jobmaster.JobMaster - Resolved ResourceManager address, beginning registration
INFO o.a.f.r.leaderretrieval.ZooKeeperLeaderRetrievalService - Starting ZooKeeperLeaderRetrievalService /leader//job_manager_lock.
INFO o.a.flink.runtime.resourcemanager.StandaloneResourceManager - Registering job manager b03cde9dc2aebdb39c46cda4c2a94...@akka.tcp://flink@10.4.139.110:6123/user/rpc/jobmanager_2 for job .
INFO o.a.f.c.deployment.application.executors.EmbeddedExecutor - Job was recovered successfully.
INFO o.a.flink.runtime.resourcemanager.StandaloneResourceManager - Registered job manager b03cde9dc2aebdb39c46cda4c2a94...@akka.tcp://flink@10.4.139.110:6123/user/rpc/jobmanager_2 for job .
INFO org.apache.flink.runtime.jobmaster.JobMaster - JobManager successfully registered at ResourceManager, leader id: 956d4431ca90d45d92c027046cd0404e.
INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl - Requesting new slot [SlotRequestId{8980dce0c0ef6a933d73051c58534489}] and profile ResourceProfile{UNKNOWN} from resource manager.
INFO o.a.flink.runtime.resourcemanager.StandaloneResourceManager - Request slot with profile ResourceProfile{UNKNOWN} for job with allocation id 21134414fc60d4ef3e940609cef960b6.
INFO org.apache.flink.runtime.jobmaster.slotpool.SlotPoolImpl - Requesting new slot [SlotRequestId{e37cc684758e49f3cce76c4687b1d1a3}] and profile ResourceProfile{UNKNOWN} from resource manager.
INFO o.a.flink.runtime.resourcemanager.StandaloneResourceManager - Request slot with profile ResourceProfile{UNKNOWN} for job with allocation id 650bd9100a35ef5086fd4614f5253b55.
Re: multiple jobs in same flink app
Hi,

Thank you for your reply. What I want is a Flink app with multiple jobs, each job managing one stream. Currently our Flink app has only one job that manages multiple streams.
I did try env.executeAsync(), but it still doesn't work. From the log, when the second executeAsync() was called, it shows "Job was recovered successfully." It looks like the second executeAsync() recovered the first job instead of starting a second job.

Thanks,
Qihua

On Thu, Jun 17, 2021 at 7:43 AM Arvid Heise wrote:

> Hi,
>
> env.execute("Job 1"); is a blocking call. You either have to use
> executeAsync or use a separate thread to submit the second job. If Job 1
> finishes then this would also work by having sequential execution.
>
> However, I think what you actually want to do is to use the same env with
> 2 topologies and 1 single execute like this:
>
> StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
> DataStream stream1 = env.addSource(new SourceFunction());
> stream1.addSink(new FlinkKafkaProducer());
> DataStream stream2 = env.addSource(new SourceFunction());
> stream2.addSink(new DiscardingSink<>());
> env.execute("Job 1+2");
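For completeness, here is a hedged sketch of submitting two independent topologies with the non-blocking executeAsync(). The sources and sinks are placeholders; and, as the rest of the thread establishes, this only works when the deployment mode supports multiple executions.

```java
// Hedged sketch: two jobs from one main() via executeAsync(). Note that
// multi-execute applications are not supported when high availability is
// enabled in application mode (see the end of this thread).
import org.apache.flink.core.execution.JobClient;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;

public class TwoJobsSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(1, 2, 3).addSink(new DiscardingSink<>());
        // Returns as soon as the job is submitted, instead of blocking
        // until it finishes like execute() does.
        JobClient job1 = env.executeAsync("Job 1");

        StreamExecutionEnvironment env2 = StreamExecutionEnvironment.getExecutionEnvironment();
        env2.fromElements(4, 5, 6).addSink(new DiscardingSink<>());
        JobClient job2 = env2.executeAsync("Job 2");

        System.out.println("submitted: " + job1.getJobID() + ", " + job2.getJobID());
    }
}
```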
Re: multiple jobs in same flink app
Hi,

I am using application mode.

Thanks,
Qihua

On Thu, Jun 17, 2021 at 12:09 PM Arvid Heise wrote:

> Hi Qihua,
>
> Which execution mode are you using?
Re: multiple jobs in same flink app
Hi Arvid,

Do you know if I can start multiple jobs for a single Flink application?

Thanks,
Qihua

On Thu, Jun 17, 2021 at 12:11 PM Qihua Yang wrote:

> Hi,
>
> I am using application mode.
>
> Thanks,
> Qihua
Re: multiple jobs in same flink app
Hi Robert,

But I saw the Flink doc says the Application Mode can run multiple jobs. Or did I misunderstand it?
https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/deployment/

"Compared to the Per-Job mode, the Application Mode allows the submission of applications consisting of multiple jobs. The order of job execution is not affected by the deployment mode but by the call used to launch the job. Using execute(), which is blocking, establishes an order and it will lead to the execution of the 'next' job being postponed until 'this' job finishes. Using executeAsync(), which is non-blocking, will lead to the 'next' job starting before 'this' job finishes."

On Tue, Jun 22, 2021 at 11:13 PM Robert Metzger wrote:

> Hi Qihua,
>
> Application Mode is meant for executing one job at a time, not multiple
> jobs on the same JobManager.
> If you want to do that, you need to use session mode, which allows
> managing multiple jobs on the same JobManager.
Re: multiple jobs in same flink app
Hi,

We are using HA mode, so it looks like multiple jobs are not an option for us. That makes sense! Thanks for your help!

Thanks,
Qihua

On Wed, Jun 23, 2021 at 7:28 PM Yang Wang wrote:

> Robert is right. We could only support single job submission in
> application mode when the HA mode is enabled.
>
> This is a known limitation of the current application mode implementation.
>
> Best,
> Yang
>
> Robert Metzger wrote on Thu, Jun 24, 2021 at 3:54 AM:
>
>> Thanks a lot for checking again. I just started Flink in Application mode
>> with a jar that contains two "executeAsync" submissions, and indeed two
>> jobs are running.
>>
>> I think the problem in your case is that you are using High Availability
>> (I guess, because there are log statements from the
>> ZooKeeperLeaderRetrievalService). As you can see from the documentation [1]:
>>
>>> The Application Mode allows for multi-execute() applications but
>>> High-Availability is not supported in these cases. High-Availability in
>>> Application Mode is only supported for single-execute() applications.
>>
>> See also: https://issues.apache.org/jira/browse/FLINK-19358
>>
>> Sorry again that I gave you invalid information in my first answer.
>>
>> [1] https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/overview/
Flink run different jars
Hi,

Is it possible to run a Flink app without a job? What I am trying to do: I build multiple jars and switch jars to run different jobs. I am not sure whether Flink supports this mode. I saw the REST API can upload a jar, cancel a job, and run a jar.
Right now I can upload a jar to Flink. But when I cancel a job, Flink restarts automatically. I checked the log; it shows the lines below. Can anyone help me out?

Caused by: org.apache.flink.client.deployment.application.UnsuccessfulExecutionException: Application Status: CANCELED
    at org.apache.flink.client.deployment.application.UnsuccessfulExecutionException.fromJobResult(UnsuccessfulExecutionException.java:73)
    ... 41 common frames omitted
Caused by: org.apache.flink.runtime.client.JobCancellationException: Job was cancelled.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:149)
    at org.apache.flink.client.deployment.application.UnsuccessfulExecutionException.fromJobResult(UnsuccessfulExecutionException.java:64)
    ... 41 common frames omitted

Thanks!
Re: Flink run different jars
Hi Yangze,

Thanks a lot for your reply. The references are very helpful!
Another quick question: reference [1] can start a standalone cluster (session mode). That cluster has a JobManager, and I can submit jobs to run. How about TaskManagers? Do I need to manually start multiple TaskManagers? Is there a complete example showing the process?

Thanks,
Qihua

On Tue, Sep 28, 2021 at 7:02 PM Yangze Guo wrote:

> Hi, Qihua
>
> IIUC, what you want might be a standalone cluster[1] or session cluster[2][3].
>
> [1] https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/overview/
> [2] https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/#session-mode
> [3] https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/yarn/#session-mode
>
> Best,
> Yangze Guo
Start Flink cluster, k8s pod behavior
Hi,

I deployed Flink in session mode. I didn't run any jobs. I saw the logs below, which look normal, the same as the Flink manual shows:

+ /opt/flink/bin/run-job-manager.sh
Starting HA cluster with 1 masters.
Starting standalonesession daemon on host job-manager-776dcf6dd-xzs8g.
Starting taskexecutor daemon on host job-manager-776dcf6dd-xzs8g.

But when I check kubectl, it shows the status is Completed. After a while, the status changed to CrashLoopBackOff, and the pod restarted.

NAME                          READY   STATUS             RESTARTS   AGE
job-manager-776dcf6dd-xzs8g   0/1     Completed          5          5m27s

NAME                          READY   STATUS             RESTARTS   AGE
job-manager-776dcf6dd-xzs8g   0/1     CrashLoopBackOff   5          7m35s

Can anyone help me understand why? Why does Kubernetes regard this pod as completed and restart it? Should I configure something, on either the Flink side or the Kubernetes side? From the Flink manual, after the cluster is started, I can upload a jar to run the application.

Thanks,
Qihua
Re: Flink run different jars
Thanks a lot Yangze. That is very helpful! On Tue, Sep 28, 2021 at 11:11 PM Yangze Guo wrote: > You need to edit the conf/workers. Example of the config[1] and the > process[2]. > > [1] > https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/overview/#starting-and-stopping-a-cluster > [2] > https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/overview/#starting-a-standalone-cluster-session-mode > > Best, > Yangze Guo > > On Wed, Sep 29, 2021 at 1:02 PM Qihua Yang wrote: > > > > Hi Yangze, > > > > Thanks a lot for your reply. References are very helpful! > > Another quick question. Reference 1 can start a standalone cluster > (session Mode). That cluster has a jobManager. I can submit job to run. How > about taskManger? Do I need to manually start multiple taskManagers? > > Is there a complete example to show the process? > > > > Thanks, > > Qihua > > > > > > On Tue, Sep 28, 2021 at 7:02 PM Yangze Guo wrote: > >> > >> Hi, Qihua > >> > >> IIUC, what you want might be a standalone cluster[1] or session > cluster[2][3]. > >> > >> [1] > https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/overview/ > >> [2] > https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/native_kubernetes/#session-mode > >> [3] > https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/yarn/#session-mode > >> > >> Best, > >> Yangze Guo > >> > >> On Wed, Sep 29, 2021 at 5:57 AM Qihua Yang wrote: > >> > > >> > Hi, > >> > > >> > Is that possible to run a flink app without a job? What I am trying > to do is I build multiple jars. And switch jar to run different jobs. > >> > I am not sure if flink supports this mode. I saw rest API can upload > jar, cancel job and run a jar. > >> > Right now I can upload a jar to flink. But when I cancel a job, flink > will restart automatically. 
I checked the log. It shows the entries below. Can anyone help me out?

Caused by: org.apache.flink.client.deployment.application.UnsuccessfulExecutionException: Application Status: CANCELED
    at org.apache.flink.client.deployment.application.UnsuccessfulExecutionException.fromJobResult(UnsuccessfulExecutionException.java:73)
    ... 41 common frames omitted
Caused by: org.apache.flink.runtime.client.JobCancellationException: Job was cancelled.
    at org.apache.flink.runtime.jobmaster.JobResult.toJobExecutionResult(JobResult.java:149)
    at org.apache.flink.client.deployment.application.UnsuccessfulExecutionException.fromJobResult(UnsuccessfulExecutionException.java:64)
    ... 41 common frames omitted

Thanks!
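For reference, the REST calls discussed in this thread (upload a jar, run it, cancel the job) map to a handful of JobManager endpoints. A minimal sketch; the helper names and the `localhost:8081` address are my assumptions, only the endpoint paths come from the Flink REST API:

```python
# Sketch of the Flink REST endpoints for the upload/run/cancel workflow.
# /jars/upload is a POST with a multipart file; /jars/<id>/run starts a job
# from an uploaded jar; PATCH /jobs/<id> (mode=cancel) cancels a job.

def upload_jar_url(base: str) -> str:
    return f"{base}/jars/upload"            # POST, multipart jar upload

def run_jar_url(base: str, jar_id: str) -> str:
    return f"{base}/jars/{jar_id}/run"      # POST, submit job from that jar

def cancel_job_url(base: str, job_id: str) -> str:
    return f"{base}/jobs/{job_id}"          # PATCH with ?mode=cancel

if __name__ == "__main__":
    base = "http://localhost:8081"          # assumed JobManager address
    print(upload_jar_url(base))
    print(run_jar_url(base, "abc_job.jar"))
```

Cancelling a job this way stops only that job; the session cluster itself keeps running and can accept the next submission.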
Re: Start Flink cluster, k8s pod behavior
I did check kubectl describe; it shows the info below. The reason is Completed.

Ports: 8081/TCP, 6123/TCP, 6124/TCP, 6125/TCP
Host Ports: 0/TCP, 0/TCP, 0/TCP, 0/TCP
Command: /opt/flink/bin/entrypoint.sh
Args: /opt/flink/bin/run-job-manager.sh
State: Waiting
  Reason: CrashLoopBackOff
Last State: Terminated
  Reason: Completed
  Exit Code: 0
  Started: Wed, 29 Sep 2021 20:12:30 -0700
  Finished: Wed, 29 Sep 2021 20:12:45 -0700
Ready: False
Restart Count: 131

On Wed, Sep 29, 2021 at 11:59 PM Matthias Pohl wrote: > Hi Qihua, > I guess, looking into kubectl describe and the JobManager logs would help > in understanding what's going on. > > Best, > Matthias > > On Wed, Sep 29, 2021 at 8:37 PM Qihua Yang wrote: > >> Hi, >> I deployed flink in session mode. I didn't run any jobs. I saw below >> logs. That is normal, same as the Flink manual shows. >> >> + /opt/flink/bin/run-job-manager.sh >> Starting HA cluster with 1 masters. >> Starting standalonesession daemon on host job-manager-776dcf6dd-xzs8g. >> Starting taskexecutor daemon on host job-manager-776dcf6dd-xzs8g. >> >> But when I check kubectl, it shows status is Completed. After a while, >> status changed to CrashLoopBackOff, and the pod restarted. >> NAME READY STATUS RESTARTS AGE >> job-manager-776dcf6dd-xzs8g 0/1 Completed 5 5m27s >> job-manager-776dcf6dd-xzs8g 0/1 CrashLoopBackOff 5 7m35s >> >> Can anyone help me understand why? >> Why does Kubernetes regard this pod as completed and restart it? Should I >> config something, either on the Flink side or the Kubernetes side? From the Flink >> manual, after the cluster is started, I can upload a jar to run the >> application. >> >> Thanks, >> Qihua >> >
Re: Start Flink cluster, k8s pod behavior
Thank you for your reply. From the log, the exit code is 0 and the reason is Completed. It looks like the cluster is fine, but why does Kubernetes restart the pod? As you said, from the perspective of Kubernetes everything is done. Then how do we prevent the restart? It didn't even give us a chance to upload and run a jar.

Ports: 8081/TCP, 6123/TCP, 6124/TCP, 6125/TCP
Host Ports: 0/TCP, 0/TCP, 0/TCP, 0/TCP
Command: /opt/flink/bin/entrypoint.sh
Args: /opt/flink/bin/run-job-manager.sh
State: Waiting
  Reason: CrashLoopBackOff
Last State: Terminated
  Reason: Completed
  Exit Code: 0
  Started: Wed, 29 Sep 2021 20:12:30 -0700
  Finished: Wed, 29 Sep 2021 20:12:45 -0700
Ready: False
Restart Count: 131

Thanks, Qihua

On Thu, Sep 30, 2021 at 1:00 AM Chesnay Schepler wrote: > Is the run-job-manager.sh script actually blocking? > Since you (apparently) use that as an entrypoint, if that script exits > after starting the JM then from the perspective of Kubernetes everything is > done. > > On 30/09/2021 08:59, Matthias Pohl wrote: > > Hi Qihua, > I guess, looking into kubectl describe and the JobManager logs would help > in understanding what's going on. > > Best, > Matthias > > On Wed, Sep 29, 2021 at 8:37 PM Qihua Yang wrote: > >> Hi, >> I deployed flink in session mode. I didn't run any jobs. I saw below >> logs. That is normal, same as the Flink manual shows. >> >> + /opt/flink/bin/run-job-manager.sh >> Starting HA cluster with 1 masters. >> Starting standalonesession daemon on host job-manager-776dcf6dd-xzs8g. >> Starting taskexecutor daemon on host job-manager-776dcf6dd-xzs8g. >> >> But when I check kubectl, it shows status is Completed. After a while, >> status changed to CrashLoopBackOff, and the pod restarted. >> NAME READY STATUS RESTARTS AGE >> job-manager-776dcf6dd-xzs8g 0/1 Completed 5 5m27s >> job-manager-776dcf6dd-xzs8g 0/1 CrashLoopBackOff 5 7m35s >> >> Can anyone help me understand why? >> Why does Kubernetes regard this pod as completed and restart it?
Should I >> config something? either Flink side or Kubernetes side? From the Flink >> manual, after the cluster is started, I can upload a jar to run the >> application. >> >> Thanks, >> Qihua >> > >
Re: Start Flink cluster, k8s pod behavior
It looks like after the flink-daemon.sh script completes, it returns exit code 0, and Kubernetes regards the container as done. Is that expected? Thanks, Qihua On Thu, Sep 30, 2021 at 11:11 AM Qihua Yang wrote: > Thank you for your reply. > From the log, the exit code is 0 and the reason is Completed. > It looks like the cluster is fine, but why does Kubernetes restart the pod? As you > said, from the perspective of Kubernetes everything is done. Then how do we > prevent the restart? > It didn't even give us a chance to upload and run a jar. > > Ports: 8081/TCP, 6123/TCP, 6124/TCP, 6125/TCP > Host Ports: 0/TCP, 0/TCP, 0/TCP, 0/TCP > Command: > /opt/flink/bin/entrypoint.sh > Args: > /opt/flink/bin/run-job-manager.sh > State: Waiting > Reason: CrashLoopBackOff > Last State: Terminated > Reason: Completed > Exit Code: 0 > Started: Wed, 29 Sep 2021 20:12:30 -0700 > Finished: Wed, 29 Sep 2021 20:12:45 -0700 > Ready: False > Restart Count: 131 > > Thanks, > Qihua > > On Thu, Sep 30, 2021 at 1:00 AM Chesnay Schepler > wrote: > >> Is the run-job-manager.sh script actually blocking? >> Since you (apparently) use that as an entrypoint, if that script exits >> after starting the JM then from the perspective of Kubernetes everything is >> done. >> >> On 30/09/2021 08:59, Matthias Pohl wrote: >> >> Hi Qihua, >> I guess, looking into kubectl describe and the JobManager logs would help >> in understanding what's going on. >> >> Best, >> Matthias >> >> On Wed, Sep 29, 2021 at 8:37 PM Qihua Yang wrote: >> >>> Hi, >>> I deployed flink in session mode. I didn't run any jobs. I saw below >>> logs. That is normal, same as the Flink manual shows. >>> >>> + /opt/flink/bin/run-job-manager.sh >>> Starting HA cluster with 1 masters. >>> Starting standalonesession daemon on host job-manager-776dcf6dd-xzs8g. >>> Starting taskexecutor daemon on host job-manager-776dcf6dd-xzs8g. >>> >>> >>> But when I check kubectl, it shows status is Completed. After a while, >>> status changed to CrashLoopBackOff, and the pod restarted.
>>> NAME READY >>> STATUS RESTARTS AGE >>> job-manager-776dcf6dd-xzs8g 0/1 Completed 5 >>> 5m27s >>> >>> NAME READY >>> STATUS RESTARTS AGE >>> job-manager-776dcf6dd-xzs8g 0/1 CrashLoopBackOff 5 >>> 7m35s >>> >>> Anyone can help me understand why? >>> Why do kubernetes regard this pod as completed and restart? Should I >>> config something? either Flink side or Kubernetes side? From the Flink >>> manual, after the cluster is started, I can upload a jar to run the >>> application. >>> >>> Thanks, >>> Qihua >>> >> >>
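Following Chesnay's point, the usual fix is to make the container entrypoint block instead of exiting once the daemons are started. A minimal sketch, assuming the standard Flink distribution layout; whether `start-foreground` is available in your image is something to verify:

```shell
#!/bin/sh
# Hypothetical entrypoint: run the JobManager in the foreground so that
# PID 1 stays alive. A daemonizing start script exits with code 0, which
# Kubernetes reports as "Completed" and then restarts (CrashLoopBackOff).
exec /opt/flink/bin/jobmanager.sh start-foreground
```

Alternatively, keep the existing script but end it with a command that never returns, such as tailing the JobManager log file.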
jdbc connector configuration
Hi, I am trying to read data from a database with the JDBC driver. From [1], I have to configure the parameters below. I am not quite sure I understand them correctly: lower-bound is the smallest value of the first partition, upper-bound is the largest value of the last partition. For example, if the db table has 1000 rows, is lower-bound 0 and upper-bound 999? If I set scan.partition.num to 10, does each partition read 100 rows? If I set scan.partition.num to 10 and have 10 task managers, will each task manager pick a partition to read?

- scan.partition.column: The column name used for partitioning the input.
- scan.partition.num: The number of partitions.
- scan.partition.lower-bound: The smallest value of the first partition.
- scan.partition.upper-bound: The largest value of the last partition.

[1] https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/jdbc/ Thanks, Qihua
Re: jdbc connector configuration
It is pretty clear. Thanks Caizhi! On Thu, Oct 7, 2021 at 7:27 PM Caizhi Weng wrote: > Hi! > > These configurations are not required to merely read from a database. They > are here to accelerate the reads by allowing sources to read data in > parallel. > > This optimization works by dividing the data into several > (scan.partition.num) partitions and each partition will be read by a task > slot (not a task manager, as a task manager may have multiple task slots). > You can set scan.partition.column to specify the partition key and also set > the lower and upper bounds for the range of data. > > Let's say your partition key is the column "k" which ranges from 0 to 999. > If you set the lower bound to 0, the upper bound to 999 and the number of > partitions to 10, then all data satisfying 0 <= k < 100 will be divided > into the first partition and read by the first task slot, all 100 <= k < > 200 will be divided into the second partition and read by the second task > slot and so on. So these configurations should have nothing to do with the > number of rows you have, but should be related to the range of your > partition key. > > Qihua Yang 于2021年10月7日周四 上午7:43写道: > >> Hi, >> >> I am trying to read data from database with JDBC driver. From [1], I have >> to config below parameters. I am not quite sure if I understand it >> correctly. lower-bound is smallest value of the first partition, >> upper-bound is largest value of the last partition. For example, if the db >> table has 1000 rows. lower-bound is 0, upper-bound is 999. Is that correct? >> If setting scan.partition.num to 10, each partition read 100 row? >> if I set scan.partition.num to 10 and I have 10 task managers. Each task >> manager will pick a partition to read? >> >>- scan.partition.column: The column name used for partitioning the >>input. >>- scan.partition.num: The number of partitions. >>- scan.partition.lower-bound: The smallest value of the first >>partition. 
>>- scan.partition.upper-bound: The largest value of the last partition. >> >> [1] >> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/jdbc/ >> >> Thanks, >> Qihua >> >
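Caizhi's explanation above can be sketched in plain code: the bounds describe the partition column's value range, not row counts, and the range is split evenly across partitions. The function name is illustrative, not Flink API:

```python
# Split the partition column range [lower, upper] into num_partitions
# half-open ranges, mirroring the example: k in 0..999 with 10 partitions
# gives 0 <= k < 100 for the first partition, 100 <= k < 200 next, etc.

def partition_ranges(lower, upper, num_partitions):
    span = (upper - lower + 1) // num_partitions
    ranges = []
    for i in range(num_partitions):
        start = lower + i * span
        # the last partition is inclusive of `upper` (absorbs any remainder)
        end = upper + 1 if i == num_partitions - 1 else start + span
        ranges.append((start, end))
    return ranges

print(partition_ranges(0, 999, 10)[0])   # (0, 100): rows with 0 <= k < 100
print(partition_ranges(0, 999, 10)[-1])  # (900, 1000)
```

Each range is read by one task slot (not one task manager), so parallelism comes from slots, and the bounds should track the range of the partition key, not the table's row count.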
Re: jdbc connector configuration
Hi, Sorry for asking again. I plan to use JDBC connector to scan a database. How do I know if it is done? Are there any metrics I can track? We want to monitor the progress, stop flink application when it is done. Thanks, Qihua On Fri, Oct 8, 2021 at 10:07 AM Qihua Yang wrote: > It is pretty clear. Thanks Caizhi! > > On Thu, Oct 7, 2021 at 7:27 PM Caizhi Weng wrote: > >> Hi! >> >> These configurations are not required to merely read from a database. >> They are here to accelerate the reads by allowing sources to read data in >> parallel. >> >> This optimization works by dividing the data into several >> (scan.partition.num) partitions and each partition will be read by a task >> slot (not a task manager, as a task manager may have multiple task slots). >> You can set scan.partition.column to specify the partition key and also set >> the lower and upper bounds for the range of data. >> >> Let's say your partition key is the column "k" which ranges from 0 to >> 999. If you set the lower bound to 0, the upper bound to 999 and the number >> of partitions to 10, then all data satisfying 0 <= k < 100 will be divided >> into the first partition and read by the first task slot, all 100 <= k < >> 200 will be divided into the second partition and read by the second task >> slot and so on. So these configurations should have nothing to do with the >> number of rows you have, but should be related to the range of your >> partition key. >> >> Qihua Yang 于2021年10月7日周四 上午7:43写道: >> >>> Hi, >>> >>> I am trying to read data from database with JDBC driver. From [1], I >>> have to config below parameters. I am not quite sure if I understand it >>> correctly. lower-bound is smallest value of the first partition, >>> upper-bound is largest value of the last partition. For example, if the db >>> table has 1000 rows. lower-bound is 0, upper-bound is 999. Is that correct? >>> If setting scan.partition.num to 10, each partition read 100 row? 
>>> if I set scan.partition.num to 10 and I have 10 task managers. Each task >>> manager will pick a partition to read? >>> >>>- scan.partition.column: The column name used for partitioning the >>>input. >>>- scan.partition.num: The number of partitions. >>>- scan.partition.lower-bound: The smallest value of the first >>>partition. >>>- scan.partition.upper-bound: The largest value of the last >>>partition. >>> >>> [1] >>> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/jdbc/ >>> >>> Thanks, >>> Qihua >>> >>
Re: jdbc connector configuration
Hi, If I configure batch mode, application will stop after the job is complete, right? Then k8s will restart the pod and rerun the job. That is not what we want. Thanks, Qihua On Tue, Oct 12, 2021 at 7:27 PM Caizhi Weng wrote: > Hi! > > It seems that you want to run a batch job instead of a streaming job. > Call EnvironmentSettings.newInstance().inBatchMode().build() to create your > environment settings for a batch job. > > Qihua Yang 于2021年10月13日周三 上午5:50写道: > >> Hi, >> >> Sorry for asking again. I plan to use JDBC connector to scan a database. >> How do I know if it is done? Are there any metrics I can track? We want to >> monitor the progress, stop flink application when it is done. >> >> Thanks, >> Qihua >> >> On Fri, Oct 8, 2021 at 10:07 AM Qihua Yang wrote: >> >>> It is pretty clear. Thanks Caizhi! >>> >>> On Thu, Oct 7, 2021 at 7:27 PM Caizhi Weng wrote: >>> >>>> Hi! >>>> >>>> These configurations are not required to merely read from a database. >>>> They are here to accelerate the reads by allowing sources to read data in >>>> parallel. >>>> >>>> This optimization works by dividing the data into several >>>> (scan.partition.num) partitions and each partition will be read by a task >>>> slot (not a task manager, as a task manager may have multiple task slots). >>>> You can set scan.partition.column to specify the partition key and also set >>>> the lower and upper bounds for the range of data. >>>> >>>> Let's say your partition key is the column "k" which ranges from 0 to >>>> 999. If you set the lower bound to 0, the upper bound to 999 and the number >>>> of partitions to 10, then all data satisfying 0 <= k < 100 will be divided >>>> into the first partition and read by the first task slot, all 100 <= k < >>>> 200 will be divided into the second partition and read by the second task >>>> slot and so on. So these configurations should have nothing to do with the >>>> number of rows you have, but should be related to the range of your >>>> partition key. 
>>>> >>>> Qihua Yang 于2021年10月7日周四 上午7:43写道: >>>> >>>>> Hi, >>>>> >>>>> I am trying to read data from database with JDBC driver. From [1], I >>>>> have to config below parameters. I am not quite sure if I understand it >>>>> correctly. lower-bound is smallest value of the first partition, >>>>> upper-bound is largest value of the last partition. For example, if the db >>>>> table has 1000 rows. lower-bound is 0, upper-bound is 999. Is that >>>>> correct? >>>>> If setting scan.partition.num to 10, each partition read 100 row? >>>>> if I set scan.partition.num to 10 and I have 10 task managers. Each >>>>> task manager will pick a partition to read? >>>>> >>>>>- scan.partition.column: The column name used for partitioning the >>>>>input. >>>>>- scan.partition.num: The number of partitions. >>>>>- scan.partition.lower-bound: The smallest value of the first >>>>>partition. >>>>>- scan.partition.upper-bound: The largest value of the last >>>>>partition. >>>>> >>>>> [1] >>>>> https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/connectors/table/jdbc/ >>>>> >>>>> Thanks, >>>>> Qihua >>>>> >>>>
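On the original monitoring question in this thread, one external option is to poll the JobManager's REST endpoint `GET /jobs/<jobid>` and watch the `state` field. A sketch of the interpretation step; the JSON below is a trimmed, illustrative example of the response shape, and treating FINISHED/FAILED/CANCELED as terminal is the key idea:

```python
import json

# Decide whether a job has reached a terminal state based on the JSON
# returned by the Flink REST API's GET /jobs/<jobid> endpoint.
TERMINAL_STATES = {"FINISHED", "FAILED", "CANCELED"}

def is_done(job_status_json: str) -> bool:
    return json.loads(job_status_json)["state"] in TERMINAL_STATES

sample = '{"jid": "abc123", "name": "db-scan", "state": "FINISHED"}'
print(is_done(sample))  # True
```

A supervising script could poll this in a loop and tear down the deployment once the batch scan job reports FINISHED, which sidesteps the pod-restart problem described above.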
Flink JDBC connect with secret
Hi, We plan to use the JDBC SQL connector to read/write a database. I saw the JDBC connector uses a username and password. Is it possible to use a secret (*.crt) to access the database? I didn't find a guideline on how to use it. How do I configure JDBC with a secret?

val jdbc: JdbcConnectionOptions = JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
    .withUrl(url)
    .withDriverName("org.postgresql.Driver")
    .withUsername(userName)
    .withPassword(password)
    .build()

Thanks, Qihua
Re: Flink JDBC connect with secret
Hi Jing, Thank you for your suggestion. I will check if SSL parameters in URL works. Thanks, Qihua On Sat, Oct 23, 2021 at 8:37 PM JING ZHANG wrote: > Hi Qihua, > I checked user documents of several database vendors(postgres, oracle, > solidDB,SQL server)[1][2][3][4][5], and studied how to use JDBC Driver with > SSL to connect to these databases. > Most of database vendors supports two ways: > 1. Option1: Use Connection url > 2. Option2: Define in Properties when call `DriverManager.getConnection` > > Url is exposed to users in JDBC SQL connector currently, while properties > parameters are not exposed yet. > Would you please check whether defining SSL parameters in url could work > first? If not, we would looking for other solution. > > [1] https://jdbc.postgresql.org/documentation/head/connect.html > [2] > https://www.oracle.com/technetwork/topics/wp-oracle-jdbc-thin-ssl-130128.pdf > [3] > https://support.unicomsi.com/manuals/soliddb/100/index.html#page/Administrator_Guide/6_Managing_network.07.13.html > [4] > https://docs.microsoft.com/en-us/sql/connect/jdbc/connecting-with-ssl-encryption?view=sql-server-ver15 > [5] > https://www.ibm.com/docs/ar/informix-servers/14.10?topic=options-connecting-jdbc-applications-ssl > > Best, > JING ZHANG > > > Qihua Yang 于2021年10月23日周六 下午1:11写道: > >> Hi, >> >> We plan to use JDBC SQL connector to read/write database. I saw JDBC >> connector use username and password. Is it possible to use secret(*.crt) to >> access database. I didn't find guideline how to use it. How to config jdbc >> with secret? >> >> val jdbc: JdbcConnectionOptions = >> JdbcConnectionOptions.JdbcConnectionOptionsBuilder() >> .withUrl(url) >> .withDriverName("org.postgresql.Driver") >> .withUsername(userName) >> .withPassword(password) >> .build() >> >> Thanks, >> Qihua >> >
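A sketch of Option 1 from JING ZHANG's reply: encode the SSL settings in the JDBC URL itself. The parameter names below (`sslmode`, `sslrootcert`) are PostgreSQL JDBC driver connection options; the host and certificate paths are placeholders for your own environment, e.g. secret files mounted into the pod:

```python
from urllib.parse import urlencode

# Build a PostgreSQL JDBC URL that carries SSL settings as query
# parameters, so no extra Properties object is needed on the Flink side.

def pg_jdbc_url(host, port, db, **ssl_params):
    query = urlencode(ssl_params)
    return f"jdbc:postgresql://{host}:{port}/{db}?{query}"

url = pg_jdbc_url(
    "db.example.com", 5432, "mydb",          # placeholder host/database
    sslmode="verify-full",
    sslrootcert="/etc/secrets/root.crt",      # placeholder mounted secret
)
print(url)
```

The resulting string can then be passed to `withUrl(...)` in the connection options builder shown earlier in the thread.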
Flink handle both kafka source and db source
Hi, My flink app has two data sources. One is a Kafka topic, one is a database read through the JDBC connector. Flink scans the full database table. Which mode should we use, batch or streaming? How do we know when the database table is fully scanned? Will Flink give any signal to show it is done?
Re: Flink handle both kafka source and db source
Thank you for your reply! I will check Hybrid source. How do we know the database table is fully scanned? And after the scan is completed, does flink scan the table again or keep idling? Thanks, Qihua On Tue, Oct 26, 2021 at 1:59 PM Rafi Aroch wrote: > Hi, > > Take a look at the new 1.14 feature called Hybrid Source: > https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/hybridsource/ > > Rafi > > > On Tue, Oct 26, 2021 at 7:46 PM Qihua Yang wrote: > >> Hi, >> >> My flink app has two data sources. One is from a Kafka topic, one is from >> a database by using the JDBC connector. Flink scan the full database table. >> Which mode should we use? batch mode or streaming mode? >> How do we know the database table is fully scanned? Will Flink throw any >> signal to show it is done? >> >>
database as stream source issue
Hi, I am trying to use a Postgres DB as the stream data source and push the data to a Kafka topic. Below is how I configured the database source. It looks like it didn't read out any data, but I didn't see any error in the Flink log. As a test, I inserted wrong data into the database, and Flink threw the error below, so Flink does try to read from the database.

java.lang.ClassCastException: class java.lang.Long cannot be cast to class java.lang.Integer (java.lang.Long and java.lang.Integer are in module java.base of loader 'bootstrap')

The job manager shows the source switched from DEPLOYING to RUNNING, and then from RUNNING to FINISHED immediately. Can anyone help me understand why? Did I configure anything wrong, or did I miss anything?

2021-10-28 02:49:52.777 [flink-akka.actor.default-dispatcher-3] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Sink: test-sink (2/2) (7aa24e97a11bbd831941d636910fe84f) switched from DEPLOYING to RUNNING.
2021-10-28 02:49:53.245 [flink-akka.actor.default-dispatcher-15] INFO org.apache.flink.runtime.executiongraph.ExecutionGraph - Source: JdbcTableSource(id, name, address) -> Map -> Map -> Filter (1/2) (558afdc9dd911684833d0e51943eef92) switched from RUNNING to FINISHED.

val options = JdbcOptions.builder()
    // .setDBUrl("jdbc:derby:memory:mydb")
    .setDBUrl("")
    .setTableName("test_store")
    .setDriverName("org.postgresql.Driver")
    .setUsername("dbUser")
    .setPassword("123")
    .build()
val readOptions = JdbcReadOptions.builder()
    .setPartitionColumnName("id")
    .setPartitionLowerBound(-1)
    .setPartitionUpperBound(DB_SIZE)
    .setNumPartitions(PARTITIONS)
    // .setFetchSize(0)
    .build()
val lookupOptions = JdbcLookupOptions.builder()
    .setCacheMaxSize(-1)
    .setCacheExpireMs(CACHE_SIZE)
    .setMaxRetryTimes(2)
    .build()
val dataSource = JdbcTableSource.builder()
    .setOptions(options)
    .setReadOptions(readOptions)
    .setLookupOptions(lookupOptions)
    .setSchema(storeSchema)
    .build().getDataStream(env)
Flink sink data to DB and then commit data to Kafka
Hi, Our flink application has two sinks(DB and kafka topic). We want to push same data to both sinks. Is it possible to push data to kafka topic only after data is pushed to DB successfully? If the commit to DB fail, we don't want those data is pushed to kafka. Thanks, Qihua
Re: Flink sink data to DB and then commit data to Kafka
Many thanks guys! Hi Ali, for approach 2, what is the better way to do the database inserts for this case? Currently we simply use JDBC SQL connector to sink to database. Thanks, Qihua On Wed, Nov 3, 2021 at 8:13 AM Ali Bahadir Zeybek wrote: > Hello Qihua, > > If you do not care with the events that are not committed to DB, > you can use Async I/O [1] and implement a logic that > >- does the database inserts >- completes the original events that are only accepted by DB > > You can then sink this new datastream to kafka. > > If you are also interested in the events that are not committed to DB, > you can use a Process Function [2] and implement a logic that > >- does the database inserts >- collects the original events that are only accepted by DB >- sends the ones that are not accepted by DB to a side output > > You can then sink this new datastream to kafka and maybe sideoutput to > another topic. > > Sincerely, > > Ali Bahadir Zeybek > > [1]: > https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/operators/asyncio > [2]: > https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/operators/process_function > > On Wed, Nov 3, 2021 at 3:16 PM Francesco Guardiani < > france...@ververica.com> wrote: > >> An alternative is to use a CDC tool like Debezium to stream your table >> changes, and then ingest that stream using Flink to push data later to >> Kafka. >> >> On Wed, Nov 3, 2021 at 6:17 AM Guowei Ma wrote: >> >>> Hi, Qihua >>> >>> AFAIK there is no way to do it. Maybe you need to implement a "new" sink >>> to archive this target. >>> >>> Best, >>> Guowei >>> >>> >>> On Wed, Nov 3, 2021 at 12:40 PM Qihua Yang wrote: >>> >>>> Hi, >>>> >>>> Our flink application has two sinks(DB and kafka topic). We want to >>>> push same data to both sinks. Is it possible to push data to kafka topic >>>> only after data is pushed to DB successfully? If the commit to DB fail, we >>>> don't want those data is pushed to kafka. 
>>>> >>>> Thanks, >>>> Qihua >>>> >>>
Re: Flink sink data to DB and then commit data to Kafka
Hi Ali, Thank you so much! That is very helpful. Thanks, Qihua On Wed, Nov 3, 2021 at 2:46 PM Ali Bahadir Zeybek wrote: > Hello Qihua, > > This will require you to implement and maintain your own database insertion > logic using any of the clients that your database and programming language > supports. Bear in mind that you will be losing all the optimizations > Flink's connector > provides for you and this will add complexity to the amount of the code > you will have to maintain. On the other hand it will handle the case > within one job. > > If you have more control on the things you can do with your database, and > the > latency to kafka is not a major issue since there will be more moving > parts, then > what @Francesco Guardiani suggested is also a > good approach. You will need > to maintain more systems, i.e. Debezium, but less custom code. > > Therefore, it is mostly up to your requirements and available resources > you have > on how to proceed. > > Sincerely, > > Ali Bahadir Zeybek > > > > > > On Wed, Nov 3, 2021 at 10:13 PM Qihua Yang wrote: > >> Many thanks guys! >> Hi Ali, for approach 2, what is the better way to do the database inserts >> for this case? Currently we simply use JDBC SQL connector to sink to >> database. >> >> Thanks, >> Qihua >> >> On Wed, Nov 3, 2021 at 8:13 AM Ali Bahadir Zeybek >> wrote: >> >>> Hello Qihua, >>> >>> If you do not care with the events that are not committed to DB, >>> you can use Async I/O [1] and implement a logic that >>> >>>- does the database inserts >>>- completes the original events that are only accepted by DB >>> >>> You can then sink this new datastream to kafka. 
>>> >>> If you are also interested in the events that are not committed to DB, >>> you can use a Process Function [2] and implement a logic that >>> >>>- does the database inserts >>>- collects the original events that are only accepted by DB >>>- sends the ones that are not accepted by DB to a side output >>> >>> You can then sink this new datastream to kafka and maybe sideoutput to >>> another topic. >>> >>> Sincerely, >>> >>> Ali Bahadir Zeybek >>> >>> [1]: >>> https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/operators/asyncio >>> [2]: >>> https://nightlies.apache.org/flink/flink-docs-stable/docs/dev/datastream/operators/process_function >>> >>> On Wed, Nov 3, 2021 at 3:16 PM Francesco Guardiani < >>> france...@ververica.com> wrote: >>> >>>> An alternative is to use a CDC tool like Debezium to stream your table >>>> changes, and then ingest that stream using Flink to push data later to >>>> Kafka. >>>> >>>> On Wed, Nov 3, 2021 at 6:17 AM Guowei Ma wrote: >>>> >>>>> Hi, Qihua >>>>> >>>>> AFAIK there is no way to do it. Maybe you need to implement a "new" >>>>> sink to archive this target. >>>>> >>>>> Best, >>>>> Guowei >>>>> >>>>> >>>>> On Wed, Nov 3, 2021 at 12:40 PM Qihua Yang wrote: >>>>> >>>>>> Hi, >>>>>> >>>>>> Our flink application has two sinks(DB and kafka topic). We want to >>>>>> push same data to both sinks. Is it possible to push data to kafka topic >>>>>> only after data is pushed to DB successfully? If the commit to DB fail, >>>>>> we >>>>>> don't want those data is pushed to kafka. >>>>>> >>>>>> Thanks, >>>>>> Qihua >>>>>> >>>>>
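The ordering Ali describes can be sketched in plain code: attempt the database insert first, and only forward the event to Kafka when the insert succeeds; rejected events go to a side output instead. The `insert_db` / `send_kafka` stubs below stand in for real clients and are purely illustrative, not Flink's Async I/O API:

```python
# Commit-to-DB-then-emit-to-Kafka pattern: events rejected by the DB never
# reach Kafka; they are diverted to a side output for separate handling.

def process(events, insert_db, send_kafka, side_output):
    for event in events:
        try:
            insert_db(event)           # commit to DB first
        except Exception:
            side_output.append(event)  # DB rejected: divert, don't emit
            continue
        send_kafka(event)              # DB accepted: now emit to Kafka

kafka, rejected = [], []

def fake_insert(e):
    if e % 2:                          # pretend odd keys violate a constraint
        raise ValueError("rejected")

process([1, 2, 3, 4], fake_insert, kafka.append, rejected)
print(kafka)     # [2, 4]
print(rejected)  # [1, 3]
```

In a real Flink job the loop body would live inside an async function or a ProcessFunction, with the side output wired to a second Kafka topic, as the thread suggests.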
stream consume from kafka after DB scan is done
Hi, Our stream has two sources: one is a Kafka topic, one is a database. Is it possible to consume from the Kafka topic only after the DB scan is completed? We configured it in batch mode. Thanks, Qihua
Re: stream consume from kafka after DB scan is done
Hi Austin, That is exactly what I want. Is it possible to use JdbcTableSource as the first source? It looks like only a FileSource can be used as the first source. Below is the error.

val jdbcSource = JdbcTableSource.builder()
    .setOptions(options)
    .setReadOptions(readOptions)
    .setLookupOptions(lookupOptions)
    .setSchema(schema)
    .build()

val hybridSource = HybridSource.builder(jdbcSource)
    .addSource(kafkaSource)
    .build();

[image: Screen Shot 2021-11-05 at 10.41.59 PM.png] [image: Screen Shot 2021-11-05 at 10.42.13 PM.png]

On Fri, Nov 5, 2021 at 5:11 PM Austin Cawley-Edwards < austin.caw...@gmail.com> wrote: > Hey Qihua, > > If I understand correctly, you should be able to with the HybridSource, > released in 1.14 [1] > > Best, > Austin > > [1]: > https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/hybridsource/ > > On Fri, Nov 5, 2021 at 3:53 PM Qihua Yang wrote: > >> Hi, >> >> Our stream has two sources: one is a Kafka topic, one is a database. Is >> it possible to consume from the Kafka topic only after the DB scan is completed? We >> configured it in batch mode. >> >> Thanks, >> Qihua >> >
Re: stream consume from kafka after DB scan is done
If HybridSource cannot support JdbcSource, is there any approach I can try? sequentially read input from two sources. After read data from database, start to read data from kafka topic? Thanks, Qihua On Fri, Nov 5, 2021 at 10:44 PM Qihua Yang wrote: > Hi Austin, > > That is exactly what I want. Is it possible to use JdbcTableSource as the > first source? Looks like only FileSource can be used as the first source? > Below is the error. > > val jdbcSource = JdbcTableSource.builder() > .setOptions(options) > .setReadOptions(readOptions) > .setLookupOptions(lookupOptions) > .setSchema(schema) > .build() > > val hybridSource = HybridSource.builder(jdbcSource) > .addSource(kafkaSource) > .build(); > > > > [image: Screen Shot 2021-11-05 at 10.41.59 PM.png] > [image: Screen Shot 2021-11-05 at 10.42.13 PM.png] > > On Fri, Nov 5, 2021 at 5:11 PM Austin Cawley-Edwards < > austin.caw...@gmail.com> wrote: > >> Hey Qihua, >> >> If I understand correctly, you should be able to with the HybridSource, >> released in 1.14 [1] >> >> Best, >> Austin >> >> [1]: >> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/hybridsource/ >> >> On Fri, Nov 5, 2021 at 3:53 PM Qihua Yang wrote: >> >>> Hi, >>> >>> Our stream has two sources. one is a Kafka topic, one is a database. Is >>> it possible to consume from kafka topic only after DB scan is completed? We >>> configured it in batch mode. >>> >>> Thanks, >>> Qihua >>> >>
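If the failure here is that HybridSource rejects the legacy JdbcTableSource (HybridSource requires the newer FLIP-27 `Source` interface, which JdbcTableSource predates), the fallback is strictly sequential consumption: fully drain the bounded DB read first, then start on Kafka. Conceptually that is just chaining two inputs; the iterators below are stand-ins for the two real sources:

```python
from itertools import chain

# Sequential-source concept: the bounded DB read is exhausted before the
# (normally unbounded) Kafka read begins, which is exactly the ordering
# HybridSource would provide.
db_rows = iter([("db", 1), ("db", 2)])              # bounded: finishes
kafka_records = iter([("kafka", 1), ("kafka", 2)])  # stand-in for Kafka

merged = list(chain(db_rows, kafka_records))        # Kafka only after DB
print(merged)
```

In practice this could mean running the DB scan as a separate batch job first and starting the streaming Kafka job once it reports FINISHED, since the two halves do not need to share one job graph.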
submit a job issue
Hi, We are running a Flink cluster in session mode. After the cluster is launched, no job is running. We want to use the REST API to run a job at some point. The jobManager already contains a jar when we deploy, and we don't want to upload a new jar through the REST API. Is there any way to use the REST API to run this jar? It looks like the REST API can only submit a job by running a jar that was previously uploaded through the REST API. Thanks, Qihua
Re: submit a job issue
Sorry, maybe I didn't explain it clearly. I want to use "/jars/:jarId/run" to submit a job using a jar that was not uploaded via "/jars/upload". Only jars uploaded through the REST API can be seen from "/jars" and used to submit a job with "/jars/:jarId/run". For example, the instance that runs the job manager has a jar (test.jar) that was not uploaded by using "/jars/upload", so "/jars" cannot show that jar. Is it possible to use "/jars/:jarId/run" to submit a job using that jar? Thanks, Qihua On Mon, Nov 22, 2021 at 6:09 PM Caizhi Weng wrote: > Hi! > > You can try /jars/:jarId/run, see [1] > > [1] > https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/#jars-jarid-run > > Qihua Yang 于2021年11月23日周二 上午9:27写道: > >> Hi, >> >> We are running a flink cluster in session mode. After the cluster is >> launched, no job is running. We want to use REST api to run a job at some >> point. >> The jobManager already contains a jar when we deploy. We don't want to >> upload a new jar by using REST api. Is there any way that I use REST api to >> run this jar? >> Looks like REST api only can submit a job by running a jar that was >> previously uploaded by using REST api. >> >> Thanks, >> Qihua >> >
Flink rest api to start a job
Hi, I found a weird behavior. We launched a k8s cluster without a job, but it includes jar A. I used the Flink REST API to upload a dummy jar (actually it can be any jar), and Flink created a jar id. Then I used the REST API to start a job with jar A's entry-class but the dummy jar's id, and Flink started the job from jar A. Does anyone know why? My understanding is that the REST API should start the job from the dummy jar, because the jar id is the dummy jar's id that I uploaded. Here are the steps I did:
1. Deploy a k8s pod that contains the working jar (testA.jar).
2. Use the Flink REST API to upload testB.jar; Flink generates the jar id 2d6a9263-c9d3-4f23-9f59-fc3594aadf0c_job.jar.
3. Use the REST API to run with testB.jar's id but testA.jar's entry-class.
4. Flink starts the job from testA.jar.
Thanks, Qihua
Re: Flink rest api to start a job
Hi Yun,

Thank you for your reply! testB.jar doesn't have the same entry class as testA.jar. So is this expected behavior? What is the theory behind it?

Thanks,
Qihua

On Fri, Jan 7, 2022 at 4:27 PM Yun Gao wrote:
>
> Hi Qihua,
>
> Sorry, may I double-check whether the entry class exists in both
> testA and testB?
>
> If testA.jar is included on startup, it is loaded in the parent
> classloader, which is the parent of the user classloader that loads
> testB. Thus, at least if the entry-class exists only in testA, it
> should still be found.
>
> Best,
> Yun
>
> ------------------------------------------------------------------
> Sender: Qihua Yang
> Date: 2022/01/07 02:55:09
> Recipient: user
> Theme: Flink rest api to start a job
>
> Hi,
>
> I found a weird behavior. We launched a k8s cluster without a job, but
> it includes jar A. I use the Flink REST API to upload a dummy jar
> (actually it can be any jar). Flink creates a jar id. Then I use the
> REST API to start the job with jar A's entry-class, but the dummy jar's
> id. Flink starts the job from jar A. Does anyone know why?
> My understanding is that the REST API should start the job from the
> dummy jar, because the jar id is the dummy jar's id that I uploaded.
> Here are the steps I took:
> 1. Deploy a k8s pod that contains the working jar (testA.jar).
> 2. Use the REST API to upload testB.jar; Flink generates the jar id
> 2d6a9263-c9d3-4f23-9f59-fc3594aadf0c_job.jar.
> 3. Use the REST API to run with testB.jar's id, but testA.jar's
> entry-class.
> 4. Flink starts the job from testA.jar.
>
> Thanks,
> Qihua
JDBC table reading
Hi,

I plan to use a database as a Flink source via the Flink JDBC connector. I know setNumPartitions can be used to parallelize the table read.
If the number of task managers is less than numPartitions, what is the behavior? For example: I configure setNumPartitions(20), but the Flink application only has 10 task managers. How does Flink assign the tasks? Or is that simply a wrong configuration? Does the number of tasks have to be larger than numPartitions?

Thanks,
Qihua
JDBC read DB causes OutOfMemoryError: Java heap space
Hi,

I have a Flink cluster (50 hosts, each host runs a task manager). I am using Flink JDBC to consume data from a database. The db table is pretty large, around 18734 rows. I configured the JDBC number of partitions to 50 and fetchSize to 20. The Flink application has 50 task managers. Does anyone know why I got an OutOfMemoryError? How should I configure it?

Thanks,
Qihua
Re: JDBC read DB causes OutOfMemoryError: Java heap space
Here are the errors:

Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "server-timer"
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "I/O dispatcher 16"
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "HTTP-Dispatcher"
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "I/O dispatcher 11"
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "I/O dispatcher 9"

On Tue, Jan 18, 2022 at 11:25 PM Qihua Yang wrote:
> Hi,
>
> I have a flink cluster (50 hosts, each host runs a task manager).
> I am using Flink JDBC to consume data from a database. The db table is
> pretty large, around 18734 rows. I configured the JDBC number of
> partitions to 50. fetchSize is 20. Flink application has 50 task managers.
> Anyone know why I got OutOfMemoryError? How should I config it?
>
> Thanks,
> Qihua
Re: JDBC read DB causes OutOfMemoryError: Java heap space
Hi Caizhi,

Thank you for your reply. The heap size is 512m. Fetching from the DB table is the only costly operation. After fetching from DB, I simply ingested to a kafka topic. That should not be the bottleneck.
Here is the jdbc configuration. Is that the correct config?

val query = String.format("SELECT * FROM %s", tableName)

val options = JdbcOptions.builder()
    .setDBUrl(url)
    .setTableName(tableName)
    .setDriverName(DRIVER_NAME)
    .setUsername(userName)
    .setPassword(password)
    .build()
val readOptions = JdbcReadOptions.builder()
    .setQuery(query)
    .setPartitionColumnName(PARTITION_KEY)
    .setPartitionLowerBound(dbLowerBound)
    .setPartitionUpperBound(dbUpperBound)
    .setNumPartitions(50)
    .setFetchSize(20)
    .build()
val lookupOptions = JdbcLookupOptions.builder()
    .setCacheMaxSize(-1)
    .setCacheExpireMs(1000)
    .setMaxRetryTimes(2)
    .build()
val rawSource = JdbcTableSource.builder()
    .setOptions(options)
    .setReadOptions(readOptions)
    .setLookupOptions(lookupOptions)
    .setSchema(schema)
    .build().getDataStream(env)

On Tue, Jan 18, 2022 at 11:48 PM Caizhi Weng wrote:
> Hi!
>
> This is not the desired behavior. As you have set fetchSize to 20, there
> will be only 20 records in each parallelism of the source. How large is
> your heap size? Does your job have any other operations which consume a
> lot of heap memory?
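For context on the config in this thread: as I understand the connector, setPartitionLowerBound/setPartitionUpperBound with setNumPartitions(50) make the scan run one query per partition of the form `SELECT * FROM t WHERE PARTITION_KEY BETWEEN ? AND ?`. A rough Python sketch of how the key range is sliced (modeled on Flink's NumericBetweenParametersProvider; the key range 1..18734 is hypothetical, chosen to match the row count mentioned above):

```python
def partition_bounds(lower: int, upper: int, num_partitions: int):
    """Split the inclusive range [lower, upper] into num_partitions slices,
    one (min, max) pair per partitioned BETWEEN query. The remainder rows
    are spread over the first slices so sizes differ by at most one."""
    total = upper - lower + 1
    size = total // num_partitions       # base slice size
    remainder = total % num_partitions   # first `remainder` slices get one extra key
    bounds = []
    start = lower
    for i in range(num_partitions):
        end = start + size - 1 + (1 if i < remainder else 0)
        bounds.append((start, end))
        start = end + 1
    return bounds

# Hypothetical: 50 partitions over keys 1..18734
slices = partition_bounds(1, 18734, 50)
print(slices[0], slices[-1])
```

Each source subtask then executes its BETWEEN query independently, so the per-query result size is driven by the key distribution, not by fetchSize alone.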
Re: JDBC table reading
Hi Caizhi,

Got it! Thanks for the clarification.

On Tue, Jan 18, 2022 at 11:45 PM Caizhi Weng wrote:
> Hi!
>
> "But flink application only has 10 task managers". I assume that you're
> talking about task slots instead of task managers.
>
> If there are more partitions than task slots, each task slot may be
> assigned more than one partition. By default this is a first-come,
> first-served assignment: after a task slot finishes its current
> partition, it asks the job manager for the next partition until all
> partitions have been distributed.
>
> Qihua Yang 于2022年1月19日周三 14:51写道:
>> Hi,
>>
>> I plan to use a database as a Flink source via the Flink JDBC
>> connector. I know setNumPartitions can be used to parallelize the
>> table read.
>> If the number of task managers is less than numPartitions, what is
>> the behavior? For example: I configure setNumPartitions(20), but the
>> Flink application only has 10 task managers. How does Flink assign
>> the tasks? Or is that simply a wrong configuration? Does the number
>> of tasks have to be larger than numPartitions?
>>
>> Thanks,
>> Qihua
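The first-come-first-served handout described in the reply can be illustrated with a tiny simulation. The counts are hypothetical, and real scheduling depends on how long each split actually takes; with equal split times the handout degenerates to round-robin, as the sketch assumes:

```python
from collections import deque

def assign_splits(num_partitions: int, num_slots: int):
    """Hand out partition splits to slots first-come-first-served:
    each idle slot takes the next pending split from the job manager.
    Assumes equal processing time per split, i.e. round-robin order."""
    pending = deque(range(num_partitions))
    assignment = {slot: [] for slot in range(num_slots)}
    while pending:
        for slot in range(num_slots):
            if not pending:
                break
            assignment[slot].append(pending.popleft())
    return assignment

# The case from the question: setNumPartitions(20) with only 10 slots.
# Every partition is still read; each slot just processes two of them.
print(assign_splits(20, 10))
```

So numPartitions larger than the slot count is not a misconfiguration; it only means slots work through multiple splits sequentially.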
Re: JDBC read DB causes OutOfMemoryError: Java heap space
Should I change the query, e.g. add a limit as below? If there is no limit, does that mean Flink reads the whole huge table into memory and then fetches and returns 20 records each time?

val query = String.format("SELECT * FROM %s LIMIT 1000", tableName)

On Tue, Jan 18, 2022 at 11:56 PM Qihua Yang wrote:
> Hi Caizhi,
>
> Thank you for your reply. The heap size is 512m. Fetching from the DB
> table is the only costly operation. After fetching from DB, I simply
> ingested to a kafka topic. That should not be the bottleneck.
> Here is the jdbc configuration (full snippet earlier in the thread). Is
> that the correct config?
Flink JDBC connector behavior
Hi,

We are using the flink jdbc connector to read a whole database table, row by row. A few things I don't quite understand. We configured BATCH_SIZE=100 and PARTITION_NUM=1000; the table is pretty big. What is the flink internal behavior when reading the table? Does Flink read BATCH_SIZE rows each time? Or (tableSize/PARTITION_NUM) rows each time? Or does it read the whole table into memory each time? The database metrics show the sql latency is extremely high, almost 20s. Is there any way to optimize it?

val query = String.format("SELECT * FROM %s", tableName)

val options = JdbcOptions.builder()
    .setDBUrl(url)
    .setTableName(tableName)
    .setDriverName(DRIVER_NAME)
    .setUsername(userName)
    .setPassword(password)
    .build()
val readOptions = JdbcReadOptions.builder()
    .setQuery(query)
    .setPartitionColumnName(PARTITION_KEY)
    .setPartitionLowerBound(esSinkConf.dbLowerBound)
    .setPartitionUpperBound(esSinkConf.dbUpperBound)
    .setNumPartitions(PARTITION_NUM)
    .setFetchSize(BATCH_SIZE)
    .build()
val lookupOptions = JdbcLookupOptions.builder()
    .setCacheMaxSize(-1)
    .setCacheExpireMs(CACHE_SIZE)
    .setMaxRetryTimes(2)
    .build()
val rawSource = JdbcTableSource.builder()
    .setOptions(options)
    .setReadOptions(readOptions)
    .setLookupOptions(lookupOptions)
    .setSchema(schema)
    .build().getDataStream(env)
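As I understand the connector (a sketch, not an authoritative answer to the question above): each of the PARTITION_NUM splits runs its own query over one slice of the partition column range, and fetchSize (BATCH_SIZE here) is only a hint to the JDBC driver for how many rows to pull per network round trip within that result set. Whether the hint is honored is driver-specific; for example MySQL's driver buffers the whole result set unless streaming mode is enabled, and PostgreSQL only uses a cursor with autocommit off. Rough per-partition arithmetic, with a hypothetical table size:

```python
import math

def scan_cost(table_rows: int, num_partitions: int, fetch_size: int):
    """Rows handled per partitioned query and driver round trips per query,
    assuming keys are evenly distributed over the partition column range."""
    rows_per_partition = math.ceil(table_rows / num_partitions)
    round_trips = math.ceil(rows_per_partition / fetch_size)
    return rows_per_partition, round_trips

# Hypothetical: 1,000,000 rows, PARTITION_NUM=1000, BATCH_SIZE=100
rows, trips = scan_cost(1_000_000, 1000, 100)
print(rows, trips)  # 1000 rows per partitioned query, 10 fetches of 100 each
```

On the 20s latency: if PARTITION_KEY is not indexed, each of the 1000 range queries has to scan the table, which alone would explain very high SQL latency; an index on the partition column is the usual first thing to check.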