Hello,
We are using flink-connector-elasticsearch6_2.11 to ingest stream data to
ES using bulk requests. From the ES metrics, we observed some bulk thread
pool rejections. We contacted the AWS team; their explanation is that part of
the bulk request was rejected. The response body should include a status for each item.
> docs-stable/docs/connectors/datastream/elasticsearch/#configuring-the-internal-bulk-processor
>
> Regards,
> Roman
>
> On Thu, May 20, 2021 at 10:08 PM Qihua Yang wrote:
> >
> > Hello,
> > We are using flink-connector-elasticsearch6_2.11 to ingest stream data
>
> ... if a request fails after the max number of retries
> (bulk.flush.backoff.retries), the failure handler will still be
> called.
>
>
> Best,
> Yangze Guo
>
> On Fri, May 21, 2021 at 5:53 AM Qihua Yang wrote:
> >
> > Thank you for the reply!
> > Yes, we did configure bulk.
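For reference, a minimal sketch of configuring the internal bulk processor's backoff and a failure handler on the elasticsearch6 connector; the host, index, and handler choice below are illustrative assumptions, not taken from this thread:

import java.util.Collections;
import org.apache.http.HttpHost;
import org.elasticsearch.client.Requests;
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase;
import org.apache.flink.streaming.connectors.elasticsearch.util.RetryRejectedExecutionFailureHandler;
import org.apache.flink.streaming.connectors.elasticsearch6.ElasticsearchSink;

ElasticsearchSink.Builder<String> builder = new ElasticsearchSink.Builder<>(
    Collections.singletonList(new HttpHost("es-host", 443, "https")), // placeholder endpoint
    (element, ctx, indexer) -> indexer.add(
        Requests.indexRequest()
            .index("my-index")                                        // placeholder index
            .type("_doc")
            .source(Collections.singletonMap("data", element))));

// Retry bulk items rejected by the ES bulk thread pool, with exponential backoff.
builder.setBulkFlushBackoff(true);
builder.setBulkFlushBackoffType(ElasticsearchSinkBase.FlushBackoffType.EXPONENTIAL);
builder.setBulkFlushBackoffRetries(3);
builder.setBulkFlushBackoffDelay(1000L); // initial delay in ms

// Called per item that still fails after the retries are exhausted; this
// shipped handler re-adds requests that were rejected due to queue capacity.
builder.setFailureHandler(new RetryRejectedExecutionFailureHandler());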
Hi,
I have a question. We have two data streams that may contain duplicate
data. We are using a KeyedCoProcessFunction to process the stream data. I defined
the same keySelector for both streams. Our Flink application has multiple
replicas. We want the same data to be processed by the same replica. Can
> You mean your job has multiple parallelism, and you ask whether the same data
> from the two inputs would be sent to the same downstream subtask after the
> `keyedCoProcessFunction`.
> Yes, Flink can do this if you keyBy the same field for both inputs.
>
> Best regards,
> JING ZHANG
>
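As a hedged sketch of what is described above, assuming two String streams keyed by the same selector (the streams and keys are made up):

import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.KeyedCoProcessFunction;
import org.apache.flink.util.Collector;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> streamA = env.fromElements("a", "b");
DataStream<String> streamB = env.fromElements("b", "c");

// Keying both inputs with the same selector routes records with equal keys
// to the same parallel subtask of the co-process operator.
streamA.connect(streamB)
    .keyBy(v -> v, v -> v)
    .process(new KeyedCoProcessFunction<String, String, String, String>() {
        @Override
        public void processElement1(String value, Context ctx, Collector<String> out) {
            out.collect("A:" + value); // record from the first input
        }
        @Override
        public void processElement2(String value, Context ctx, Collector<String> out) {
            out.collect("B:" + value); // record from the second input
        }
    })
    .print();
env.execute("keyed-co-process");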
Hi,
Does anyone know how to run multiple jobs in the same Flink application?
I did a simple test. The first job started and I did see its log message, but
the second job did not start, even though I saw its log message.
public void testJobs() throws Exception {
    StreamExecutionEnvironment env =
        StreamExecutionEnvironment.getExecutionEnvironment();
    ...
        .addSink(new DiscardingSink<>());
> env.execute("Job 1+2");
>
> On Wed, Jun 16, 2021 at 6:04 PM Qihua Yang wrote:
>
>> Hi,
>> Does anyone know how to run multiple jobs in same flink application?
>> I did a simple test. First job was started. I di
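A minimal sketch of what a single execute() does here, under the assumption that both pipelines are built on the same environment (the sources are placeholders): everything registered before the call is submitted together as one job graph.

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// Two disconnected pipelines on the same environment...
env.fromElements(1, 2, 3).addSink(new DiscardingSink<>());
env.fromElements(4, 5, 6).addSink(new DiscardingSink<>());

// ...are submitted together by one execute() as a single job.
env.execute("Job 1+2");

In session mode, each pipeline could instead be submitted as its own job, e.g. with one env.executeAsync() per pipeline; as the rest of this thread explains, application mode may not run a second execute() call.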
Hi,
I am using application mode.
Thanks,
Qihua
On Thu, Jun 17, 2021 at 12:09 PM Arvid Heise wrote:
> Hi Qihua,
>
> Which execution mode are you using?
>
> On Thu, Jun 17, 2021 at 6:48 PM Qihua Yang wrote:
>
>> Hi,
>>
>> Thank you for your reply. What I
Hi Arvid,
Do you know if I can start multiple jobs for a single flink application?
Thanks,
Qihua
On Thu, Jun 17, 2021 at 12:11 PM Qihua Yang wrote:
> Hi,
>
> I am using application mode.
>
> Thanks,
> Qihua
>
> On Thu, Jun 17, 2021 at 12:09 PM Arvid Heise wrote:
>
> If you want to do that, you need to use session mode, which allows
> managing multiple jobs on the same JobManager.
>
> On Tue, Jun 22, 2021 at 10:43 PM Qihua Yang wrote:
>
>> Hi Arvid,
>>
>> Do you know if I can start multiple jobs for a single flink application?
>> See also: https://issues.apache.org/jira/browse/FLINK-19358
>>
>> Sorry again that I gave you invalid information in my first answer.
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/overview/
>>
>>
>>
>
Hi,
Is it possible to run a Flink app without a job? What I am trying to do
is: I build multiple jars, and switch jars to run different jobs.
I am not sure if Flink supports this mode. I saw the REST API can upload a jar,
cancel a job, and run a jar.
Right now I can upload a jar to Flink. But when I cancel
>
> Best,
> Yangze Guo
>
> On Wed, Sep 29, 2021 at 5:57 AM Qihua Yang wrote:
> >
> > Hi,
> >
> > Is that possible to run a flink app without a job? What I am trying to
> do is I build multiple jars. And switch jar to run different jobs.
> > I am not
Hi,
I deployed Flink in session mode. I didn't run any jobs. I saw the logs below.
That is normal, the same as the Flink manual shows.
+ /opt/flink/bin/run-job-manager.sh
Starting HA cluster with 1 masters.
Starting standalonesession daemon on host job-manager-776dcf6dd-xzs8g.
Starting taskexecutor daemon on
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/overview/#starting-and-stopping-a-cluster
> [2]
> https://ci.apache.org/projects/flink/flink-docs-master/docs/deployment/resource-providers/standalone/overview/#starting-a-standalone-cluster-session-mode
>
> Best,
> Yangze Guo
>
> On Wed, Sep 29, 2021 at 1:02 PM
On ..., 2021 at 11:59 PM Matthias Pohl wrote:
> Hi Qihua,
> I guess, looking into kubectl describe and the JobManager logs would help
> in understanding what's going on.
>
> Best,
> Matthias
>
> On Wed, Sep 29, 2021 at 8:37 PM Qihua Yang wrote:
>
>> Hi,
>> I
Looks like after the script *flink-daemon.sh* completes, it returns exit 0.
Kubernetes regards it as done. Is that expected?
Thanks,
Qihua
On Thu, Sep 30, 2021 at 11:11 AM Qihua Yang wrote:
> Thank you for your reply.
> From the log, exit code is 0, and reason is Completed.
> Looks like th
Hi,
I am trying to read data from a database with the JDBC driver. From [1], I have
to configure the parameters below. I am not quite sure if I understand them
correctly: lower-bound is the smallest value of the first partition,
upper-bound is the largest value of the last partition. For example, if the db
table has 100
> ... into the first partition and read by the first task slot, all 100 <= k <
> 200 will be divided into the second partition and read by the second task
> slot, and so on. So these configurations should have nothing to do with the
> number of rows you have, but should be related t
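A hedged sketch of these options as they appear in the JDBC SQL connector's partitioned scan (connector URL, table, and ranges below are placeholders): with num=2, lower-bound=0, and upper-bound=200, all 0 <= k < 100 go to the first partition and all 100 <= k < 200 to the second, matching the reply above.

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

TableEnvironment tEnv = TableEnvironment.create(
    EnvironmentSettings.newInstance().inBatchMode().build());

// lower-bound / upper-bound bracket the partition column's value range;
// the range is split evenly across scan.partition.num parallel reads.
tEnv.executeSql(
    "CREATE TABLE my_table (k INT, payload STRING) WITH ("
        + " 'connector' = 'jdbc',"
        + " 'url' = 'jdbc:postgresql://db-host:5432/mydb',"
        + " 'table-name' = 'my_table',"
        + " 'scan.partition.column' = 'k',"
        + " 'scan.partition.num' = '2',"
        + " 'scan.partition.lower-bound' = '0',"
        + " 'scan.partition.upper-bound' = '200')");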
Hi,
Sorry for asking again. I plan to use the JDBC connector to scan a database.
How do I know when it is done? Are there any metrics I can track? We want to
monitor the progress and stop the Flink application when it is done.
Thanks,
Qihua
On Fri, Oct 8, 2021 at 10:07 AM Qihua Yang wrote:
> It is pre
> ...instead of a streaming job.
> Call EnvironmentSettings.newInstance().inBatchMode().build() to create your
> environment settings for a batch job.
>
> Qihua Yang wrote on Wed, Oct 13, 2021 at 5:50 AM:
>
>> Hi,
>>
>> Sorry for asking again. I plan to use JDBC connector to scan a dat
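A hedged sketch of how completion can then be observed (the pipeline itself is elided): with bounded input in batch mode, execute() blocks until the job reaches a terminal state, so the application can stop right after it returns. The job status is also visible through the REST API, and the sources' numRecordsOut metric can be watched for progress.

import org.apache.flink.api.common.JobExecutionResult;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.BATCH); // bounded sources terminate

// ... build the JDBC-backed scan pipeline here ...

// execute() returns only once the bounded job has finished.
JobExecutionResult result = env.execute("db-scan");
System.out.println("scan finished in " + result.getNetRuntime() + " ms");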
Hi,
We plan to use the JDBC SQL connector to read/write a database. I saw the JDBC
connector uses username and password. Is it possible to use a secret (*.crt) to
access the database? I didn't find a guideline on how to use it. How do I
configure JDBC with a secret?
val jdbc: JdbcConnectionOptions =
JdbcConnectionOptions.Jdbc
> [5]
> https://www.ibm.com/docs/ar/informix-servers/14.10?topic=options-connecting-jdbc-applications-ssl
>
> Best,
> JING ZHANG
>
>
> Qihua Yang wrote on Sat, Oct 23, 2021 at 1:11 PM:
>
>> Hi,
>>
>> We plan to use JDBC SQL connector to read/write database. I saw
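One hedged approach, assuming a PostgreSQL driver: TLS material is typically passed as driver properties in the JDBC URL rather than through Flink-specific options, so with certificate-based authentication no password option is needed (URL and paths are placeholders; other drivers use different property names, see e.g. [5]):

import org.apache.flink.connector.jdbc.JdbcConnectionOptions;

// sslmode / sslrootcert / sslcert / sslkey are PostgreSQL JDBC driver
// properties; the client key must be converted to PKCS#8 (.pk8) format.
JdbcConnectionOptions jdbc = new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
    .withUrl("jdbc:postgresql://db-host:5432/mydb"
        + "?sslmode=verify-full"
        + "&sslrootcert=/etc/secrets/root.crt"
        + "&sslcert=/etc/secrets/client.crt"
        + "&sslkey=/etc/secrets/client.pk8")
    .withDriverName("org.postgresql.Driver")
    .build();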
Hi,
My Flink app has two data sources. One is from a Kafka topic, one is from a
database using the JDBC connector. Flink scans the full database table.
Which mode should we use, batch mode or streaming mode?
How do we know when the database table is fully scanned? Will Flink give any
signal to show
new 1.14 feature called Hybrid Source:
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/hybridsource/
>
> Rafi
>
>
> On Tue, Oct 26, 2021 at 7:46 PM Qihua Yang wrote:
>
>> Hi,
>>
>> My flink app has two data sources. One is
Hi,
I am trying to use a Postgres DB as the stream data source and push to a Kafka
topic. Here is how I configured the database source. It looks like it didn't
read out any data, but I didn't see any error in the Flink log.
I did a test and tried to insert wrong data into the database; I saw Flink
throw the error below. Lo
Hi,
Our Flink application has two sinks (a DB and a Kafka topic). We want to push
the same data to both sinks. Is it possible to push data to the Kafka topic only
after the data is pushed to the DB successfully? If the commit to the DB fails,
we don't want that data pushed to Kafka.
Thanks,
Qihua
>> ...ingest that stream using Flink to push data later to
>> Kafka.
>>
>> On Wed, Nov 3, 2021 at 6:17 AM Guowei Ma wrote:
>>
>>> Hi, Qihua
>>>
>>> AFAIK there is no way to do it. Maybe you need to implement a "new" sink
>>> to achieve this task
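A hedged sketch of such a "new" sink, as a plain RichSinkFunction that writes to the DB first and produces to Kafka only if the write committed; this gives at-least-once ordering only (exactly-once would need a two-phase-commit design), and every name and connection detail below is a placeholder:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.util.Properties;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class DbThenKafkaSink extends RichSinkFunction<String> {
    private transient Connection conn;
    private transient KafkaProducer<String, String> producer;

    @Override
    public void open(Configuration parameters) throws Exception {
        conn = DriverManager.getConnection("jdbc:postgresql://db-host:5432/mydb");
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        producer = new KafkaProducer<>(props);
    }

    @Override
    public void invoke(String value, Context context) throws Exception {
        try (PreparedStatement stmt = conn.prepareStatement("INSERT INTO t (v) VALUES (?)")) {
            stmt.setString(1, value);
            stmt.executeUpdate(); // DB write first; throws on failure
        }
        // Reached only if the DB write succeeded.
        producer.send(new ProducerRecord<>("my-topic", value));
    }

    @Override
    public void close() throws Exception {
        if (producer != null) producer.close();
        if (conn != null) conn.close();
    }
}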
> ...um, but less custom code.
>
> Therefore, it is mostly up to your requirements and available resources
> you have
> on how to proceed.
>
> Sincerely,
>
> Ali Bahadir Zeybek
>
>
>
>
>
> On Wed, Nov 3, 2021 at 10:13 PM Qihua Yang wrote:
>
>> Many
Hi,
Our stream has two sources: one is a Kafka topic, one is a database. Is it
possible to consume from the Kafka topic only after the DB scan is completed?
We configured it in batch mode.
Thanks,
Qihua
> https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/connectors/datastream/hybridsource/
>
> On Fri, Nov 5, 2021 at 3:53 PM Qihua Yang wrote:
>
>> Hi,
>>
>> Our stream has two sources. one is a Kafka topic, one is a database. Is
>> it possible to consume from kafka topic only after DB scan is completed? We
>> configured it in batch mode.
>>
>> Thanks,
>> Qihua
>>
>
If HybridSource cannot support a JdbcSource, is there any approach I can try
to sequentially read input from two sources: after reading data from the
database, start to read data from the kafka topic?
Thanks,
Qihua
On Fri, Nov 5, 2021 at 10:44 PM Qihua Yang wrote:
> Hi Austin,
>
> That is exactly wh
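A hedged sketch of the sequential pattern with HybridSource: it chains FLIP-27 sources, and since the JDBC connector does not provide one, a common workaround is to dump the table to files first and chain a bounded FileSource with a KafkaSource (path, topic, and brokers are placeholders):

import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.connector.base.source.hybrid.HybridSource;
import org.apache.flink.connector.file.src.FileSource;
import org.apache.flink.connector.file.src.reader.TextLineFormat;
import org.apache.flink.connector.kafka.source.KafkaSource;
import org.apache.flink.core.fs.Path;

// Bounded source read first (e.g. an export of the DB table)...
FileSource<String> fileSource = FileSource
    .forRecordStreamFormat(new TextLineFormat(), new Path("s3://bucket/db-dump/"))
    .build();

// ...then the unbounded Kafka source takes over once the files are exhausted.
KafkaSource<String> kafkaSource = KafkaSource.<String>builder()
    .setBootstrapServers("broker:9092")
    .setTopics("my-topic")
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

HybridSource<String> hybrid = HybridSource.builder(fileSource)
    .addSource(kafkaSource)
    .build();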
Hi,
We are running a Flink cluster in session mode. After the cluster is
launched, no job is running. We want to use the REST API to run a job at some
point.
The JobManager already contains a jar when we deploy. We don't want to
upload a new jar using the REST API. Is there any way that I can use the REST API
On Mon, Nov 22, 2021 at 6:09 PM Caizhi Weng wrote:
> Hi!
>
> You can try /jars/:jarId/run, see [1]
>
> [1]
> https://nightlies.apache.org/flink/flink-docs-master/docs/ops/rest_api/#jars-jarid-run
>
> Qihua Yang wrote on Tue, Nov 23, 2021 at 9:27 AM:
>
>> Hi,
>>
>> We a
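For reference, a hedged sketch of calling that endpoint from Java (host, jar id, and entry class are placeholders; any HTTP client works the same way):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

// POST /jars/:jarId/run starts a job from an already-uploaded jar.
String jarId = "abc123_job.jar"; // placeholder: id returned by /jars/upload
HttpClient client = HttpClient.newHttpClient();
HttpRequest request = HttpRequest.newBuilder()
    .uri(URI.create("http://jobmanager:8081/jars/" + jarId + "/run"
        + "?entry-class=com.example.MyJob"))
    .POST(HttpRequest.BodyPublishers.noBody())
    .build();
HttpResponse<String> response =
    client.send(request, HttpResponse.BodyHandlers.ofString());
System.out.println(response.body()); // contains the new job id on success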
Hi,
I found a weird behavior. We launched a k8s cluster without a job, but it
includes jar A. I used the Flink REST API to upload a dummy jar (actually it
can be any jar); Flink creates a jar id. Then I used the REST API to start the
job with jar A's entry-class, but with the dummy jar id.
Flin
Hi Yun,
Thank you for your reply! testB.jar doesn't have the same entry class as
testA.jar.
So is this expected behavior? What is the theory behind it?
Thanks,
Qihua
On Fri, Jan 7, 2022 at 4:27 PM Yun Gao wrote:
>
> Hi Qihua
>
> Sorry, may I double-confirm whether the entry class exists in both
Hi,
I plan to use the database as a Flink source using Flink JDBC. I know
setNumPartitions can be used for parallelism in table reading.
If the number of task managers is less than numPartitions, what is the
behavior?
For example:
I configure setNumPartitions(20), but the Flink application only has 10 task
Hi,
I have a Flink cluster (50 hosts; each host runs a task manager).
I am using Flink JDBC to consume data from a database. The db table is
pretty large, around 18734 rows. I configured the JDBC number of
partitions to 50; fetchSize is 20. The Flink application has 50 task managers.
Anyone know wh
Exception: java.lang.OutOfMemoryError thrown from the
UncaughtExceptionHandler in thread "HTTP-Dispatcher"
Exception: java.lang.OutOfMemoryError thrown from the
UncaughtExceptionHandler in thread "I/O dispatcher 11"
Exception: java.lang.OutOfMemoryError thrown from the
UncaughtExceptionHandler in thread "I/O dispatch
> ...expected behavior. As you have set fetchSize to 20, there
> will only be 20 records fetched at a time in each parallelism of the source.
> How large is your heap size? Does your job have any other operations which
> consume a lot of heap memory?
>
> Qihua Yang wrote on Wed, Jan 19, 2022 at 15:27:
>
> If there are more partitions than task slots, each task slot may be
> assigned more than one partition. By default this is a first-come,
> first-served assignment. After a task slot finishes its current partition,
> it will ask the job manager for the next partition unless all partitions
> have been distributed.
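A hedged sketch of those mechanics at the DataStream level, using JdbcInputFormat (the Table API's scan.partition.* options behave analogously; driver, URL, query, and ranges are placeholders):

import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.java.typeutils.RowTypeInfo;
import org.apache.flink.connector.jdbc.JdbcInputFormat;
import org.apache.flink.connector.jdbc.split.JdbcNumericBetweenParametersProvider;

JdbcInputFormat format = JdbcInputFormat.buildJdbcInputFormat()
    .setDrivername("org.postgresql.Driver")
    .setDBUrl("jdbc:postgresql://db-host:5432/mydb")
    .setQuery("SELECT id, payload FROM my_table WHERE id BETWEEN ? AND ?")
    // Split the key range [0, 1999] into 20 partitions; an idle subtask picks
    // up the next unread partition, so 20 partitions on 10 slots simply means
    // each slot works through about two partitions in turn.
    .setParametersProvider(new JdbcNumericBetweenParametersProvider(0, 1999).ofBatchNum(20))
    // fetchSize is the JDBC cursor page size (rows per round-trip),
    // not a cap on the total number of rows read.
    .setFetchSize(20)
    .setRowTypeInfo(new RowTypeInfo(BasicTypeInfo.LONG_TYPE_INFO, BasicTypeInfo.STRING_TYPE_INFO))
    .finish();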
Should I change the query, something like below, to add a limit? If there is no
limit, does that mean Flink will read the whole huge table into memory and
fetch and return 20 records each time?
val query = String.format("SELECT * FROM %s limit 1000", tableName)
On Tue, Jan 18, 2022 at 11:56 PM
Hi,
We are using the flink jdbc connector to read a whole database table line by
line. A few things I don't quite understand.
We configured BATCH_SIZE=100 and PARTITION_NUM=1000; the table is pretty big.
What is the Flink internal behavior when reading data from the table?
Does Flink read BATCH_SIZE records each time? Or it