Flink JDBC connector behavior

2022-02-08 Thread Qihua Yang
Hi, We are using the Flink JDBC connector to read a whole database table line by line. There are a few things I don't quite understand. We configured BATCH_SIZE=100 and PARTITION_NUM=1000, and the table is pretty big. What is Flink's internal behavior when reading data from the table? Does Flink read BATCH_SIZE rows each time? Or
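For reference, a minimal sketch of a partitioned scan with the DataStream-API JdbcInputFormat from flink-connector-jdbc, assuming BATCH_SIZE maps to the JDBC fetch size and PARTITION_NUM to the number of query splits; the driver, URL, table, and id range below are hypothetical:

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.connector.jdbc.JdbcInputFormat;
import org.apache.flink.connector.jdbc.split.JdbcNumericBetweenParametersProvider;

// The id range [0, 999999] is split into 1000 BETWEEN-range queries
// (partitions); each parallel source task works through its share of the
// splits, and the driver pulls 100 rows per round trip from the cursor.
JdbcInputFormat inputFormat = JdbcInputFormat.buildJdbcInputFormat()
        .setDrivername("org.postgresql.Driver")
        .setDBUrl("jdbc:postgresql://db-host:5432/mydb")
        .setQuery("SELECT id, name FROM my_table WHERE id BETWEEN ? AND ?")
        .setRowTypeInfo(new RowTypeInfo(Types.LONG, Types.STRING))
        .setParametersProvider(
                new JdbcNumericBetweenParametersProvider(0, 999_999).ofBatchNum(1000))
        .setFetchSize(100)
        .finish();
```

The format would then be attached to the pipeline with env.createInput(inputFormat).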

Re: JDBC read DB causes OutOfMemoryError: Java heap space

2022-01-19 Thread Qihua Yang
Should I change the query, something like below, to add a limit? If there is no limit, does that mean Flink will read the whole huge table into memory and fetch and return 20 records each time? val query = String.format("SELECT * FROM %s limit 1000", tableName) On Tue, Jan 18, 2022 at 11:56 PM
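A LIMIT changes the query result, whereas fetchSize only controls cursor paging while the full table is still streamed. A plain-JDBC sketch of the distinction, under the assumption that the database is PostgreSQL, whose pgjdbc driver honors fetchSize only when autocommit is disabled and otherwise buffers the entire result set in heap (which matches this OOM symptom); host and credentials are hypothetical:

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

try (Connection conn = DriverManager.getConnection(
        "jdbc:postgresql://db-host:5432/mydb", "user", "pass")) {
    conn.setAutoCommit(false);   // pgjdbc streams through a cursor only with autocommit off
    try (Statement stmt = conn.createStatement()) {
        stmt.setFetchSize(20);   // 20 rows per round trip, not 20 rows total
        try (ResultSet rs = stmt.executeQuery("SELECT * FROM my_table")) {
            while (rs.next()) {
                // process one row; only roughly one fetch batch is resident in memory
            }
        }
    }
}
```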

Re: JDBC table reading

2022-01-18 Thread Qihua Yang
If there are more partitions than task slots, each task slot may be > assigned more than one partition. By default this is a first come first > served assignment. After a task slot finishes its current partition, it > will ask the job manager for the next partition until all partitions > have been distributed.

Re: JDBC read DB causes OutOfMemoryError: Java heap space

2022-01-18 Thread Qihua Yang
This is expected behavior. As you have set fetchSize to 20, there > will be only 20 records in each parallelism of the source. How large is > your heap size? Does your job have any other operations which consume a lot > of heap memory? > > Qihua Yang wrote on Wed, Jan 19, 2022 at 15:27:

Re: JDBC read DB causes OutOfMemoryError: Java heap space

2022-01-18 Thread Qihua Yang
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "HTTP-Dispatcher" Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "I/O dispatcher 11" Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "I/O dispatch

JDBC read DB causes OutOfMemoryError: Java heap space

2022-01-18 Thread Qihua Yang
Hi, I have a Flink cluster (50 hosts; each host runs a task manager). I am using Flink JDBC to consume data from a database. The DB table is pretty large, around 18734 rows. I configured the JDBC number of partitions to 50, and fetchSize is 20. The Flink application has 50 task managers. Anyone know

JDBC table reading

2022-01-18 Thread Qihua Yang
Hi, I plan to use a database as a Flink source by using Flink JDBC. I know setNumPartitions can be used for parallelism in table reading. If the number of task managers is less than numPartitions, what is the behavior? For example: I configure setNumPartitions(20), but the Flink application only has 10

Re: Flink rest api to start a job

2022-01-08 Thread Qihua Yang
Hi Yun, Thank you for your reply! testB.jar doesn't have the same entry class as testA.jar. So is this expected behavior? What is the reasoning behind it? Thanks, Qihua On Fri, Jan 7, 2022 at 4:27 PM Yun Gao wrote: > > Hi Qihua > > Sorry, may I double-confirm whether the entry class exists in

Flink rest api to start a job

2022-01-06 Thread Qihua Yang
Hi, I found a weird behavior. We launched a k8s cluster without a job, but it includes jar A. I used the Flink REST API to upload a dummy jar (actually it can be any jar), and Flink created a jar id. Then I used the REST API to start the job with jar A's entry-class, but with the dummy jar's id.
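The run endpoint resolves entry-class against the jar identified by the jar id in the URL path, which is why the combination above is surprising when it works. A sketch of the REST call in question (POST /jars/:jarid/run with an entry-class query parameter) using Java's built-in HTTP client; the host, jar id, and class name are hypothetical:

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// jarId is returned by POST /jars/upload; the entry class is hypothetical.
String jarId = "dummy.jar";
String entryClass = "com.example.JobAMain";

HttpRequest request = HttpRequest.newBuilder()
        .uri(URI.create("http://localhost:8081/jars/" + jarId
                + "/run?entry-class=" + entryClass))
        .POST(HttpRequest.BodyPublishers.noBody())
        .build();

HttpResponse<String> response = HttpClient.newHttpClient()
        .send(request, HttpResponse.BodyHandlers.ofString());
System.out.println(response.body());   // {"jobid": "..."} on success
```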

Re: submit a job issue

2021-11-22 Thread Qihua Yang
Mon, Nov 22, 2021 at 6:09 PM Caizhi Weng wrote: > Hi! > > You can try /jars/:jarId/run, see [1] > > [1] > https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/#jars-jarid-run > > Qihua Yang wrote on Tue, Nov 23, 2021 at 9:27 AM: >> Hi, >> >> We are ru

submit a job issue

2021-11-22 Thread Qihua Yang
Hi, We are running a Flink cluster in session mode. After the cluster is launched, no job is running. We want to use the REST API to run a job at some point. The JobManager already contains a jar when we deploy, and we don't want to upload a new jar via the REST API. Is there any way that I can use the REST API

Re: stream consume from kafka after DB scan is done

2021-11-05 Thread Qihua Yang
If HybridSource cannot support JdbcSource, is there any approach I can try to read input from two sources sequentially? After reading data from the database, start to read data from the Kafka topic? Thanks, Qihua On Fri, Nov 5, 2021 at 10:44 PM Qihua Yang wrote: > Hi Austin, > > That is exactly wh

Re: stream consume from kafka after DB scan is done

2021-11-05 Thread Qihua Yang
/datastream/hybridsource/ > > On Fri, Nov 5, 2021 at 3:53 PM Qihua Yang wrote: > >> Hi, >> >> Our stream has two sources: one is a Kafka topic, and one is a database. Is >> it possible to consume from the Kafka topic only after the DB scan is completed? We >> configured it in batch mode. >> >> Thanks, >> Qihua >

stream consume from kafka after DB scan is done

2021-11-05 Thread Qihua Yang
Hi, Our stream has two sources: one is a Kafka topic, and one is a database. Is it possible to consume from the Kafka topic only after the DB scan is completed? We configured it in batch mode. Thanks, Qihua
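HybridSource (added in Flink 1.14) does exactly this switch-over, but only for FLIP-27 sources, which is why the JDBC InputFormat does not plug in directly. A minimal sketch under the assumption that the DB snapshot is first exported to files; the paths, broker, and topic are hypothetical, and class names follow the Flink 1.15+ APIs:

```java
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.source.hybrid.HybridSource;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineInputFormat;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.core.fs.Path;

// Bounded source, read to completion first (stands in for the DB snapshot).
FileSource<String> snapshot = FileSource
        .forRecordStreamFormat(new TextLineInputFormat(), new Path("s3://bucket/db-export/"))
        .build();

// Unbounded source, consumed only after the bounded source finishes.
KafkaSource<String> kafka = KafkaSource.<String>builder()
        .setBootstrapServers("broker:9092")
        .setTopics("my-topic")
        .setValueOnlyDeserializer(new SimpleStringSchema())
        .build();

HybridSource<String> dbThenKafka = HybridSource.builder(snapshot)
        .addSource(kafka)
        .build();
```

The combined source is attached with env.fromSource(dbThenKafka, WatermarkStrategy.noWatermarks(), "db-then-kafka").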

Re: Flink sink data to DB and then commit data to Kafka

2021-11-05 Thread Qihua Yang
but less custom code. > > Therefore, it is mostly up to your requirements and the available resources > you have > on how to proceed. > > Sincerely, > > Ali Bahadir Zeybek > > > > > > On Wed, Nov 3, 2021 at 10:13 PM Qihua Yang wrote: > >> Many than

Re: Flink sink data to DB and then commit data to Kafka

2021-11-03 Thread Qihua Yang
ingest that stream using Flink to push data later to >> Kafka. >> >> On Wed, Nov 3, 2021 at 6:17 AM Guowei Ma wrote: >> >>> Hi, Qihua >>> >>> AFAIK there is no way to do it. Maybe you need to implement a "new" sink >>> to achieve this

Flink sink data to DB and then commit data to Kafka

2021-11-02 Thread Qihua Yang
Hi, Our Flink application has two sinks (a DB and a Kafka topic). We want to push the same data to both sinks. Is it possible to push data to the Kafka topic only after the data is pushed to the DB successfully? If the commit to the DB fails, we don't want that data pushed to Kafka. Thanks, Qihua
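One pattern that comes up in threads like this is to make the DB write an operator rather than a sink, so only records whose commit succeeded flow on to the Kafka sink. A hedged sketch (the DbThenForward name, table, and connection details are hypothetical; per-record synchronous commits are slow, and this gives ordering, not a cross-system transaction):

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

// Writes each record to the DB first; only records whose DB commit
// succeeded are forwarded downstream, where the Kafka sink is attached.
public class DbThenForward extends ProcessFunction<String, String> {
    private transient Connection conn;

    @Override
    public void open(Configuration parameters) throws Exception {
        conn = DriverManager.getConnection("jdbc:postgresql://db-host:5432/mydb", "user", "pass");
        conn.setAutoCommit(false);
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        try (PreparedStatement ps = conn.prepareStatement("INSERT INTO events(payload) VALUES (?)")) {
            ps.setString(1, value);
            ps.executeUpdate();
            conn.commit();
            out.collect(value);   // reaches the Kafka sink only after a successful commit
        } catch (Exception e) {
            conn.rollback();      // failed records never reach Kafka
        }
    }

    @Override
    public void close() throws Exception {
        if (conn != null) conn.close();
    }
}
```

It would be wired as stream.process(new DbThenForward()) with the Kafka producer attached downstream.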

database as stream source issue

2021-10-27 Thread Qihua Yang
Hi, I am trying to use a Postgres DB as the stream data source and push to a Kafka topic. Here is how I configured the database source. It looks like it didn't read out any data, but I didn't see any error in the Flink log. As a test, I tried to insert wrong data into the database, and I saw Flink throw the below error.

Re: Flink handle both kafka source and db source

2021-10-27 Thread Qihua Yang
new 1.14 feature called Hybrid Source: > https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/hybridsource/ > > Rafi > > > On Tue, Oct 26, 2021 at 7:46 PM Qihua Yang wrote: > >> Hi, >> >> My flink app has two data sources. One

Re: Flink JDBC connect with secret

2021-10-26 Thread Qihua Yang
5 > [5] > https://www.ibm.com/docs/ar/informix-servers/14.10?topic=options-connecting-jdbc-applications-ssl > > Best, > JING ZHANG > > > Qihua Yang wrote on Sat, Oct 23, 2021 at 1:11 PM: >> Hi, >> >> We plan to use the JDBC SQL connector to read/write a database. I saw

Flink JDBC connect with secret

2021-10-22 Thread Qihua Yang
Hi, We plan to use the JDBC SQL connector to read/write a database. I saw the JDBC connector uses a username and password. Is it possible to use a secret (*.crt) to access the database? I didn't find a guideline on how to use it. How do I configure JDBC with a secret? val jdbc: JdbcConnectionOptions =
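TLS material is usually passed through the JDBC URL rather than through Flink-specific options. A sketch assuming PostgreSQL, whose pgjdbc driver accepts ssl/sslmode/sslrootcert URL parameters; the host, paths, and credentials are hypothetical:

```java
import org.apache.flink.connector.jdbc.JdbcConnectionOptions;

// sslmode=verify-full encrypts the connection and verifies the server
// identity against the mounted certificate; client-certificate auth would
// additionally need the pgjdbc sslcert/sslkey parameters.
JdbcConnectionOptions jdbcOptions = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
        .withUrl("jdbc:postgresql://db-host:5432/mydb"
                + "?ssl=true&sslmode=verify-full&sslrootcert=/etc/secrets/server.crt")
        .withDriverName("org.postgresql.Driver")
        .withUsername("user")
        .withPassword("pass")
        .build();
```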

Re: jdbc connector configuration

2021-10-12 Thread Qihua Yang
a batch job instead of a streaming job. > Call EnvironmentSettings.newInstance().inBatchMode().build() to create your > environment settings for a batch job. > > Qihua Yang wrote on Wed, Oct 13, 2021 at 5:50 AM: >> Hi, >> >> Sorry for asking again. I plan to use the JDBC connector to scan
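A minimal sketch of the batch-mode setup named in the reply; in batch mode a bounded JDBC scan drives the job to a natural FINISHED state, which also speaks to the "how do I know it is done" question below:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

// Batch mode: the job terminates once every bounded source is exhausted.
EnvironmentSettings settings = EnvironmentSettings.newInstance()
        .inBatchMode()
        .build();
TableEnvironment tEnv = TableEnvironment.create(settings);
```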

Re: jdbc connector configuration

2021-10-12 Thread Qihua Yang
Hi, Sorry for asking again. I plan to use the JDBC connector to scan a database. How do I know when it is done? Are there any metrics I can track? We want to monitor the progress and stop the Flink application when it is done. Thanks, Qihua On Fri, Oct 8, 2021 at 10:07 AM Qihua Yang wrote: > It is pre

Re: jdbc connector configuration

2021-10-08 Thread Qihua Yang
into the first partition and read by the first task slot, all 100 <= k < > 200 will be divided into the second partition and read by the second task > slot, and so on. So these configurations should have nothing to do with the > number of rows you have, but should be related to the

jdbc connector configuration

2021-10-06 Thread Qihua Yang
Hi, I am trying to read data from a database with a JDBC driver. From [1], I have to configure the below parameters, and I am not quite sure I understand them correctly: lower-bound is the smallest value of the first partition, and upper-bound is the largest value of the last partition. For example, if the db table has
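For concreteness, a sketch of the SQL-connector scan options being discussed (the table, URL, and id range are hypothetical); the bounds decide the stride of the partition ranges, so setting them to the actual MIN/MAX of the partition column keeps every row inside some split:

```java
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

TableEnvironment tEnv = TableEnvironment.create(
        EnvironmentSettings.newInstance().inBatchMode().build());

// id values in [0, 9999] are split into 10 ranges of 1000 ids each; every
// range becomes one scan split executed as its own range query.
tEnv.executeSql(
        "CREATE TABLE src (" +
        "  id BIGINT," +
        "  name STRING" +
        ") WITH (" +
        "  'connector' = 'jdbc'," +
        "  'url' = 'jdbc:postgresql://db-host:5432/mydb'," +
        "  'table-name' = 'my_table'," +
        "  'scan.partition.column' = 'id'," +
        "  'scan.partition.num' = '10'," +
        "  'scan.partition.lower-bound' = '0'," +
        "  'scan.partition.upper-bound' = '9999'," +
        "  'scan.fetch-size' = '100'" +
        ")");
```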

Re: Start Flink cluster, k8s pod behavior

2021-09-30 Thread Qihua Yang
Looks like after the script flink-daemon.sh completes, it returns exit code 0, and Kubernetes regards the pod as done. Is that expected? Thanks, Qihua On Thu, Sep 30, 2021 at 11:11 AM Qihua Yang wrote: > Thank you for your reply. > From the log, the exit code is 0, and the reason is Completed. > Looks like th

Re: Start Flink cluster, k8s pod behavior

2021-09-30 Thread Qihua Yang
looking into kubectl describe and the JobManager logs would help > in understanding what's going on. > > Best, > Matthias > > On Wed, Sep 29, 2021 at 8:37 PM Qihua Yang wrote: > >> Hi, >> I deployed flink in session mode. I didn't run any jobs. I saw below >> log

Re: Start Flink cluster, k8s pod behavior

2021-09-30 Thread Qihua Yang
, 2021 at 11:59 PM Matthias Pohl wrote: > Hi Qihua, > I guess, looking into kubectl describe and the JobManager logs would help > in understanding what's going on. > > Best, > Matthias > > On Wed, Sep 29, 2021 at 8:37 PM Qihua Yang wrote: > >> Hi, >> I deplo

Re: Flink run different jars

2021-09-29 Thread Qihua Yang
ders/standalone/overview/#starting-and-stopping-a-cluster > [2] > https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/overview/#starting-a-standalone-cluster-session-mode > > Best, > Yangze Guo > > On Wed, Sep 29, 2021 at 1:02 PM

Start Flink cluster, k8s pod behavior

2021-09-29 Thread Qihua Yang
Hi, I deployed Flink in session mode. I didn't run any jobs. I saw the below logs. That is normal, the same as the Flink manual shows. + /opt/flink/bin/run-job-manager.sh Starting HA cluster with 1 masters. Starting standalonesession daemon on host job-manager-776dcf6dd-xzs8g. Starting taskexecutor daemon on

Re: Flink run different jars

2021-09-28 Thread Qihua Yang
> > Best, > Yangze Guo > > On Wed, Sep 29, 2021 at 5:57 AM Qihua Yang wrote: > > > > Hi, > > > > Is it possible to run a Flink app without a job? What I am trying to > do is I build multiple jars and switch jars to run different jobs. > > I am

Flink run different jars

2021-09-28 Thread Qihua Yang
Hi, Is it possible to run a Flink app without a job? What I am trying to do is: I build multiple jars and switch jars to run different jobs. I am not sure if Flink supports this mode. I saw the REST API can upload a jar, cancel a job, and run a jar. Right now I can upload a jar to Flink. But when I

Re: multiple jobs in same flink app

2021-06-24 Thread Qihua Yang
See also: https://issues.apache.org/jira/browse/FLINK-19358 >> >> Sorry again that I gave you invalid information in my first answer. >> >> [1] >> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/overview/

Re: multiple jobs in same flink app

2021-06-23 Thread Qihua Yang
r. > If you want to do that, you need to use session mode, which allows > managing multiple jobs on the same JobManager. > > On Tue, Jun 22, 2021 at 10:43 PM Qihua Yang wrote: > >> Hi Arvid, >> >> Do you know if I can start multiple jobs for a single flink applicatio

Re: multiple jobs in same flink app

2021-06-22 Thread Qihua Yang
Hi Arvid, Do you know if I can start multiple jobs for a single Flink application? Thanks, Qihua On Thu, Jun 17, 2021 at 12:11 PM Qihua Yang wrote: > Hi, > > I am using application mode. > > Thanks, > Qihua > > On Thu, Jun 17, 2021 at 12:09 PM Arvid Heise wrote:

Re: multiple jobs in same flink app

2021-06-17 Thread Qihua Yang
Hi, I am using application mode. Thanks, Qihua On Thu, Jun 17, 2021 at 12:09 PM Arvid Heise wrote: > Hi Qihua, > > Which execution mode are you using? > > On Thu, Jun 17, 2021 at 6:48 PM Qihua Yang wrote: > >> Hi, >> >> Thank you for your reply. What I

Re: multiple jobs in same flink app

2021-06-17 Thread Qihua Yang
addSink(new DiscardingSink<>()); > env.execute("Job 1+2"); > > On Wed, Jun 16, 2021 at 6:04 PM Qihua Yang wrote: > >> Hi, >> Does anyone know how to run multiple jobs in the same Flink application? >> I did a simple test. The first job was started. I did see

multiple jobs in same flink app

2021-06-16 Thread Qihua Yang
Hi, Does anyone know how to run multiple jobs in the same Flink application? I did a simple test. The first job was started and I did see its log message, but I didn't see the second job start, even though I saw its log message. public void testJobs() throws Exception { StreamExecutionEnvironment env =
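A likely explanation (an assumption, since the full test code is truncated): env.execute() blocks until the submitted job terminates, so with a long-running first job the second execute() call is never reached. A sketch using executeAsync(), which submits without blocking; whether multiple submissions are supported in application mode also depends on the deployment:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// First pipeline: submitted asynchronously, returns a JobClient immediately.
env.fromSequence(0, Long.MAX_VALUE).addSink(new DiscardingSink<>());
env.executeAsync("job-1");

// Second pipeline: also submitted, because the first call did not block.
env.fromSequence(0, Long.MAX_VALUE).addSink(new DiscardingSink<>());
env.executeAsync("job-2");
```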

Re: Flink stream processing issue

2021-06-03 Thread Qihua Yang
Do you mean your job has multiple parallelism, and whether the same data from > two different inputs would be sent to the same downstream subtask after > `keyedCoProcessFunction`? > Yes, Flink can do this, if you keyBy the same field for both inputs. > > Best regards, > JING ZHANG >

Flink stream processing issue

2021-06-02 Thread Qihua Yang
Hi, I have a question. We have two data streams that may contain duplicate data. We are using a keyedCoProcessFunction to process the stream data. I defined the same keySelector for both streams. Our Flink application has multiple replicas. We want the same data to be processed by the same replica. Can
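A minimal sketch of the keyBy-the-same-field pattern from the reply; the String streams and identity key selectors are placeholders. With equal keys, records from both inputs are routed to the same parallel subtask no matter how many task managers ("replicas") are running:

```java
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> a = env.fromElements("k1", "k2");
DataStream<String> b = env.fromElements("k1", "k3");

// Records with equal keys from either input land on the same subtask,
// so per-key state can be shared for deduplication or joins.
a.connect(b)
 .keyBy(v -> v, v -> v)   // same key selector for both streams
 .process(new KeyedCoProcessFunction<String, String, String, String>() {
     @Override
     public void processElement1(String v, Context ctx, Collector<String> out) {
         out.collect("from-a:" + v);
     }
     @Override
     public void processElement2(String v, Context ctx, Collector<String> out) {
         out.collect("from-b:" + v);   // same-key records arrive on the same subtask
     }
 })
 .print();

env.execute("same-key-routing");
```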

Re: ES sink never receive error code

2021-05-24 Thread Qihua Yang
If the request fails after the max number of retries > (bulk.flush.backoff.retries), the failure handler will still be > called. > > Best, > Yangze Guo > > On Fri, May 21, 2021 at 5:53 AM Qihua Yang wrote: > > > > Thank you for the reply! > > Yes, we did config bulk.

Re: ES sink never receive error code

2021-05-20 Thread Qihua Yang
docs-stable/docs/connectors/datastream/elasticsearch/#configuring-the-internal-bulk-processor > > Regards, > Roman > > On Thu, May 20, 2021 at 10:08 PM Qihua Yang wrote: > > > > Hello, > > We are using flink-connector-elasticsearch6_2.11 to ingest stream data >

ES sink never receive error code

2021-05-20 Thread Qihua Yang
Hello, We are using flink-connector-elasticsearch6_2.11 to ingest stream data into ES using bulk requests. From the ES metrics, we observed some bulk thread pool rejections. We contacted the AWS team; their explanation is that part of the bulk request was rejected, and the response body should include a status for each item.
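Per-item rejection statuses surface through the sink's failure handler rather than as an exception from the bulk call itself. A sketch for the elasticsearch6 connector (host, index, and thresholds are hypothetical) that enables flush backoff and installs a handler which sees each failed action:

```java
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.client.Requests;

import java.util.Collections;

ElasticsearchSink.Builder<String> builder = new ElasticsearchSink.Builder<>(
        Collections.singletonList(new HttpHost("es-host", 443, "https")),
        (ElasticsearchSinkFunction<String>) (element, ctx, indexer) ->
                indexer.add(Requests.indexRequest()
                        .index("my-index")
                        .type("_doc")
                        .source(Collections.singletonMap("data", element))));

// Retry transient thread-pool rejections (HTTP 429) with exponential backoff.
builder.setBulkFlushMaxActions(500);
builder.setBulkFlushBackoff(true);
builder.setBulkFlushBackoffType(ElasticsearchSinkBase.FlushBackoffType.EXPONENTIAL);
builder.setBulkFlushBackoffRetries(3);
builder.setBulkFlushBackoffDelay(1000);   // initial backoff in ms

// Called per failed action once retries are exhausted; restStatusCode
// carries the per-item status from the bulk response.
builder.setFailureHandler((action, failure, restStatusCode, indexer) -> {
    // e.g. log restStatusCode/failure, or indexer.add(...) to retry manually
});

ElasticsearchSink<String> sink = builder.build();
```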