Re: avro.ComplexPayloadAvro

2021-05-25 Thread Qishang
Hi.

The classes are generated under `${project.basedir}/target/generated-sources/`; see:
https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-datastream-allround-test/pom.xml#L97

r pp wrote on Tue, May 25, 2021 at 9:58 AM:

> Hi all, a quick question.
>
> https://github.com/apache/flink/blob/master/flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
>
> In this class, located at
>
>
> /flink-end-to-end-tests/flink-datastream-allround-test/src/main/java/org/apache/flink/streaming/tests/DataStreamAllroundTestProgram.java
> where in the code are the following two classes?
> import org.apache.flink.streaming.tests.avro.ComplexPayloadAvro;
> import org.apache.flink.streaming.tests.avro.InnerPayLoadAvro;
> --
> Best,
>   pp
>


How can I use different user run flink

2021-05-25 Thread igyu
I use CDH 6.3.2 and flink-1.12.3.

I have enabled Kerberos.

I want to run Flink as different users with different keytabs, because I created many queues
in YARN, and different users use different queues.



igyu


Re: Question about Flink SQL dimension tables

2021-05-25 Thread datayangl
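I think the original poster means that the MySQL source is preloaded and then periodically re-fetched, which is not quite the same as a lookup join, if lookup cannot refresh the data. Is my understanding correct, everyone?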



--
Sent from: http://apache-flink.147419.n8.nabble.com/

Re: How to ensure that stateful jobs automatically recover from the latest checkpoint after a cluster restart?

2021-05-25 Thread datayangl
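The fixed-delay restart strategy recovers from the latest checkpoint by default; for the
other strategies, see the official documentation. If you are asking how this is implemented, the mailing list is not the best place for questions about implementation internals. You can search Google for related articles, read the relevant FLIPs, or
debug the source code directly.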




Re: java.lang.IllegalStateException: Trying to access closed classloader.

2021-05-25 Thread panzhihao
Wouldn't it be enough to set
classloader.check-leaked-classloader: false in the Flink configuration file flink-conf.yaml?




Flink state inspection tool

2021-05-25 Thread casel.chen
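I have a real-time data synchronization job written in Flink SQL. It consumes the MySQL binlog via CDC and writes to MongoDB, nothing more: no lookup and no join.
The checkpoint page shows 17 MB of state, and a checkpoint takes 2 s.
I would like to know why the state is so large. Is there a state inspection tool that shows what is actually stored in it?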

Re: Job recovery issues with state restoration

2021-05-25 Thread Roman Khachatryan
> I am not able to consistently reproduce this issue. It seems to only occur 
> when the failover happens at the wrong time. I have disabled task local 
> recovery and will report back if we see this again.

Thanks, please post any results here.

> The SST files are not the ones for task local recovery, those would be in a 
> different directory (we have configured io.tmp.dirs as /mnt/data/tmp).

Those files on /mnt could still be checked against the ones in the
checkpoint directories (on S3/DFS); the sizes should match.
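
Such a size check could be scripted roughly as below. This is only a minimal sketch in plain Java, not Flink tooling: `SstSizeCheck` and `mismatches` are hypothetical names, and it assumes the checkpoint files have already been downloaded into a local directory under the same file names as the local SST files. That naming is an assumption; files in a checkpoint directory may be stored under different names, in which case the mapping between the two sets has to be established first.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;

/** Hypothetical helper: lists .sst files whose size differs between two directories. */
final class SstSizeCheck {

    /** Returns the sorted names of .sst files present in both directories with different sizes. */
    static List<String> mismatches(Path localDir, Path referenceDir) throws IOException {
        try (Stream<Path> files = Files.list(localDir)) {
            return files
                    .map(p -> p.getFileName().toString())
                    .filter(name -> name.endsWith(".sst"))
                    // assumption: the reference copy uses the same file name
                    .filter(name -> Files.isRegularFile(referenceDir.resolve(name)))
                    .filter(name -> sizeOf(localDir.resolve(name)) != sizeOf(referenceDir.resolve(name)))
                    .sorted()
                    .collect(Collectors.toList());
        }
    }

    private static long sizeOf(Path p) {
        try {
            return Files.size(p);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) throws IOException {
        if (args.length != 2) {
            System.err.println("usage: SstSizeCheck <localDir> <referenceDir>");
            return;
        }
        System.out.println(mismatches(Paths.get(args[0]), Paths.get(args[1])));
    }
}
```

An empty list means every file found in both places has the same size; it says nothing about files that exist on only one side.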

I'm also curious why you place local recovery files on a remote FS.
(I assume /mnt/data/tmp is a remote FS or a persistent volume.)
Currently, if a TM is lost (e.g. the process dies), those files cannot
be used, and recovery will fall back to S3/DFS. So this probably
incurs some unnecessary IO/latency.

Regards,
Roman

On Tue, May 25, 2021 at 2:16 PM Peter Westermann
 wrote:
>
> Hi Roman,
>
>
>
> I am not able to consistently reproduce this issue. It seems to only occur 
> when the failover happens at the wrong time. I have disabled task local 
> recovery and will report back if we see this again. We need incremental 
> checkpoints for our workload.
>
> The SST files are not the ones for task local recovery, those would be in a 
> different directory (we have configured io.tmp.dirs as /mnt/data/tmp).
>
>
>
> Thanks,
>
> Peter
>
>
>
>
>
> From: Roman Khachatryan 
> Date: Thursday, May 20, 2021 at 4:54 PM
> To: Peter Westermann 
> Cc: user@flink.apache.org 
> Subject: Re: Job recovery issues with state restoration
>
> Hi Peter,
>
> Do you experience this issue if running without local recovery or
> incremental checkpoints enabled?
> Or have you maybe compared the local (on TM) and remote (on DFS) SST files?
>
> Regards,
> Roman
>
> On Thu, May 20, 2021 at 5:54 PM Peter Westermann
>  wrote:
> >
> > Hello,
> >
> >
> >
> > I’ve reported issues around checkpoint recovery in case of a job failure 
> > due to zookeeper connection loss in the past. I am still seeing issues 
> > occasionally.
> >
> > This is for Flink 1.12.3 with zookeeper for HA, S3 as the state backend, 
> > incremental checkpoints, and task-local recovery enabled.
> >
> >
> >
> > Here’s what happened: A zookeeper instance was terminated as part of a 
> > deployment for our zookeeper service, this caused a new jobmanager leader 
> > election (so far so good). A leader was elected and the job was restarted 
> > from the latest checkpoint but never became healthy. The root exception and 
> > the logs show issues reading state:
> >
> > o.r.RocksDBException: Sst file size mismatch: 
> > /mnt/data/tmp/flink-io-7139fea9-2dd8-42e6-8ffb-4d1a826f77d6/job_993eca72823b5ac13a377d7a844ac1b5_op_KeyedCoProcessOperator_d80b7e861bf73bdf93b8b27e5881807f__10_44__uuid_d3c2d251-c046-494a-bc25-57985a01fda1/db/003579.sst.
> >  Size recorded in manifest 36718, actual size 2570
> > Sst file size mismatch: 
> > /mnt/data/tmp/flink-io-7139fea9-2dd8-42e6-8ffb-4d1a826f77d6/job_993eca72823b5ac13a377d7a844ac1b5_op_KeyedCoProcessOperator_d80b7e861bf73bdf93b8b27e5881807f__10_44__uuid_d3c2d251-c046-494a-bc25-57985a01fda1/db/003573.sst.
> >  Size recorded in manifest 13756, actual size 1307
> > Sst file size mismatch: 
> > /mnt/data/tmp/flink-io-7139fea9-2dd8-42e6-8ffb-4d1a826f77d6/job_993eca72823b5ac13a377d7a844ac1b5_op_KeyedCoProcessOperator_d80b7e861bf73bdf93b8b27e5881807f__10_44__uuid_d3c2d251-c046-494a-bc25-57985a01fda1/db/003575.sst.
> >  Size recorded in manifest 16278, actual size 1138
> > Sst file size mismatch: 
> > /mnt/data/tmp/flink-io-7139fea9-2dd8-42e6-8ffb-4d1a826f77d6/job_993eca72823b5ac13a377d7a844ac1b5_op_KeyedCoProcessOperator_d80b7e861bf73bdf93b8b27e5881807f__10_44__uuid_d3c2d251-c046-494a-bc25-57985a01fda1/db/003576.sst.
> >  Size recorded in manifest 23108, actual size 1267
> > Sst file size mismatch: 
> > /mnt/data/tmp/flink-io-7139fea9-2dd8-42e6-8ffb-4d1a826f77d6/job_993eca72823b5ac13a377d7a844ac1b5_op_KeyedCoProcessOperator_d80b7e861bf73bdf93b8b27e5881807f__10_44__uuid_d3c2d251-c046-494a-bc25-57985a01fda1/db/003577.sst.
> >  Size recorded in manifest 148089, actual size 1293
> >
> >     at org.rocksdb.RocksDB.open(RocksDB.java)
> >     at org.rocksdb.RocksDB.open(RocksDB.java:286)
> >     at o.a.f.c.s.s.RocksDBOperationUtils.openDB(RocksDBOperationUtils.java:80)
> >     ... 22 common frames omitted
> > Wrapped by: java.io.IOException: Error while opening RocksDB instance.
> >     at o.a.f.c.s.s.RocksDBOperationUtils.openDB(RocksDBOperationUtils.java:92)
> >     at o.a.f.c.s.s.r.AbstractRocksDBRestoreOperation.openDB(AbstractRocksDBRestoreOperation.java:145)
> >     at o.a.f.c.s.s.r.RocksDBIncrementalRestoreOperation.restoreFromLocalState(RocksDBIncrementalRestoreOper...
> >
> >
> >
> > Since we retain multiple checkpoints, I tried redeploying the job from all 
> > checkpoints that were still available. All those attempts lead to similar 
> > failures. (I eventually had to use an older savepoint to recover the 

Re: KafkaSource metrics

2021-05-25 Thread Alexey Trenikhun
Looks like when KafkaSource is used instead of FlinkKafkaConsumer, metrics 
listed below are not available. Bug? Work in progress?


Thanks,
Alexey

From: Ardhani Narasimha 
Sent: Monday, May 24, 2021 9:08 AM
To: 陳樺威 
Cc: user 
Subject: Re: KafkaSource metrics

Use below respectively

flink_taskmanager_job_task_operator_KafkaConsumer_bytes_consumed_rate - 
Consumer rate
flink_taskmanager_job_task_operator_KafkaConsumer_records_lag_max - Consumer lag
flink_taskmanager_job_task_operator_KafkaConsumer_commit_latency_max - commit 
latency

Unsure if reactive mode makes any difference.

On Mon, May 24, 2021 at 7:44 PM 陳樺威 <oscar8492...@gmail.com> wrote:
Hello,

Our team tries to test reactive mode and replace FlinkKafkaConsumer with the 
new KafkaSource.
But we can’t find the KafkaSource metrics list. Does anyone have any idea? In 
our case, we want to know the Kafka consume delay and consume rate.

Thanks,
Oscar



Re: Choice of time characteristic and performance

2021-05-25 Thread Bob Tiernay
Thanks for your guidance, Robert!

Do you think disabling watermarking would help in terms of a slight
performance boost in such scenarios? 

Bob



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: yarn ship from s3

2021-05-25 Thread Vijayendra Yadav
Hi Piotr,

I have been following the process you mentioned so far. Now I am
migrating the deployment process to AWS CDK and AWS Step Functions, kind
of like a CI/CD process.
I added a step that downloads the jar and configs (1, 2, 3 and 4) from S3 using
command-runner.jar (AWS Step); it loaded them onto one of the master nodes
(out of 3). In the next step, when I launched the Flink job, it could not find
the build because the job was launched on some other YARN node.

I was hoping that, just like *Apache Spark*, where whatever files we provide in
*--files* are shipped to YARN (S3 to the YARN working directory), Flink would also
have a solution.

Thanks,
Vijay


On Tue, May 25, 2021 at 12:50 AM Piotr Nowojski 
wrote:

> Hi Vijay,
>
> I'm not sure if I understand your question correctly. You have jar and
> configs (1, 2, 3 and 4) on S3 and you want to start a Flink job using
> those? Can you simply download those things (whole directory containing
> those) to the machine that will be starting the Flink job?
>
> Best, Piotrek
>
> On Tue, 25 May 2021 at 07:50, Vijayendra Yadav wrote:
>
>> Hi Team,
>>
>> I am trying to find a way to ship files from aws s3 for a flink streaming
>> job, I am running on AWS EMR. What i need to ship are following:
>> 1) application jar
>> 2) application property file
>> 3) custom flink-conf.yaml
>> 4) log4j application specific
>>
>> Please let me know options.
>>
>> Thanks,
>> Vijay
>>
>


Re: Time needed to read from Kafka source

2021-05-25 Thread Arvid Heise
Could you share your KafkaDeserializationSchema, we might be able to spot
some optimization potential. You could also try out enableObjectReuse [1],
which avoids copying data between tasks (not sure if you have any
non-chained tasks).

If you are on 1.13, you could check out the flamegraph to see where the
bottleneck occurs. [2]

[1]
https://ci.apache.org/projects/flink/flink-docs-master/docs/dev/execution/execution_configuration/
[2]
https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/ops/debugging/flame_graphs/

On Tue, May 25, 2021 at 5:12 PM Piotr Nowojski  wrote:

> Hi,
>
> That's a throughput of 700 records/second, which should be well below
> theoretical limits of any deserializer (from hundreds of thousands up to tens
> of millions of records/second per single operator), unless your records are
> huge or very complex.
>
> Long story short, I don't know of a magic bullet to help you solve your
> problem. As always you have two options, either optimise/speed up your
> code/job, or scale up.
>
> If you choose the former, think about Flink as just another Java
> application. Check metrics and resource usage, and understand what resource
> is the problem (cpu? memory? machine is swapping? io?). You might be able
> to guess what's your bottleneck (reading from kafka? deserialisation?
> something else? Flink itself?) by looking at some of the metrics
> (busyTimeMsPerSecond [1] or idleTimeMsPerSecond could help with that), or
> you can also simplify your job to bare minimum and test performance of
> independent components. Also you can always attach a code profiler and
> simply look at what's happening. First identify what's the source of the
> bottleneck and then try to understand what's causing it.
>
> Best,
> Piotrek
>
> [1] busyTimeMsPerSecond is available since Flink 1.13. Flink 1.13 also
> comes with nice tools to analyse bottlenecks in the WebUI (coloring nodes
> in the job graph based on busy/back pressured status and Flamegraph
> support)
>
> On Tue, 25 May 2021 at 15:44, B.B. wrote:
>
>> Hi,
>>
>> I am in the process of optimizing my job which at the moment by our
>> thinking is too slow.
>>
>> We are deploying the job in Kubernetes with 1 job manager with 1 GB RAM and 1
>> CPU and 1 task manager with 4 GB RAM and 2 CPUs (i.e. 2 task slots and a
>> parallelism of two).
>>
>> The main problem is one Kafka source that has 3.8 million events that we
>> have to process.
>> As a test we made a simple job that connects to Kafka using a custom
>> implementation of KafkaDeserializationSchema. There we are using an
>> ObjectMapper that maps input values, e.g.
>>
>> *var event = objectMapper.readValue(consumerRecord.value(),
>> MyClass.class);*
>>
>> This is then validated with hibernate validator and output of this
>> source is printed on the console.
>>
>> The time needed for the job to consume all the events was one and a half
>> hours, which seems a bit long.
>> Is there a way we can speed up this process?
>>
>> Would more CPU cores or memory be the solution?
>> Should we switch to an Avro deserialization schema?
>>
>>
>>
>>


unsubscribe

2021-05-25 Thread Dc Zhao (BLOOMBERG/ 120 PARK)
unsubscribe


<< {CH} {TS} Anything that can possibly go wrong, it does. >>

Fwd: Getting error in pod template

2021-05-25 Thread Priyanka Manickam
-- Forwarded message -
From: Priyanka Manickam 
Date: Tue, 25 May 2021, 21:11
Subject: Re: Getting error in pod template
To: Yang Wang 


Hi team ,

Now I am able to run Flink with the pod template. Thanks for the input.


Problem 1:
But I am not able to pull the jar from blob storage through the command
used in the pod template file.

Problem 2:
Also, we are trying to write events from one topic to another topic,
with parallelism 8, task slots 8, -Djobmanager.memory.process.size=1g,
-Dtaskmanager.memory.process.size=2g,
-Dkubernetes.jobmanager.cpu=0.5,
-Dtaskmanager.cpu=2.

The Kafka (Event Hubs) topic has 3 partitions, and we planned to reach 1 lakh (100,000) messages per
second.

But I was only able to get a throughput of 555 messages per second.

I have also tried increasing the parallelism, but it did not help.

Could you please help me out here?

Thanks,
Priyanka Manickam.

On Fri, 14 May 2021, 21:00 Priyanka Manickam, 
wrote:

> Hi Yang,
>
> I was using a pod template to fetch the logs to a particular repository.
>
> But while deploying I got an error that says "
> jobmanager-pod-template" is invalid : spec.containers(0).image: required
> value.
>
> And if I try to add the image for flink-main-container, it gives an
> image pull back-off error.
>
> Am I proceeding in the correct way? In the official Flink
> documentation, no image is added after flink-main-container.
>
> Could you please help with this? I have also searched for demo videos
> on using the pod template with Flink native Kubernetes, but I could not
> find any. If you could share any demo videos on the website, it would be
> very useful for everyone.
>
> Good year ahead..
>
> Thanks,
> Priyanka Manickam.
>
>
>


Re: How to read large amount of data from hive and write to redis, in a batch manner?

2021-05-25 Thread Piotr Nowojski
Hi,

You could always buffer records in your sink function/operator, until a
large enough batch is accumulated and upload the whole batch at once. Note
that if you want to have at-least-once or exactly-once semantics, you would
need to take care of those buffered records in one way or another. For
example you could:
1. Buffer records on some in memory data structure (not Flink's state), and
just make sure that those records are flushed to the underlying sink on
`CheckpointedFunction#snapshotState()` calls
2. Buffer records on Flink's state (heap state backend or rocksdb - heap
state backend would be the fastest with little overhead, but you can risk
running out of memory), and that would easily give you exactly-once. That
way your batch could span multiple checkpoints.
3. Buffer/write records to temporary files, but in that case keep in mind
that those files need to be persisted and recovered in case of failure and
restart.
4. Ignore checkpointing and either always restart the job from scratch or
accept some occasional data loss.

FYI, virtually every connector/sink is internally batching writes to some
extent. Usually by doing option 1.
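
Option 1 can be sketched in plain Java as follows. This is a minimal sketch under stated assumptions, not a Flink sink: `BatchingBuffer`, `add`, `flush`, and `pending` are hypothetical names, and the `Consumer<List<T>>` stands in for a bulk call such as the batchPut() mentioned in the question. In a real sink function, `add` would be called once per record and `flush()` would additionally be called from `CheckpointedFunction#snapshotState()` and on close, so that no record is left in memory when a checkpoint completes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical helper: collects records and hands them to a bulk writer in batches. */
final class BatchingBuffer<T> {
    private final int batchSize;
    private final Consumer<List<T>> bulkWriter; // stand-in for e.g. a Redis batchPut() call
    private final List<T> buffer = new ArrayList<>();

    BatchingBuffer(int batchSize, Consumer<List<T>> bulkWriter) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.bulkWriter = bulkWriter;
    }

    /** Buffers one record and flushes as soon as a full batch has accumulated. */
    void add(T record) {
        buffer.add(record);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    /** Writes out whatever is buffered; a no-op when the buffer is empty. */
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        bulkWriter.accept(new ArrayList<>(buffer)); // copy so the writer owns its batch
        buffer.clear();
    }

    /** Number of records currently held in memory. */
    int pending() {
        return buffer.size();
    }

    public static void main(String[] args) {
        List<List<Integer>> written = new ArrayList<>();
        BatchingBuffer<Integer> buffer = new BatchingBuffer<>(3, written::add);
        for (int i = 1; i <= 7; i++) {
            buffer.add(i);
        }
        buffer.flush(); // what a checkpoint or close would trigger
        System.out.println(written); // prints [[1, 2, 3], [4, 5, 6], [7]]
    }
}
```

Because the buffer lives outside Flink's state, this only gives at-least-once behaviour, and only if the flush on checkpoint is actually wired in.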

Piotrek

On Tue, 25 May 2021 at 14:50, Yik San Chan wrote:

> Hi community,
>
> I have a Hive table that stores tens of millions of rows of data. In my Flink
> job, I want to process the data in a batch manner:
>
> - Split the data into batches, each batch has (maybe) 10,000 rows.
> - For each batch, call a batchPut() API on my redis client to dump in
> Redis.
>
> Doing so in a streaming manner is not expected, as that will cause too
> many round trips between Flink workers and Redis.
>
> Is there a way to do that? I find little clue in Flink docs, since almost
> all APIs feel better suited for streaming processing by default.
>
> Thank you!
>
> Best,
> Yik San
>


RE: Flink 1.11.3 NoClassDefFoundError: Could not initialize class

2021-05-25 Thread Georgi Stoyanov
Hi Piotr, thank you for the fast reply.

The job is restarting in the same Flink session and fails with that exception. 
When I delete the pods (we are using the Google CRD, so I just kubectl delete 
FlinkCluster …) and the yaml is applied again, it works as expected. It 
looks to me like a jar problem, since I just noticed that it also started to fail with 
a class from an internal common library, not only with the job's own classes:
java.lang.NoClassDefFoundError: Could not initialize 
com.my.organization.core.cfg.PropertiesConfigurationClass
at 
com.my.organization.core.CassandraSink$1.buildCluster(CassandraSink.java:162)
at 
org.apache.flink.streaming.connectors.cassandra.ClusterBuilder.getCluster(ClusterBuilder.java:32)
at 
org.apache.flink.streaming.connectors.cassandra.CassandraSinkBase.open(CassandraSinkBase.java:86)
at 
org.apache.flink.streaming.connectors.cassandra.CassandraPojoSink.open(CassandraPojoSink.java:106)
at 
org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at 
org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at 
org.apache.flink.streaming.api.operators.StreamSink.open(StreamSink.java:48)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.initializeStateAndOpenOperators(OperatorChain.java:291)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.lambda$beforeInvoke$1(StreamTask.java:506)
at 
org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.runThrowing(StreamTaskActionExecutor.java:47)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:475)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:526)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.lang.Thread.run(Thread.java:748)

From: Piotr Nowojski 
Sent: Tuesday, May 25, 2021 6:18 PM
To: Georgi Stoyanov 
Cc: user@flink.apache.org
Subject: Re: Flink 1.11.3 NoClassDefFoundError: Could not initialize class

Hi Georgi,

I don't think it's a bug in Flink. It sounds like some problem with 
dependencies or jars in your job. Can you explain a bit more what do you mean 
by:

> that some of them are constantly restarting with the following exception. 
> After restart, everything is working as expected

constantly restarting, but after a restart everything is working?

Best,
Piotrek

On Tue, 25 May 2021 at 16:12, Georgi Stoyanov <gstoya...@live.com> wrote:
Hi all,


We have several Flink jobs running on k8s with Flink 1.11.3, and recently we 
noticed that some of them are constantly restarting with the following 
exception. After a restart, everything works as expected.
Could this be a bug?
2021-05-25 17:04:42
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot 
instantiate user function.
at 
org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:275)
at 
org.apache.flink.streaming.runtime.tasks.OperatorChain.init(OperatorChain.java:126)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:459)
at 
org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:526)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: unexpected exception type
at 
java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1750)
at 
java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1280)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
at 
java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
at 
java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
at 
org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
at 

Re: Garbled Chinese characters when Flink SQL writes to MySQL

2021-05-25 Thread casel.chen



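In the Table API & SQL section of the official Flink documentation, the printed SQL execution plan uses the UTF-16LE character set. Why not UTF-8? Could the garbled characters be related to this?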
https://ci.apache.org/projects/flink/flink-docs-release-1.13/zh/docs/dev/table/common/#%e8%a7%a3%e9%87%8a%e8%a1%a8



The result of the above example is:

```text
== Abstract Syntax Tree ==
LogicalUnion(all=[true])
  LogicalFilter(condition=[LIKE($1, _UTF-16LE'F%')])
    FlinkLogicalDataStreamScan(id=[1], fields=[count, word])
  FlinkLogicalDataStreamScan(id=[2], fields=[count, word])

== Optimized Logical Plan ==
DataStreamUnion(all=[true], union all=[count, word])
  DataStreamCalc(select=[count, word], where=[LIKE(word, _UTF-16LE'F%')])
    DataStreamScan(id=[1], fields=[count, word])
  DataStreamScan(id=[2], fields=[count, word])

== Physical Execution Plan ==
Stage 1 : Data Source
  content : collect elements with CollectionInputFormat
```

On 2021-05-25 10:40:46, "casel.chen" wrote:
>The database character set settings are as follows:
>
>character_set_client,utf8mb4
>character_set_connection,utf8mb4
>character_set_database,utf8mb4
>character_set_filesystem,binary
>character_set_results,utf8mb4
>character_set_server,utf8
>character_set_system,utf8
>character_sets_dir,/u01/mysql57_20200229/share/charsets/
>
>The client connection string is
>jdbc:mysql://host:3306/datav_test?useUnicode=true&characterEncoding=utf8
>
>When the Flink SQL job runs locally, inserted Chinese text displays correctly, but as soon as it is deployed to the test server the Chinese text is garbled. Any suggestions for a fix? Thanks!
>
>On 2021-05-19 17:52:01, "Michael Ran" wrote:
>>
>>The character encoding of the database columns.
>>
>>On 2021-05-18 18:19:31, "casel.chen" wrote:
>>>My connection URL already uses useUnicode=true&characterEncoding=UTF-8, but the result is still garbled.
>>>
>>>On 2021-05-18 17:21:12, "王炳焱" <15307491...@163.com> wrote:
When connecting to the MySQL table from Flink SQL, configure url=jdbc:mysql://127.0.0.1:3306/database?useUnicode=true&characterEncoding=UTF-8, like this:
CREATE TABLE jdbc_sink(
    id INT COMMENT '订单id',
    goods_name VARCHAR(128) COMMENT '商品名称',
    price DECIMAL(32,2) COMMENT '商品价格',
    user_name VARCHAR(64) COMMENT '用户名称'
) WITH (
    'connector' = 'jdbc',
    'url' = 'jdbc:mysql://127.0.0.1:3306/database?useUnicode=true&characterEncoding=UTF-8',
    'username' = 'mysqluser',
    'password' = 'mysqluser',
    'table-name' = 'jdbc_sink'
)
On 2021-05-18 11:55:46, "casel.chen" wrote:
>My Flink SQL job is as follows:
>
>
>SELECT
>product_name,
>window_start,
>window_end,
>CAST(SUM(trans_amt) AS DECIMAL(24,2)) trans_amt,
>CAST(COUNT(order_no) AS BIGINT) trans_cnt,
>-- LOCALTIMESTAMP AS insert_time,
>'微支付事业部' AS bus_name
>FROM (
>
>
>The MySQL sink table is defined as follows:
>CREATE TABLE XXX (
>) Engine=InnoDB AUTO_INCREMENT=31 DEFAULT CHARSET=utf8mb4;
>
>
>Once the job is running, the data written to the MySQL table contains garbled Chinese characters: ??
>
>
>
>The job logs show that it uses the UTF-16LE character set. Is there any way to make it use the utf8mb4 character set instead?
>2021-05-17 18:02:25,010 INFO 
>org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Received task 
>GroupAggregate(groupBy=[product_name, window_start, window_end], 
>select=[product_name, window_start, window_end, SUM_RETRACT(trans_amt) AS 
>$f3, COUNT_RETRACT(order_no) AS $f4]) -> Calc(select=[CAST(product_name) 
>AS product_name, (CAST(window_start) DATE_FORMAT _UTF-16LE'-MM-dd 
>HH:mm:ss') AS window_start, (CAST(window_end) DATE_FORMAT 
>_UTF-16LE'-MM-dd HH:mm:ss') AS window_end, CAST($f3) AS trans_amt, 
>CAST($f4) AS trans_cnt, CAST(()) AS insert_time, 
>_UTF-16LE'??':VARCHAR(2147483647) CHARACTER SET "UTF-16LE" 
>AS bus_name]) -> Sink: 
>Sink(table=[default_catalog.default_database.all_trans_5m_new], 
>fields=[product_name, window_start, window_end, trans_amt, trans_cnt, 
>insert_time, bus_name]) (1/1)#0 (1b5f26dcd9a6071f36753b93a0ea9bea), deploy 
>into slot with allocation id 9f4c7d45bdf429f89158e2f8451663e0.
>2021-05-17 18:02:25,013 INFO org.apache.flink.runtime.taskmanager.Task [] 
>- GroupAggregate(groupBy=[product_name, window_start, window_end, id, 
>data_type, mer_cust_id, order_no, trans_date], select=[product_name, 
>window_start, window_end, id, data_type, mer_cust_id, order_no, 
>trans_date, MAX_RETRACT(trans_amt) AS trans_amt]) -> 
>Calc(select=[product_name, window_start, window_end, trans_amt, order_no]) 
>(1/1)#0 (ef6b0a94e75cc1665e4ce3d40e74ab0c) switched from CREATED to 
>DEPLOYING.


Re: Customer operator in BATCH execution mode

2021-05-25 Thread Piotr Nowojski
Hi,

1. I don't know if there is a built-in way of doing it. You can always pass
this information anyway on your own when you are starting the job (via
operator/function's constructors).
2. Yes, I think this should work.
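
Point 1 could look roughly like the sketch below. It is plain Java rather than Flink code, and every name in it (`Mode`, `ModeAwareFunction`, `process`) is hypothetical. It assumes the job's main method already knows which mode it was started in, for example from a command-line argument, and simply hands that value to the function's constructor. The class is `Serializable` because user functions are shipped to the workers in serialized form.

```java
import java.io.Serializable;
import java.util.Locale;

/** Hypothetical flag describing how the job was started. */
enum Mode { BATCH, STREAMING }

/** Hypothetical user function that receives the execution mode through its constructor. */
final class ModeAwareFunction implements Serializable {
    private static final long serialVersionUID = 1L;

    private final Mode mode;

    ModeAwareFunction(Mode mode) {
        this.mode = mode;
    }

    /** Behaves differently depending on the mode passed in at job start. */
    String process(String element) {
        return mode == Mode.BATCH ? "bootstrap:" + element : "live:" + element;
    }

    public static void main(String[] args) {
        // Assumption: the mode is given as the first program argument, defaulting to STREAMING.
        Mode mode = args.length > 0
                ? Mode.valueOf(args[0].toUpperCase(Locale.ROOT))
                : Mode.STREAMING;
        ModeAwareFunction function = new ModeAwareFunction(mode);
        System.out.println(function.process("event-1"));
    }
}
```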

Best,
Piotrek

On Tue, 25 May 2021 at 17:05, ChangZhuo Chen (陳昌倬) wrote:

> Hi,
>
> Currently, we want to use batch execution mode [0] and historical data
> to build state for our streaming application. Due to differences between
> batch & streaming mode, we want to check the current execution mode in
> a custom operator. So our questions are:
>
>
> * Is there any API for custom operator to know current execution mode
>   (batch or streaming)?
>
> * If we want to output after all elements of one specific key are
>   processed, can we just use timer since timer is triggered at the end
>   of input [0]?
>
>
> [0]
> https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/execution_mode/
>
> --
> ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
> http://czchen.info/
> Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B
>


Re: Flink 1.11.3 NoClassDefFoundError: Could not initialize class

2021-05-25 Thread Piotr Nowojski
Hi Georgi,

I don't think it's a bug in Flink. It sounds like some problem with
dependencies or jars in your job. Can you explain a bit more what do you
mean by:

> that some of them are constantly restarting with the following exception.
After restart, everything is working as expected

constantly restarting, but after a restart everything is working?

Best,
Piotrek

On Tue, 25 May 2021 at 16:12, Georgi Stoyanov wrote:

> Hi all,
>
>
> We have several Flink jobs running on k8s with Flink 1.11.3, and recently
> we noticed that some of them are constantly restarting with the following
> exception. After a restart, everything works as expected.
> Could this be a bug?
> 2021-05-25 17:04:42
> org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot
> instantiate user function.
> at
> org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:275)
> at
> org.apache.flink.streaming.runtime.tasks.OperatorChain.init(OperatorChain.java:126)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:459)
> at
> org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:526)
> at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
> at java.lang.Thread.run(Thread.java:748)
> Caused by: java.io.IOException: unexpected exception type
> at
> java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1750)
> at
> java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1280)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
> at
> java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
> at
> java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
> at
> java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
> at
> java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
> at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
> at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
> at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
> at
> org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
> at
> org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
> at
> org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:260)
> ... 6 more
> Caused by: java.lang.reflect.InvocationTargetException
> at sun.reflect.GeneratedMethodAccessor282.invoke(Unknown Source)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:230)
> at sun.reflect.GeneratedMethodAccessor281.invoke(Unknown Source)
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:498)
> at
> java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1274)
> ... 23 more
> Caused by: java.lang.NoClassDefFoundError: Could not initialize
> com.my.organization.MyPerfectlyWorkingJob
> ... 31 more
>
>


Re: Time needed to read from Kafka source

2021-05-25 Thread Piotr Nowojski
Hi,

That's a throughput of 700 records/second, which should be well below
theoretical limits of any deserializer (from hundreds of thousands up to tens
of millions of records/second per single operator), unless your records are
huge or very complex.
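
For reference, the figure comes from the numbers in the original question (3.8 million events consumed in one and a half hours). The short sketch below just redoes that arithmetic; `ThroughputEstimate` and its methods are hypothetical names, and the 100,000 records/second used for comparison is only the low end of the range mentioned above, not a measurement.

```java
/** Hypothetical helper that redoes the back-of-the-envelope throughput calculation. */
final class ThroughputEstimate {

    /** Average records per second for a given record count and duration in hours. */
    static double recordsPerSecond(long records, double hours) {
        return records / (hours * 3600.0);
    }

    /** Seconds needed to process the given record count at the given rate. */
    static double secondsAtRate(long records, double recordsPerSecond) {
        return records / recordsPerSecond;
    }

    public static void main(String[] args) {
        long events = 3_800_000L; // from the question
        double hours = 1.5;       // from the question

        // 3,800,000 / 5,400 s, i.e. roughly 704 records/second
        System.out.printf("observed: %.0f records/second%n", recordsPerSecond(events, hours));

        // At an assumed 100,000 records/second the same input would take 38 seconds.
        System.out.printf("at 100k/s: %.0f seconds%n", secondsAtRate(events, 100_000.0));
    }
}
```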

Long story short, I don't know of a magic bullet to help you solve your
problem. As always you have two options, either optimise/speed up your
code/job, or scale up.

If you choose the former, think about Flink as just another Java
application. Check metrics and resource usage, and understand which resource
is the problem (CPU? memory? is the machine swapping? IO?). You might be able
to guess what your bottleneck is (reading from Kafka? deserialisation?
something else? Flink itself?) by looking at some of the metrics
(busyTimeMsPerSecond [1] or idleTimeMsPerSecond could help with that), or
you can simplify your job to the bare minimum and test the performance of
independent components. You can also always attach a code profiler and
simply look at what's happening. First identify the source of the
bottleneck and then try to understand what's causing it.
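
One way to test an independent component, as suggested above, is to time
the deserialization step on its own, outside Flink and Kafka. The following
is only a rough sketch: `DeserializerBenchmark` and `measure` are
hypothetical names, the lambda is a stand-in for the real
`objectMapper.readValue(...)` plus validation, and a proper harness such as
JMH would give more trustworthy numbers.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

class DeserializerBenchmark {

    /** Applies the deserializer to the same payload repeatedly and returns records/second. */
    static <T> double measure(Function<byte[], T> deserializer, byte[] payload, int iterations) {
        if (iterations <= 0) {
            throw new IllegalArgumentException("iterations must be positive");
        }
        Object sink = null;
        // Warm-up pass so that the JIT has a chance to compile the hot path.
        for (int i = 0; i < Math.min(iterations, 10_000); i++) {
            sink = deserializer.apply(payload);
        }
        long start = System.nanoTime();
        for (int i = 0; i < iterations; i++) {
            sink = deserializer.apply(payload);
        }
        long elapsedNanos = Math.max(1L, System.nanoTime() - start);
        // Use the result so the loop cannot be optimized away entirely.
        if (sink == null) {
            throw new IllegalStateException("deserializer returned null");
        }
        return iterations * 1e9 / elapsedNanos;
    }

    public static void main(String[] args) {
        byte[] payload = "{\"id\":42,\"name\":\"example\"}".getBytes(StandardCharsets.UTF_8);
        // Stand-in for the real deserialization; replace with the actual call under test.
        Function<byte[], String> standIn = bytes -> new String(bytes, StandardCharsets.UTF_8);
        double rate = measure(standIn, payload, 1_000_000);
        System.out.printf("%.0f records/second on a single thread%n", rate);
    }
}
```

If the isolated rate is orders of magnitude above 700 records/second, the
deserializer is probably not the bottleneck and the next component can be
measured the same way.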

Best,
Piotrek

[1] busyTimeMsPerSecond is available since Flink 1.13. Flink 1.13 also
comes with nice tools to analyse bottlenecks in the WebUI (coloring nodes
in the job graph based on busy/back pressured status and Flamegraph
support)

On Tue, 25 May 2021 at 15:44, B.B. wrote:

> Hi,
>
> I am in the process of optimizing my job, which we think is currently
> too slow.
>
> We deploy the job on Kubernetes with 1 JobManager (1 GB RAM, 1 CPU) and
> 1 TaskManager (4 GB RAM, 2 CPUs, i.e. 2 task slots and a parallelism
> of 2).
>
> The main problem is one Kafka source that has 3.8 million events that we
> have to process.
> As a test we made a simple job that connects to Kafka using a custom
> implementation of KafkaDeserializationSchema. There we are using an
> ObjectMapper that maps input values, e.g.
>
> var event = objectMapper.readValue(consumerRecord.value(),
> MyClass.class);
>
> This is then validated with Hibernate Validator, and the output of this
> source is printed on the console.
>
> The time needed for the job to consume all the events was one and a half
> hours, which seems a bit long.
> Is there a way we can speed up this process?
>
> Would more CPU cores or memory be the solution?
> Should we switch to an Avro deserialization schema?
>
>
>
>


Re: Flink 1.12.4 docker image

2021-05-25 Thread Arvid Heise
Hi Nikola,

https://hub.docker.com/r/apache/flink now contains the images. It takes a
few days until https://hub.docker.com/_/flink is updated though.

Sorry for the hassle.

Best,

Arvid

On Tue, May 25, 2021 at 3:08 PM Arvid Heise  wrote:

> Hi Nikola,
>
> I'm looking into it. I might have missed a step during release.
>
> Best,
>
> Arvid
>
> On Mon, May 24, 2021 at 3:47 PM Nikola Hrusov  wrote:
>
>> Hello,
>>
>> I saw that flink 1.12.4 just got released. However I am struggling to
>> find the docker image.
>>
>> I checked both:
>> - https://hub.docker.com/_/flink
>> - https://hub.docker.com/r/apache/flink
>>
>> but on both 1.12.4 is not available.
>>
>> Are there plans to publish it as a docker image?
>>
>> Regards,
>> Nikola
>>
>>


Custom operator in BATCH execution mode

2021-05-25 Thread 陳昌倬
Hi,

Currently, we want to use batch execution mode [0] and historical data
to build state for our streaming application. Due to the differences
between batch and streaming mode, we want to check the current execution
mode in our custom operator. So our questions are:


* Is there any API that lets a custom operator know the current execution
  mode (batch or streaming)?

* If we want to emit output after all elements of one specific key are
  processed, can we just use a timer, since timers are triggered at the end
  of input [0]?


[0] https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/datastream/execution_mode/

-- 
ChangZhuo Chen (陳昌倬) czchen@{czchen,debian}.org
http://czchen.info/
Key fingerprint = BA04 346D C2E1 FE63 C790  8793 CC65 B0CD EC27 5D5B



Flink 1.11.3 NoClassDefFoundError: Could not initialize class

2021-05-25 Thread Georgi Stoyanov
Hi all,


We have several Flink jobs running on k8s with Flink 1.11.3, and recently we
noticed that some of them are constantly restarting with the following
exception. After a restart, everything works as expected.
Could this be a bug?
2021-05-25 17:04:42
org.apache.flink.streaming.runtime.tasks.StreamTaskException: Cannot instantiate user function.
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:275)
    at org.apache.flink.streaming.runtime.tasks.OperatorChain.init(OperatorChain.java:126)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.beforeInvoke(StreamTask.java:459)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:526)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:721)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:546)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: unexpected exception type
    at java.io.ObjectStreamClass.throwMiscException(ObjectStreamClass.java:1750)
    at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1280)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2196)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
    at java.io.ObjectInputStream.defaultReadFields(ObjectInputStream.java:2405)
    at java.io.ObjectInputStream.readSerialData(ObjectInputStream.java:2329)
    at java.io.ObjectInputStream.readOrdinaryObject(ObjectInputStream.java:2187)
    at java.io.ObjectInputStream.readObject0(ObjectInputStream.java:1667)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:503)
    at java.io.ObjectInputStream.readObject(ObjectInputStream.java:461)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:576)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:562)
    at org.apache.flink.util.InstantiationUtil.deserializeObject(InstantiationUtil.java:550)
    at org.apache.flink.util.InstantiationUtil.readObjectFromConfig(InstantiationUtil.java:511)
    at org.apache.flink.streaming.api.graph.StreamConfig.getStreamOperatorFactory(StreamConfig.java:260)
    ... 6 more
Caused by: java.lang.reflect.InvocationTargetException
    at sun.reflect.GeneratedMethodAccessor282.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.lang.invoke.SerializedLambda.readResolve(SerializedLambda.java:230)
    at sun.reflect.GeneratedMethodAccessor281.invoke(Unknown Source)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at java.io.ObjectStreamClass.invokeReadResolve(ObjectStreamClass.java:1274)
    ... 23 more
Caused by: java.lang.NoClassDefFoundError: Could not initialize com.my.organization.MyPerfectlyWorkingJob
    ... 31 more



Time needed to read from Kafka source

2021-05-25 Thread B.B.
Hi,

I am in the process of optimizing my job, which we think is currently
too slow.

We deploy the job on Kubernetes with 1 JobManager (1 GB RAM, 1 CPU) and
1 TaskManager (4 GB RAM, 2 CPUs, i.e. 2 task slots and a parallelism
of 2).

The main problem is one Kafka source that has 3.8 million events that we
have to process.
As a test we made a simple job that connects to Kafka using a custom
implementation of KafkaDeserializationSchema. There we are using an
ObjectMapper that maps input values, e.g.

var event = objectMapper.readValue(consumerRecord.value(), MyClass.class);

This is then validated with Hibernate Validator, and the output of this
source is printed on the console.

The time needed for the job to consume all the events was one and a half
hours, which seems a bit long.
Is there a way we can speed up this process?

Would more CPU cores or memory be the solution?
Should we switch to an Avro deserialization schema?


[VOTE] Release 1.13.1, release candidate #1

2021-05-25 Thread Dawid Wysakowicz
Hi everyone,
Please review and vote on the release candidate #1 for the version
1.13.1, as follows:
[ ] +1, Approve the release
[ ] -1, Do not approve the release (please provide specific comments)

The complete staging area is available for your review, which includes:
* JIRA release notes [1],
* the official Apache source release and binary convenience releases to
  be deployed to dist.apache.org [2], which are signed with the key with
  fingerprint 31D2DD10BFC15A2D [3],
* all artifacts to be deployed to the Maven Central Repository [4],
* source code tag "release-1.13.1-rc1" [5],
* website pull request listing the new release and adding announcement
  blog post [6].

The vote will be open for at least 72 hours. It is adopted by majority
approval, with at least 3 PMC affirmative votes.

Best,
Dawid

[1] https://issues.apache.org/jira/secure/ReleaseNote.jspa?projectId=12315522=12350058
[2] https://dist.apache.org/repos/dist/dev/flink/flink-1.13.1-rc1/
[3] https://dist.apache.org/repos/dist/release/flink/KEYS
[4] https://repository.apache.org/content/repositories/orgapacheflink-1422/
[5] https://github.com/apache/flink/tree/release-1.13.1-rc1
[6] https://github.com/apache/flink-web/pull/448



Re: Flink 1.12.4 docker image

2021-05-25 Thread Arvid Heise
Hi Nikola,

I'm looking into it. I might have missed a step during release.

Best,

Arvid

On Mon, May 24, 2021 at 3:47 PM Nikola Hrusov  wrote:

> Hello,
>
> I saw that flink 1.12.4 just got released. However I am struggling to find
> the docker image.
>
> I checked both:
> - https://hub.docker.com/_/flink
> - https://hub.docker.com/r/apache/flink
>
> but on both 1.12.4 is not available.
>
> Are there plans to publish it as a docker image?
>
> Regards,
> Nikola
>
>


How to read large amount of data from hive and write to redis, in a batch manner?

2021-05-25 Thread Yik San Chan
Hi community,

I have a Hive table that stores tens of millions of rows of data. In my
Flink job, I want to process the data in a batch manner:

- Split the data into batches, each batch has (maybe) 10,000 rows.
- For each batch, call a batchPut() API on my redis client to dump in Redis.

Doing so in a streaming manner is not desirable, as that would cause too
many round trips between Flink workers and Redis.

Is there a way to do that? I found few clues in the Flink docs, since almost
all APIs feel better suited for stream processing by default.
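
The batching described above can be sketched independently of Flink as a
small buffer that hands full batches to a writer. This is only an
illustration under assumptions: `BatchingBuffer` is a hypothetical helper,
and the `Consumer<List<T>>` stands in for the `batchPut()` call on the Redis
client, whose real signature is not given in this thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

class BatchingBuffer<T> {
    private final int batchSize;
    // Hypothetical stand-in for something like redisClient.batchPut(rows).
    private final Consumer<List<T>> batchWriter;
    private final List<T> buffer;

    BatchingBuffer(int batchSize, Consumer<List<T>> batchWriter) {
        if (batchSize <= 0) {
            throw new IllegalArgumentException("batchSize must be positive");
        }
        this.batchSize = batchSize;
        this.batchWriter = batchWriter;
        this.buffer = new ArrayList<>(batchSize);
    }

    /** Buffers one row and writes a batch as soon as batchSize rows are collected. */
    void add(T row) {
        buffer.add(row);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    /** Writes whatever is buffered; call this at end of input so the last partial batch is not lost. */
    void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        batchWriter.accept(new ArrayList<>(buffer)); // hand over a copy, then reuse the buffer
        buffer.clear();
    }

    public static void main(String[] args) {
        BatchingBuffer<Integer> rows =
                new BatchingBuffer<>(10, batch -> System.out.println("writing " + batch.size() + " rows"));
        for (int i = 0; i < 25; i++) {
            rows.add(i);
        }
        rows.flush();
    }
}
```

Inside a Flink job, such a buffer would typically live in a custom sink,
with `add` called per record and `flush` called when the sink is closed; if
delivery guarantees matter, the buffer would presumably also need to be
flushed on checkpoints.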

Thank you!

Best,
Yik San


Re: Job recovery issues with state restoration

2021-05-25 Thread Peter Westermann
Hi Roman,

I am not able to consistently reproduce this issue. It seems to only occur when 
the failover happens at the wrong time. I have disabled task local recovery and 
will report back if we see this again. We need incremental checkpoints for our 
workload.
The SST files are not the ones for task local recovery, those would be in a 
different directory (we have configured io.tmp.dirs as /mnt/data/tmp).

Thanks,
Peter


From: Roman Khachatryan 
Date: Thursday, May 20, 2021 at 4:54 PM
To: Peter Westermann 
Cc: user@flink.apache.org 
Subject: Re: Job recovery issues with state restoration
Hi Peter,

Do you experience this issue if running without local recovery or
incremental checkpoints enabled?
Or have you maybe compared local (on TM) and remote (on DFS) SST files?

Regards,
Roman

On Thu, May 20, 2021 at 5:54 PM Peter Westermann
 wrote:
>
> Hello,
>
>
>
> I’ve reported issues around checkpoint recovery in case of a job failure due 
> to zookeeper connection loss in the past. I am still seeing issues 
> occasionally.
>
> This is for Flink 1.12.3 with zookeeper for HA, S3 as the state backend, 
> incremental checkpoints, and task-local recovery enabled.
>
>
>
> Here’s what happened: A zookeeper instance was terminated as part of a 
> deployment for our zookeeper service, this caused a new jobmanager leader 
> election (so far so good). A leader was elected and the job was restarted 
> from the latest checkpoint but never became healthy. The root exception and 
> the logs show issues reading state:
>
> o.r.RocksDBException: Sst file size mismatch: /mnt/data/tmp/flink-io-7139fea9-2dd8-42e6-8ffb-4d1a826f77d6/job_993eca72823b5ac13a377d7a844ac1b5_op_KeyedCoProcessOperator_d80b7e861bf73bdf93b8b27e5881807f__10_44__uuid_d3c2d251-c046-494a-bc25-57985a01fda1/db/003579.sst. Size recorded in manifest 36718, actual size 2570
> Sst file size mismatch: /mnt/data/tmp/flink-io-7139fea9-2dd8-42e6-8ffb-4d1a826f77d6/job_993eca72823b5ac13a377d7a844ac1b5_op_KeyedCoProcessOperator_d80b7e861bf73bdf93b8b27e5881807f__10_44__uuid_d3c2d251-c046-494a-bc25-57985a01fda1/db/003573.sst. Size recorded in manifest 13756, actual size 1307
> Sst file size mismatch: /mnt/data/tmp/flink-io-7139fea9-2dd8-42e6-8ffb-4d1a826f77d6/job_993eca72823b5ac13a377d7a844ac1b5_op_KeyedCoProcessOperator_d80b7e861bf73bdf93b8b27e5881807f__10_44__uuid_d3c2d251-c046-494a-bc25-57985a01fda1/db/003575.sst. Size recorded in manifest 16278, actual size 1138
> Sst file size mismatch: /mnt/data/tmp/flink-io-7139fea9-2dd8-42e6-8ffb-4d1a826f77d6/job_993eca72823b5ac13a377d7a844ac1b5_op_KeyedCoProcessOperator_d80b7e861bf73bdf93b8b27e5881807f__10_44__uuid_d3c2d251-c046-494a-bc25-57985a01fda1/db/003576.sst. Size recorded in manifest 23108, actual size 1267
> Sst file size mismatch: /mnt/data/tmp/flink-io-7139fea9-2dd8-42e6-8ffb-4d1a826f77d6/job_993eca72823b5ac13a377d7a844ac1b5_op_KeyedCoProcessOperator_d80b7e861bf73bdf93b8b27e5881807f__10_44__uuid_d3c2d251-c046-494a-bc25-57985a01fda1/db/003577.sst. Size recorded in manifest 148089, actual size 1293
>
>     at org.rocksdb.RocksDB.open(RocksDB.java)
>     at org.rocksdb.RocksDB.open(RocksDB.java:286)
>     at o.a.f.c.s.s.RocksDBOperationUtils.openDB(RocksDBOperationUtils.java:80)
>     ... 22 common frames omitted
> Wrapped by: java.io.IOException: Error while opening RocksDB instance.
>     at o.a.f.c.s.s.RocksDBOperationUtils.openDB(RocksDBOperationUtils.java:92)
>     at o.a.f.c.s.s.r.AbstractRocksDBRestoreOperation.openDB(AbstractRocksDBRestoreOperation.java:145)
>     at o.a.f.c.s.s.r.RocksDBIncrementalRestoreOperation.restoreFromLocalState(RocksDBIncrementalRestoreOper...
>
>
>
> Since we retain multiple checkpoints, I tried redeploying the job from all 
> checkpoints that were still available. All those attempts lead to similar 
> failures. (I eventually had to use an older savepoint to recover the job.)
>
> Any guidance for avoiding this would be appreciated.
>
>
>
> Peter


Managing Jobs entirely with Flink Monitoring API

2021-05-25 Thread Barak Ben Nathan

I want to manage the execution of Flink Jobs programmatically through Flink 
Monitoring API.

I.e. I want to run/delete jobs ONLY with the
 POST /jars/:jarid/run
 POST /jobs/:jobid/stop
API commands.
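
For illustration, the two calls above could be built with the JDK's
`java.net.http` client (Java 11+). This is a sketch under assumptions:
`FlinkRestRequests` is a made-up helper, the base URL
`http://localhost:8081` and the JAR and job IDs are placeholders, and the
empty JSON body `{}` is assumed to be acceptable to both endpoints.

```java
import java.net.URI;
import java.net.http.HttpRequest;

class FlinkRestRequests {

    /** Builds POST /jars/:jarid/run against the given REST base URL. */
    static HttpRequest runJar(String baseUrl, String jarId) {
        return post(baseUrl + "/jars/" + jarId + "/run");
    }

    /** Builds POST /jobs/:jobid/stop against the given REST base URL. */
    static HttpRequest stopJob(String baseUrl, String jobId) {
        return post(baseUrl + "/jobs/" + jobId + "/stop");
    }

    private static HttpRequest post(String url) {
        return HttpRequest.newBuilder(URI.create(url))
                .header("Content-Type", "application/json")
                // Assumption: an empty JSON object is a valid request body here.
                .POST(HttpRequest.BodyPublishers.ofString("{}"))
                .build();
    }

    public static void main(String[] args) {
        // Placeholder IDs; real ones come from the JAR upload response and the job overview.
        HttpRequest run = runJar("http://localhost:8081", "example-id_my-job.jar");
        HttpRequest stop = stopJob("http://localhost:8081", "00000000000000000000000000000000");
        System.out.println(run.method() + " " + run.uri());
        System.out.println(stop.method() + " " + stop.uri());
        // To actually send a request:
        // HttpClient.newHttpClient().send(run, HttpResponse.BodyHandlers.ofString());
    }
}
```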

Now, it seems that the Session Mode may fit my needs: “Session Mode: one 
JobManager instance manages multiple jobs sharing the same cluster of 
TaskManagers” 
(https://ci.apache.org/projects/flink/flink-docs-release-1.12/deployment/)
However, I couldn’t find a way to start the API server (i.e. a JobManager) that 
didn’t already include submitting a JAR file for a job execution.

Any suggestions?


Re: Manual creation of TypedValue instances in flink statefun

2021-05-25 Thread Igal Shilman
Hi Bonino,

I'm glad that the issue was resolved for you. Generally there are no
additional StateFun constraints in that regard;
we need to complete the shading process of Protobuf to avoid situations
such as these (issue tracked here: FLINK-22584).
Once that is done, you can use that artifact from within the
datastream integration sdk.

Cheers,
Igal.

On Tue, May 25, 2021 at 12:15 PM Bonino Dario 
wrote:

> Dear Igal,
>
> in the meanwhile we already tried to remove the statefun-sdk-java as a
> dependency and actually the error is gone.
>
> Are there any guidelines to understand when the dependency is mandatory
> and how to solve conflicts as such in those cases?
>
> Best regards
>
> Dario
> On 5/25/21 12:04 PM, Igal Shilman wrote:
>
> Do you have the statefun-sdk-java as a dependency, if you are not using it
> can you remove it?
>
> On Tue, May 25, 2021 at 11:28 AM Bonino Dario 
> wrote:
>
>> Dear Igal,
>>
>> we are actually analyzing the issue. Our imported package is
>>
>> import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue
>>
>> however at runtime (we analyzed the method signature via reflection on
>> the typed value builder), the signature of the TypedValue.Builder.setValue
>> method is:
>>
>> org.apache.flink.statefun.sdk.reqreply.generated.TypedValue$Builder
>> org.apache.flink.statefun.sdk.reqreply.generated.TypedValue$Builder.setValue(org.apache.flink.statefun.sdk.shaded.com.google.protobuf.ByteString)
>>
>> and as you can notice, uses the shaded version of protobuf ByteString.
>> Instead we are attempting to pass a non-shaded version of ByteString as
>> a parameter and this leads to the reported error.
>>
>> Any idea on how to overcome this issue?
>>
>> Cheers,
>>
>> Dario
>> On 5/25/21 11:22 AM, Igal Shilman wrote:
>>
>> Hi Bonino,
>>
>> If you've also included the statefun-sdk-java into the class path then,
>> make sure that you are not using accidentally the shaded version of a
>> TypedValue (check the package name)
>>
>> Let me know if that helped, and we'll try to debug this together
>> otherwise.
>>
>> Cheers,
>> Igal.
>>
>> On Mon, May 24, 2021 at 6:46 PM Bonino Dario 
>> wrote:
>>
>>> Hello list,
>>>
>>> we are manually building TypedValue instances to be sent to a python
>>> remote function (with a reqreply function builder). We create the typed
>>> value as follows (in Kotlin):
>>>
>>> override fun map(value: Tuple2): TypedValue {
>>> return TypedValue.newBuilder()
>>> .setValue(getProtoValueByteString(value.f0, value.f1))
>>> .setTypename(typeName)
>>> .setHasValue(true)
>>> .build()
>>> }
>>>
>>> However,  when running  our job on a flink cluster (Flink 1.12.3,
>>> ververica platform) the job is terminated with this exception:
>>>
>>> java.lang.NoSuchMethodError: 
>>> org.apache.flink.statefun.sdk.reqreply.generated.TypedValue$Builder.setValue(Lcom/google/protobuf/ByteString;)Lorg/apache/flink/statefun/sdk/reqreply/generated/TypedValue$Builder
>>>
>>>
>>> although the setValue method is actually defined in the TypedValue
>>> class.
>>>
>>> We checked whether the issue could be related to some conflict on
>>> protobuf versions, but our classes are generated with protoc 3.7.1, which
>>> is aligned with what is reported in the flink-statefun (v3.0.0) parent pom.
>>> Any idea on what could cause the issue?
>>>
>>>
>>> Thanks and  best regards
>>>
>>> Dario Bonino
>>>
>>> --
>>> Ing. Dario Bonino, Ph.D
>>>
>>> e-m@il: dario.bon...@gmail.com
>>> www: https://www.linkedin.com/in/dariobonino
>>> 
>>> Dario
>>> Bonino
>>> slide...@hotmail.com
>>> 
>>>
>>> --
>> Ing. Dario Bonino, Ph.D
>>
>> e-m@il: dario.bon...@gmail.com
>> www: https://www.linkedin.com/in/dariobonino
>> 
>>  Dario
>>  Bonino
>>  slide...@hotmail.com
>> 
>>
>> --
> Ing. Dario Bonino, Ph.D
>
> e-m@il: dario.bon...@gmail.com
> www: https://www.linkedin.com/in/dariobonino
> 
>   Dario
>   Bonino
>   slide...@hotmail.com
> 
>
>


Re: Manual creation of TypedValue instances in flink statefun

2021-05-25 Thread Bonino Dario

Dear Igal,

in the meanwhile we already tried to remove the statefun-sdk-java as a 
dependency and actually the error is gone.


Are there any guidelines to understand when the dependency is mandatory 
and how to solve conflicts as such in those cases?


Best regards

Dario

On 5/25/21 12:04 PM, Igal Shilman wrote:
Do you have the statefun-sdk-java as a dependency, if you are not 
using it can you remove it?


On Tue, May 25, 2021 at 11:28 AM Bonino Dario wrote:


Dear Igal,

we are actually analyzing the issue. Our imported package is

import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue

however at runtime (we analyzed the method signature via
reflection on the typed value builder), the signature of the
TypedValue.Builder.setValue method is:

org.apache.flink.statefun.sdk.reqreply.generated.TypedValue$Builder

org.apache.flink.statefun.sdk.reqreply.generated.TypedValue$Builder.setValue(org.apache.flink.statefun.sdk.shaded.com.google.protobuf.ByteString)

and as you can notice, uses the shaded version of protobuf
ByteString. Instead we are attempting to pass a non-shaded version
of ByteString as a parameter and this leads to the reported error.

Any idea on how to overcome this issue?

Cheers,

Dario

On 5/25/21 11:22 AM, Igal Shilman wrote:

Hi Bonino,

If you've also included the statefun-sdk-java into the class path
then, make sure that you are not using accidentally the shaded
version of a TypedValue (check the package name)

Let me know if that helped, and we'll try to debug this together
otherwise.

Cheers,
Igal.

On Mon, May 24, 2021 at 6:46 PM Bonino Dario
<dario.bon...@gmail.com> wrote:

Hello list,

we are manually building TypedValue instances to be sent to a
python remote function (with a reqreply function builder). We
create the typed value as follows (in Kotlin):

override fun map(value: Tuple2): TypedValue {
 return TypedValue.newBuilder()
 .setValue(getProtoValueByteString(value.f0, value.f1))
 .setTypename(typeName)
 .setHasValue(true)
 .build()
 }

However,  when running  our job on a flink cluster (Flink
1.12.3, ververica platform) the job is terminated with this
exception:

java.lang.NoSuchMethodError: 
org.apache.flink.statefun.sdk.reqreply.generated.TypedValue$Builder.setValue(Lcom/google/protobuf/ByteString;)Lorg/apache/flink/statefun/sdk/reqreply/generated/TypedValue$Builder

although the setValue method is actually defined in the
TypedValue class.

We checked whether the issue could be related to some
conflict on protobuf versions, but our classes are generated
with protoc 3.7.1, which is aligned with what is reported in
the flink-statefun (v3.0.0) parent pom. Any idea on what
could cause the issue?


Thanks and  best regards

Dario Bonino

-- 
Ing. Dario Bonino, Ph.D


e-m@il:dario.bon...@gmail.com    
www:https://www.linkedin.com/in/dariobonino  


Dario
Bonino
slide...@hotmail.com  



-- 
Ing. Dario Bonino, Ph.D


e-m@il:dario.bon...@gmail.com    
www:https://www.linkedin.com/in/dariobonino  


Dario
Bonino
slide...@hotmail.com  




--
Ing. Dario Bonino, Ph.D

e-m@il: dario.bon...@gmail.com
www: https://www.linkedin.com/in/dariobonino

Dario
Bonino
slide...@hotmail.com




Re: Manual creation of TypedValue instances in flink statefun

2021-05-25 Thread Igal Shilman
And also, if you are using reflection to debug this, can you also print the
fully qualified name of the class that TypedValue$Builder extends?
My guess is that you are using the shaded version of the builder.
The shaded version is part of statefun-sdk-java; if you can remove
that dependency, I believe it should be fine.
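
The reflection check mentioned above could look roughly like the sketch
below. `SignatureInspector` and the nested `Sample` class are made up so the
example runs on its own; in the real job one would pass the builder class of
`TypedValue` instead and look for a `shaded` package in the output.

```java
import java.lang.reflect.Method;
import java.security.CodeSource;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

class SignatureInspector {

    /** Stand-in for the generated builder; only here to keep the sketch self-contained. */
    static class Sample {
        public Sample setValue(String value) {
            return this;
        }
    }

    /** Returns "returnType name(paramTypes)" for every public method with the given name. */
    static List<String> describeMethods(Class<?> type, String methodName) {
        List<String> signatures = new ArrayList<>();
        for (Method method : type.getMethods()) {
            if (method.getName().equals(methodName)) {
                String params = Arrays.stream(method.getParameterTypes())
                        .map(Class::getName)
                        .collect(Collectors.joining(", "));
                signatures.add(method.getReturnType().getName() + " " + methodName + "(" + params + ")");
            }
        }
        return signatures;
    }

    /** Returns where the class was loaded from, which helps to spot duplicate artifacts. */
    static String loadedFrom(Class<?> type) {
        CodeSource source = type.getProtectionDomain().getCodeSource();
        return source == null ? "(no code source, e.g. a JDK class)" : String.valueOf(source.getLocation());
    }

    public static void main(String[] args) {
        Class<?> builder = Sample.class; // in the real job: the TypedValue builder class
        System.out.println("class:      " + builder.getName());
        System.out.println("superclass: " + builder.getSuperclass().getName());
        System.out.println("loaded from: " + loadedFrom(builder));
        describeMethods(builder, "setValue").forEach(System.out::println);
    }
}
```

Fully qualified parameter names make a mismatch between a relocated
protobuf `ByteString` and the plain `com.google.protobuf.ByteString`
visible at a glance.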

Good luck,
Igal.

On Tue, May 25, 2021 at 12:04 PM Igal Shilman  wrote:

> Do you have the statefun-sdk-java as a dependency, if you are not using it
> can you remove it?
>
> On Tue, May 25, 2021 at 11:28 AM Bonino Dario 
> wrote:
>
>> Dear Igal,
>>
>> we are actually analyzing the issue. Our imported package is
>>
>> import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue
>>
>> however at runtime (we analyzed the method signature via reflection on
>> the typed value builder), the signature of the TypedValue.Builder.setValue
>> method is:
>>
>> org.apache.flink.statefun.sdk.reqreply.generated.TypedValue$Builder
>> org.apache.flink.statefun.sdk.reqreply.generated.TypedValue$Builder.setValue(org.apache.flink.statefun.sdk.shaded.com.google.protobuf.ByteString)
>>
>> and as you can notice, uses the shaded version of protobuf ByteString.
>> Instead we are attempting to pass a non-shaded version of ByteString as
>> a parameter and this leads to the reported error.
>>
>> Any idea on how to overcome this issue?
>>
>> Cheers,
>>
>> Dario
>> On 5/25/21 11:22 AM, Igal Shilman wrote:
>>
>> Hi Bonino,
>>
>> If you've also included the statefun-sdk-java into the class path then,
>> make sure that you are not using accidentally the shaded version of a
>> TypedValue (check the package name)
>>
>> Let me know if that helped, and we'll try to debug this together
>> otherwise.
>>
>> Cheers,
>> Igal.
>>
>> On Mon, May 24, 2021 at 6:46 PM Bonino Dario 
>> wrote:
>>
>>> Hello list,
>>>
>>> we are manually building TypedValue instances to be sent to a python
>>> remote function (with a reqreply function builder). We create the typed
>>> value as follows (in Kotlin):
>>>
>>> override fun map(value: Tuple2): TypedValue {
>>> return TypedValue.newBuilder()
>>> .setValue(getProtoValueByteString(value.f0, value.f1))
>>> .setTypename(typeName)
>>> .setHasValue(true)
>>> .build()
>>> }
>>>
>>> However,  when running  our job on a flink cluster (Flink 1.12.3,
>>> ververica platform) the job is terminated with this exception:
>>>
>>> java.lang.NoSuchMethodError: 
>>> org.apache.flink.statefun.sdk.reqreply.generated.TypedValue$Builder.setValue(Lcom/google/protobuf/ByteString;)Lorg/apache/flink/statefun/sdk/reqreply/generated/TypedValue$Builder
>>>
>>>
>>> although the setValue method is actually defined in the TypedValue
>>> class.
>>>
>>> We checked whether the issue could be related to some conflict on
>>> protobuf versions, but our classes are generated with protoc 3.7.1, which
>>> is aligned with what is reported in the flink-statefun (v3.0.0) parent pom.
>>> Any idea on what could cause the issue?
>>>
>>>
>>> Thanks and  best regards
>>>
>>> Dario Bonino
>>>
>>> --
>>> Ing. Dario Bonino, Ph.D
>>>
>>> e-m@il: dario.bon...@gmail.com
>>> www: https://www.linkedin.com/in/dariobonino
>>> 
>>> Dario
>>> Bonino
>>> slide...@hotmail.com
>>> 
>>>
>>> --
>> Ing. Dario Bonino, Ph.D
>>
>> e-m@il: dario.bon...@gmail.com
>> www: https://www.linkedin.com/in/dariobonino
>> 
>>  Dario
>>  Bonino
>>  slide...@hotmail.com
>> 
>>
>>


Re: Manual creation of TypedValue instances in flink statefun

2021-05-25 Thread Igal Shilman
Do you have statefun-sdk-java as a dependency? If you are not using it,
can you remove it?

On Tue, May 25, 2021 at 11:28 AM Bonino Dario 
wrote:

> Dear Igal,
>
> we are actually analyzing the issue. Our imported package is
>
> import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue
>
> however at runtime (we analyzed the method signature via reflection on the
> typed value builder), the signature of the TypedValue.Builder.setValue
> method is:
>
> org.apache.flink.statefun.sdk.reqreply.generated.TypedValue$Builder
> org.apache.flink.statefun.sdk.reqreply.generated.TypedValue$Builder.setValue(org.apache.flink.statefun.sdk.shaded.com.google.protobuf.ByteString)
>
> and as you can notice, uses the shaded version of protobuf ByteString.
> Instead we are attempting to pass a non-shaded version of ByteString as a
> parameter and this leads to the reported error.
>
> Any idea on how to overcome this issue?
>
> Cheers,
>
> Dario
> On 5/25/21 11:22 AM, Igal Shilman wrote:
>
> Hi Bonino,
>
> If you've also included the statefun-sdk-java into the class path then,
> make sure that you are not using accidentally the shaded version of a
> TypedValue (check the package name)
>
> Let me know if that helped, and we'll try to debug this together otherwise.
>
> Cheers,
> Igal.
>
> On Mon, May 24, 2021 at 6:46 PM Bonino Dario 
> wrote:
>
>> Hello list,
>>
>> we are manually building TypedValue instances to be sent to a python
>> remote function (with a reqreply function builder). We create the typed
>> value as follows (in Kotlin):
>>
>> override fun map(value: Tuple2): TypedValue {
>> return TypedValue.newBuilder()
>> .setValue(getProtoValueByteString(value.f0, value.f1))
>> .setTypename(typeName)
>> .setHasValue(true)
>> .build()
>> }
>>
>> However,  when running  our job on a flink cluster (Flink 1.12.3,
>> ververica platform) the job is terminated with this exception:
>>
>> java.lang.NoSuchMethodError: 
>> org.apache.flink.statefun.sdk.reqreply.generated.TypedValue$Builder.setValue(Lcom/google/protobuf/ByteString;)Lorg/apache/flink/statefun/sdk/reqreply/generated/TypedValue$Builder
>>
>>
>> although the setValue method is actually defined in the TypedValue class.
>>
>> We checked whether the issue could be related to some conflict on
>> protobuf versions, but our classes are generated with protoc 3.7.1, which
>> is aligned with what is reported in the flink-statefun (v3.0.0) parent pom.
>> Any idea on what could cause the issue?
>>
>>
>> Thanks and  best regards
>>
>> Dario Bonino
>>
>> --
>> Ing. Dario Bonino, Ph.D
>>
>> e-m@il: dario.bon...@gmail.com
>> www: https://www.linkedin.com/in/dariobonino
>> 
>>  Dario
>>  Bonino
>>  slide...@hotmail.com
>> 
>>
>> --
> Ing. Dario Bonino, Ph.D
>
> e-m@il: dario.bon...@gmail.com
> www: https://www.linkedin.com/in/dariobonino
> 
>   Dario
>   Bonino
>   slide...@hotmail.com
> 
>
>


Re: Manual creation of TypedValue instances in flink statefun

2021-05-25 Thread Bonino Dario

Dear Igal,

we are actually analyzing the issue. Our imported package is

import org.apache.flink.statefun.sdk.reqreply.generated.TypedValue

however at runtime (we analyzed the method signature via reflection on 
the typed value builder), the signature of the 
TypedValue.Builder.setValue method is:


org.apache.flink.statefun.sdk.reqreply.generated.TypedValue$Builder 
org.apache.flink.statefun.sdk.reqreply.generated.TypedValue$Builder.setValue(org.apache.flink.statefun.sdk.shaded.com.google.protobuf.ByteString)


and as you can notice, uses the shaded version of protobuf ByteString. 
Instead we are attempting to pass a non-shaded version of ByteString as 
a parameter and this leads to the reported error.


Any idea on how to overcome this issue?

Cheers,

Dario

On 5/25/21 11:22 AM, Igal Shilman wrote:

Hi Bonino,

If you've also included the statefun-sdk-java into the class path 
then, make sure that you are not using accidentally the shaded version 
of a TypedValue (check the package name)


Let me know if that helped, and we'll try to debug this together 
otherwise.


Cheers,
Igal.

On Mon, May 24, 2021 at 6:46 PM Bonino Dario wrote:


Hello list,

we are manually building TypedValue instances to be sent to a
python remote function (with a reqreply function builder). We
create the typed value as follows (in Kotlin):

override fun map(value: Tuple2): TypedValue {
 return TypedValue.newBuilder()
 .setValue(getProtoValueByteString(value.f0, value.f1))
 .setTypename(typeName)
 .setHasValue(true)
 .build()
 }

However,  when running  our job on a flink cluster (Flink 1.12.3,
ververica platform) the job is terminated with this exception:

java.lang.NoSuchMethodError: 
org.apache.flink.statefun.sdk.reqreply.generated.TypedValue$Builder.setValue(Lcom/google/protobuf/ByteString;)Lorg/apache/flink/statefun/sdk/reqreply/generated/TypedValue$Builder

although the setValue method is actually defined in the TypedValue
class.

We checked whether the issue could be related to some conflict on
protobuf versions, but our classes are generated with protoc
3.7.1, which is aligned with what is reported in the
flink-statefun (v3.0.0) parent pom. Any idea on what could cause
the issue?


Thanks and  best regards

Dario Bonino

-- 
Ing. Dario Bonino, Ph.D


e-m@il:dario.bon...@gmail.com    
www:https://www.linkedin.com/in/dariobonino  


Dario
Bonino
slide...@hotmail.com  




--
Ing. Dario Bonino, Ph.D

e-m@il: dario.bon...@gmail.com
www: https://www.linkedin.com/in/dariobonino

Dario
Bonino
slide...@hotmail.com




Re: Manual creation of TypedValue instances in flink statefun

2021-05-25 Thread Igal Shilman
Hi Bonino,

If you've also included statefun-sdk-java in the class path, then
make sure that you are not accidentally using the shaded version of
TypedValue (check the package name).

Let me know if that helped, and we'll try to debug this together otherwise.

Cheers,
Igal.

On Mon, May 24, 2021 at 6:46 PM Bonino Dario  wrote:

> Hello list,
>
> we are manually building TypedValue instances to be sent to a python
> remote function (with a reqreply function builder). We create the typed
> value as follows (in Kotlin):
>
> override fun map(value: Tuple2): TypedValue {
> return TypedValue.newBuilder()
> .setValue(getProtoValueByteString(value.f0, value.f1))
> .setTypename(typeName)
> .setHasValue(true)
> .build()
> }
>
> However,  when running  our job on a flink cluster (Flink 1.12.3,
> ververica platform) the job is terminated with this exception:
>
> java.lang.NoSuchMethodError: 
> org.apache.flink.statefun.sdk.reqreply.generated.TypedValue$Builder.setValue(Lcom/google/protobuf/ByteString;)Lorg/apache/flink/statefun/sdk/reqreply/generated/TypedValue$Builder
>
>
> although the setValue method is actually defined in the TypedValue class.
>
> We checked whether the issue could be related to some conflict on protobuf
> versions, but our classes are generated with protoc 3.7.1, which is aligned
> with what is reported in the flink-statefun (v3.0.0) parent pom. Any idea
> on what could cause the issue?
>
>
> Thanks and  best regards
>
> Dario Bonino
>
> --
> Ing. Dario Bonino, Ph.D
>
> e-m@il: dario.bon...@gmail.com
> www: https://www.linkedin.com/in/dariobonino
> 
>   Dario
>   Bonino
>   slide...@hotmail.com
> 
>
>


Re: KafkaSource metrics

2021-05-25 Thread Qingsheng Ren
Hi Oscar,

Thanks for raising this problem! Currently, the new KafkaSource does not 
register the KafkaConsumer metrics in Flink the way FlinkKafkaConsumer does. 
A ticket has been created on JIRA, and hopefully we can fix it in the next 
release.

https://issues.apache.org/jira/browse/FLINK-22766

--
Best Regards,

Qingsheng Ren
Email: renqs...@gmail.com

On May 25, 2021, 2:35 PM +0800, 陳樺威 wrote:
> Hi Ardhani,
>
> Thanks for your kindly reply.
>
> Our team use your provided metrics before, but the metrics disappear after 
> migrate to new KafkaSource.
>
> We initialize KafkaSource in following code.
> ```
> val consumer: KafkaSource[T] = KafkaSource.builder()
>   .setProperties(properties)
>   .setTopics(topic)
>   .setValueOnlyDeserializer(deserializer)
>   .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
>   .build()
>
> env
>   .fromSource(consumer, WatermarkStrategy.noWatermarks(), uid)
>   .setParallelism(math.min(parallelism, env.getParallelism))
>   .setMaxParallelism(parallelism)
>   .name(uid).uid(uid)
>   .rebalance
> ```
>
> Oscar
>
> Ardhani Narasimha wrote on Tue, May 25, 2021 at 12:08 AM:
> Use below respectively
>
> flink_taskmanager_job_task_operator_KafkaConsumer_bytes_consumed_rate - 
> Consumer rate
> flink_taskmanager_job_task_operator_KafkaConsumer_records_lag_max - Consumer 
> lag
> flink_taskmanager_job_task_operator_KafkaConsumer_commit_latency_max - commit 
> latency
>
> unsure if reactive mode makes any difference.
> > On Mon, May 24, 2021 at 7:44 PM 陳樺威  wrote:
> > > Hello,
> > >
> > > Our team tries to test reactive mode and replace FlinkKafkaConsumer with 
> > > the new KafkaSource.
> > > But we can’t find the KafkaSource metrics list. Does anyone have any 
> > > idea? In our case, we want to know the Kafka consume delay and consume 
> > > rate.
> > >
> > > Thanks,
> > > Oscar
>
> ---
> IMPORTANT: The contents of this email and any attachments are confidential 
> and protected by applicable laws. If you have received this email by mistake, 
> please (i) notify the sender immediately; (ii) delete it from your database; 
> and (iii) do not disclose the contents to anyone or make copies thereof. 
> Razorpay accepts no liability caused due to any inadvertent/ unintentional 
> data transmitted through this email.
> ---


Re: Jobmanager Crashes with Kubernetes HA When Restart Kubernetes Master Node

2021-05-25 Thread Yang Wang
By "restart master node", do you mean restarting the K8s master
components (e.g. APIServer, etcd, etc.)?

Even if the master components are restarted, the Flink JobManager and
TaskManager should eventually get back to work.
Could you please share the JobManager logs so that we can debug why it
crashed?


Best,
Yang

Jerome Li wrote on Tue, May 25, 2021 at 3:43 AM:

> Hi,
>
> I am running Flink v1.12.2 in Standalone mode on Kubernetes. I set
> Kubernetes native as HA.
>
> The HA works well when either a jobmanager or a taskmanager pod is lost
> or crashes.
>
> But when I restart the master node, the jobmanager pod always crashes and
> restarts. This results in the entire Flink cluster restarting, and most of
> the taskmanager pods restart as well.
>
> I didn't see this issue when using ZooKeeper as HA. I am not sure if this
> is a bug that should be handled or if there is some workaround.
>
> Below is my Flink setting
>
> Job-Manager
>
> flink-conf.yaml:
>
> jobmanager.rpc.address: streakerflink-jobmanager
>
> high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
> high-availability.cluster-id: /streaker
> high-availability.jobmanager.port: 6123
> high-availability.storageDir: hdfs://hdfs-namenode-0.hdfs-namenode:8020/flink
> kubernetes.cluster-id: streaker
>
> rest.address: streakerflink-jobmanager
> rest.bind-port: 8081
> rest.port: 8081
>
> state.checkpoints.dir: hdfs://hdfs-namenode-0.hdfs-namenode:8020/flink/streaker
>
> blob.server.port: 6124
> metrics.internal.query-service.port: 6125
> metrics.reporters: prom
> metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
> metrics.reporter.prom.port:
>
> restart-strategy: fixed-delay
> restart-strategy.fixed-delay.attempts: 2147483647
> restart-strategy.fixed-delay.delay: 5 s
>
> jobmanager.memory.process.size: 1768m
>
> parallelism.default: 1
>
> task.cancellation.timeout: 2000
>
> web.log.path: /opt/flink/log/output.log
> jobmanager.web.log.path: /opt/flink/log/output.log
>
> web.submit.enable: false
>
> Task-Manager
>
> flink-conf.yaml:
>
> jobmanager.rpc.address: streakerflink-jobmanager
>
> high-availability: org.apache.flink.kubernetes.highavailability.KubernetesHaServicesFactory
> high-availability.cluster-id: /streaker
> high-availability.storageDir: hdfs://hdfs-namenode-0.hdfs-namenode:8020/flink
> kubernetes.cluster-id: streaker
>
> taskmanager.network.bind-policy: ip
>
> taskmanager.data.port: 6121
> taskmanager.rpc.port: 6122
>
> restart-strategy: fixed-delay
> restart-strategy.fixed-delay.attempts: 2147483647
> restart-strategy.fixed-delay.delay: 5 s
>
> taskmanager.memory.task.heap.size: 9728m
> taskmanager.memory.framework.off-heap.size: 512m
> taskmanager.memory.managed.size: 512m
> taskmanager.memory.jvm-metaspace.size: 256m
> taskmanager.memory.jvm-overhead.max: 3g
> taskmanager.memory.jvm-overhead.fraction: 0.035
> taskmanager.memory.network.fraction: 0.03
> taskmanager.memory.network.max: 3g
> taskmanager.numberOfTaskSlots: 1
>
> taskmanager.jvm-exit-on-oom: true
>
> metrics.internal.query-service.port: 6125
> metrics.reporters: prom
> metrics.reporter.prom.class: org.apache.flink.metrics.prometheus.PrometheusReporter
> metrics.reporter.prom.port:
>
> web.log.path: /opt/flink/log/output.log
> taskmanager.log.path: /opt/flink/log/output.log
>
> task.cancellation.timeout: 2000
>
> Any help will be appreciated!
>
> Thanks,
> Jerome
>


Re: yarn ship from s3

2021-05-25 Thread Piotr Nowojski
Hi Vijay,

I'm not sure if I understand your question correctly. You have jar and
configs (1, 2, 3 and 4) on S3 and you want to start a Flink job using
those? Can you simply download those things (whole directory containing
those) to the machine that will be starting the Flink job?

Best, Piotrek

On Tue, May 25, 2021 at 07:50 Vijayendra Yadav wrote:

> Hi Team,
>
> I am trying to find a way to ship files from aws s3 for a flink streaming
> job, I am running on AWS EMR. What i need to ship are following:
> 1) application jar
> 2) application property file
> 3) custom flink-conf.yaml
> 4) log4j application specific
>
> Please let me know options.
>
> Thanks,
> Vijay
>


Re: DataStream API in Batch Mode job is timing out, please advise on how to adjust the parameters.

2021-05-25 Thread Piotr Nowojski
Hi Marco,

How are you starting the job? For example, are you using Yarn as the
resource manager? It looks like there are just not enough resources in the
cluster to run this job. Assuming the cluster is correctly configured and
the Task Managers are able to connect to the Job Manager (can you share the
full JM/TM logs?), I would say your job is simply too large (parallelism 32?)
for the given configuration.

Best,
Piotrek

On Tue, May 25, 2021 at 06:10 Marco Villalobos wrote:

> I am running with one job manager and three task managers.
>
> Each task manager is receiving at most 8 gb of data, but the job is timing
> out.
>
> What parameters must I adjust?
>
> Sink: back fill db sink) (15/32) (50626268d1f0d4c0833c5fa548863abd) switched from SCHEDULED to FAILED on [unassigned resource].
> java.util.concurrent.CompletionException: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Slot request bulk is not fulfillable! Could not allocate the required slot within slot request timeout
>     at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) ~[?:1.8.0_282]
>     at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) ~[?:1.8.0_282]
>     at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:607) ~[?:1.8.0_282]
>     at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591) ~[?:1.8.0_282]
>     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_282]
>     at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) ~[?:1.8.0_282]
>     at org.apache.flink.runtime.scheduler.SharedSlot.cancelLogicalSlotRequest(SharedSlot.java:223) ~[feature-LUM-3882-toledo--850a6747.jar:?]
>     at org.apache.flink.runtime.scheduler.SlotSharingExecutionSlotAllocator.cancelLogicalSlotRequest(SlotSharingExecutionSlotAllocator.java:168) ~[feature-LUM-3882-toledo--850a6747.jar:?]
>     at org.apache.flink.runtime.scheduler.SharingPhysicalSlotRequestBulk.cancel(SharingPhysicalSlotRequestBulk.java:86) ~[feature-LUM-3882-toledo--850a6747.jar:?]
>     at org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkWithTimestamp.cancel(PhysicalSlotRequestBulkWithTimestamp.java:66) ~[feature-LUM-3882-toledo--850a6747.jar:?]
>     at org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl.lambda$schedulePendingRequestBulkWithTimestampCheck$0(PhysicalSlotRequestBulkCheckerImpl.java:91) ~[feature-LUM-3882-toledo--850a6747.jar:?]
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_282]
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_282]
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:442) ~[feature-LUM-3882-toledo--850a6747.jar:?]
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:209) ~[feature-LUM-3882-toledo--850a6747.jar:?]
>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[feature-LUM-3882-toledo--850a6747.jar:?]
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:159) ~[feature-LUM-3882-toledo--850a6747.jar:?]
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [feature-LUM-3882-toledo--850a6747.jar:?]
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [feature-LUM-3882-toledo--850a6747.jar:?]
>     at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [feature-LUM-3882-toledo--850a6747.jar:?]
>     at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [feature-LUM-3882-toledo--850a6747.jar:?]
>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [feature-LUM-3882-toledo--850a6747.jar:?]
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [feature-LUM-3882-toledo--850a6747.jar:?]
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [feature-LUM-3882-toledo--850a6747.jar:?]
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [feature-LUM-3882-toledo--850a6747.jar:?]
>     at akka.actor.Actor.aroundReceive(Actor.scala:517) [feature-LUM-3882-toledo--850a6747.jar:?]
>     at akka.actor.Actor.aroundReceive$(Actor.scala:515) [feature-LUM-3882-toledo--850a6747.jar:?]
>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [feature-LUM-3882-toledo--850a6747.jar:?]
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [feature-LUM-3882-toledo--850a6747.jar:?]
>     at akka.actor.ActorCell.invoke(ActorCell.scala:561) [feature-LUM-3882-toledo--850a6747.jar:?]
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [feature-LUM-3882-toledo--850a6747.jar:?]
> at 

Re: DataStream API in Batch Mode job is timing out, please advise on how to adjust the parameters.

2021-05-25 Thread Yangze Guo
Hi, Marco,

The root cause is NoResourceAvailableException. Could you provide the
following information?
- How many slots does each TM have?
- Your job's topology. It would also be good to share the job manager log.
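
As a rough sanity check for the slot question, the arithmetic can be sketched as below. This is only a back-of-the-envelope helper, not Flink code: the class `SlotCheck` is made up, the three task managers and parallelism 32 come from the report, and the slots-per-TM value is a hypothetical placeholder since it was not stated. It also assumes all 32 subtasks of an operator need a slot at the same time with default slot sharing; in batch mode the scheduler may work on smaller regions, so the real requirement can be lower.

```java
public class SlotCheck {

    /** Total slots in the cluster when every TaskManager offers the same number. */
    static int availableSlots(int taskManagers, int slotsPerTaskManager) {
        return taskManagers * slotsPerTaskManager;
    }

    /** Smallest slots-per-TM value that lets the cluster host `parallelism` concurrent subtasks. */
    static int minSlotsPerTaskManager(int taskManagers, int parallelism) {
        return (parallelism + taskManagers - 1) / taskManagers; // ceiling division
    }

    public static void main(String[] args) {
        int taskManagers = 3;         // from the report
        int parallelism = 32;         // from "(15/32)" in the log
        int slotsPerTaskManager = 1;  // hypothetical, the actual value was not given

        int available = availableSlots(taskManagers, slotsPerTaskManager);
        System.out.println("available slots: " + available);
        System.out.println("enough for parallelism " + parallelism + "? " + (available >= parallelism));
        System.out.println("min slots per TM: " + minSlotsPerTaskManager(taskManagers, parallelism));
    }
}
```

If the available number is below the parallelism, either raise taskmanager.numberOfTaskSlots, add task managers, or lower the job parallelism.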

Best,
Yangze Guo

On Tue, May 25, 2021 at 12:10 PM Marco Villalobos
 wrote:
>
> I am running with one job manager and three task managers.
>
> Each task manager is receiving at most 8 gb of data, but the job is timing 
> out.
>
> What parameters must I adjust?
>
> Sink: back fill db sink) (15/32) (50626268d1f0d4c0833c5fa548863abd) switched from SCHEDULED to FAILED on [unassigned resource].
> java.util.concurrent.CompletionException: org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException: Slot request bulk is not fulfillable! Could not allocate the required slot within slot request timeout
>     at java.util.concurrent.CompletableFuture.encodeThrowable(CompletableFuture.java:292) ~[?:1.8.0_282]
>     at java.util.concurrent.CompletableFuture.completeThrowable(CompletableFuture.java:308) ~[?:1.8.0_282]
>     at java.util.concurrent.CompletableFuture.uniApply(CompletableFuture.java:607) ~[?:1.8.0_282]
>     at java.util.concurrent.CompletableFuture$UniApply.tryFire(CompletableFuture.java:591) ~[?:1.8.0_282]
>     at java.util.concurrent.CompletableFuture.postComplete(CompletableFuture.java:488) ~[?:1.8.0_282]
>     at java.util.concurrent.CompletableFuture.completeExceptionally(CompletableFuture.java:1990) ~[?:1.8.0_282]
>     at org.apache.flink.runtime.scheduler.SharedSlot.cancelLogicalSlotRequest(SharedSlot.java:223) ~[feature-LUM-3882-toledo--850a6747.jar:?]
>     at org.apache.flink.runtime.scheduler.SlotSharingExecutionSlotAllocator.cancelLogicalSlotRequest(SlotSharingExecutionSlotAllocator.java:168) ~[feature-LUM-3882-toledo--850a6747.jar:?]
>     at org.apache.flink.runtime.scheduler.SharingPhysicalSlotRequestBulk.cancel(SharingPhysicalSlotRequestBulk.java:86) ~[feature-LUM-3882-toledo--850a6747.jar:?]
>     at org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkWithTimestamp.cancel(PhysicalSlotRequestBulkWithTimestamp.java:66) ~[feature-LUM-3882-toledo--850a6747.jar:?]
>     at org.apache.flink.runtime.jobmaster.slotpool.PhysicalSlotRequestBulkCheckerImpl.lambda$schedulePendingRequestBulkWithTimestampCheck$0(PhysicalSlotRequestBulkCheckerImpl.java:91) ~[feature-LUM-3882-toledo--850a6747.jar:?]
>     at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) ~[?:1.8.0_282]
>     at java.util.concurrent.FutureTask.run(FutureTask.java:266) ~[?:1.8.0_282]
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRunAsync(AkkaRpcActor.java:442) ~[feature-LUM-3882-toledo--850a6747.jar:?]
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:209) ~[feature-LUM-3882-toledo--850a6747.jar:?]
>     at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77) ~[feature-LUM-3882-toledo--850a6747.jar:?]
>     at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:159) ~[feature-LUM-3882-toledo--850a6747.jar:?]
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26) [feature-LUM-3882-toledo--850a6747.jar:?]
>     at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21) [feature-LUM-3882-toledo--850a6747.jar:?]
>     at scala.PartialFunction.applyOrElse(PartialFunction.scala:123) [feature-LUM-3882-toledo--850a6747.jar:?]
>     at scala.PartialFunction.applyOrElse$(PartialFunction.scala:122) [feature-LUM-3882-toledo--850a6747.jar:?]
>     at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:21) [feature-LUM-3882-toledo--850a6747.jar:?]
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171) [feature-LUM-3882-toledo--850a6747.jar:?]
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [feature-LUM-3882-toledo--850a6747.jar:?]
>     at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172) [feature-LUM-3882-toledo--850a6747.jar:?]
>     at akka.actor.Actor.aroundReceive(Actor.scala:517) [feature-LUM-3882-toledo--850a6747.jar:?]
>     at akka.actor.Actor.aroundReceive$(Actor.scala:515) [feature-LUM-3882-toledo--850a6747.jar:?]
>     at akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:225) [feature-LUM-3882-toledo--850a6747.jar:?]
>     at akka.actor.ActorCell.receiveMessage(ActorCell.scala:592) [feature-LUM-3882-toledo--850a6747.jar:?]
>     at akka.actor.ActorCell.invoke(ActorCell.scala:561) [feature-LUM-3882-toledo--850a6747.jar:?]
>     at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:258) [feature-LUM-3882-toledo--850a6747.jar:?]
>     at akka.dispatch.Mailbox.run(Mailbox.scala:225) [feature-LUM-3882-toledo--850a6747.jar:?]
> at akka.dispatch.Mailbox.exec(Mailbox.scala:235) 
> 

Re: When to prefer toDataStream over toAppendStream or toRetractStream?

2021-05-25 Thread Timo Walther

Hi Yik San,

`toDataStream` and `toChangelogStream` are the new APIs for a smooth 
integration of Table API and DataStream API. You can find the full 
documentation here:


https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/dev/table/data_stream_api/

Since `toDataStream` and `toChangelogStream` are relatively new, and the 
latter is marked as @Experimental in 1.13, the old methods are still 
kept around for some time. There are also some known issues like 
FLINK-22666 and FLINK-22378 that will be fixed in 1.13.1.


`toAppendStream` and `toRetractStream` will be deprecated in 1.14.

So for now both sets of methods are kind of equally important to get the 
job done. But `toDataStream` and `toChangelogStream` are the stable 
long-term solution in 1.13.1+.


Regards,
Timo


On 24.05.21 14:02, Yik San Chan wrote:

Hi community,

Flink 1.13 introduces toDataStream. However, I wonder when do we prefer 
toDataStream over toAppendStream or toRetractStream?


Thank you!

Best,
Yik San




[Flink-Siddhi]: Issue processing siddhi CEP task on flink

2021-05-25 Thread Dipanjan Mazumder
 
Hi Guys,

I am facing another integration challenge with flink-siddhi which I have not 
been able to crack yet. I am trying to debug the flow in Flink, but being new 
to Flink it is taking a lot of time. The problem: when I publish a message 
through Kafka and the Flink application/job has a CEP processing operation 
using SiddhiCEP, the taskmanager does not process the task for the operator 
that contains the Siddhi CEP. It does not throw any error anywhere either. I 
started debugging the taskmanager, but after 4 days I have not gotten 
anywhere. I have created a Stack Overflow question which has more details on 
the program and application.

Flink version: 1.13
Scala: 2.11
Java: 1.8

I am using the flink-siddhi library: haoch/flink-siddhi, version 
2.11-0.2.2-snapshot

The link is: Not able to process kafka json message with Flink siddhi 
library

If anyone can give a pointer on this problem, it would save my day. Please 
kindly respond.
Thanks in advance.

Regards,
Dipanjan

On Tuesday, May 25, 2021, 12:48:59 PM GMT+5:30, Dipanjan Mazumder wrote:
 
Hi All,

Found the solution.

Problem: I was using an intermediate library to integrate Siddhi with Flink 
(https://github.com/haoch/flink-siddhi). I was creating a SiddhiCEP instance, 
registering the extension on it, and then calling "define()" on that 
instance. However, the define method was creating an internal SiddhiCEP 
instance and using that for processing. I found that out by debugging the 
application. It is an implementation problem with the library itself.

Solution: I used the from() method on the created SiddhiCEP instance instead 
of define(). It then used the CEP instance I created for the rest of the 
processing, so the registered extensions were taken into account and 
recognised at runtime.

Regards,
Dipanjan

On Friday, May 21, 2021, 01:51:09 PM GMT+5:30, Salva Alcántara wrote:
 Hi Dipanjan,

I agree with Till. If the extensions are included in the jar for your
job, it should work. I was having the same doubts some weeks ago and can
confirm that as long as the jar includes those extensions, it works.

One thing I needed to do is to register the different extensions. For
example:

```
siddhiManager.setExtension("map:create", classOf[CreateFunctionExtension])
```

Regards,

Salva



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


Re: Issue with using siddhi extension function with flink

2021-05-25 Thread Dipanjan Mazumder
Hi All,

Found the solution.

Problem: I was using an intermediate library to integrate Siddhi with Flink 
(https://github.com/haoch/flink-siddhi). I was creating a SiddhiCEP instance, 
registering the extension on it, and then calling "define()" on that 
instance. However, the define method was creating an internal SiddhiCEP 
instance and using that for processing. I found that out by debugging the 
application. It is an implementation problem with the library itself.

Solution: I used the from() method on the created SiddhiCEP instance instead 
of define(). It then used the CEP instance I created for the rest of the 
processing, so the registered extensions were taken into account and 
recognised at runtime.
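
The pitfall described above can be reduced to a toy model. The sketch below does not use the flink-siddhi API at all: `ToyCep`, `defineAndResolve` and `fromAndResolve` are invented names that only mimic the reported behaviour, namely that one code path runs on a freshly created internal instance (which loses the registrations) while the other runs on the instance the caller configured.

```java
import java.util.HashMap;
import java.util.Map;

public class InstanceScopeDemo {

    /** Toy stand-in for a CEP environment; NOT the flink-siddhi API. */
    static class ToyCep {
        private final Map<String, String> extensions = new HashMap<>();

        void registerExtension(String name, String implementation) {
            extensions.put(name, implementation);
        }

        /** Mimics the reported problem: lookup happens on a new internal instance. */
        boolean defineAndResolve(String extensionName) {
            ToyCep internal = new ToyCep(); // registrations made on `this` are not copied
            return internal.extensions.containsKey(extensionName);
        }

        /** Mimics the working path: lookup happens on the instance the caller set up. */
        boolean fromAndResolve(String extensionName) {
            return this.extensions.containsKey(extensionName);
        }
    }

    public static void main(String[] args) {
        ToyCep cep = new ToyCep();
        cep.registerExtension("map:create", "CreateFunctionExtension");
        System.out.println("define-style lookup finds extension: " + cep.defineAndResolve("map:create"));
        System.out.println("from-style lookup finds extension: " + cep.fromAndResolve("map:create"));
    }
}
```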

Regards,
Dipanjan

On Friday, May 21, 2021, 01:51:09 PM GMT+5:30, Salva Alcántara wrote:
 
 Hi Dipanjan,

I agree with Till. If the extensions are included in the jar for your
job, it should work. I was having the same doubts some weeks ago and can
confirm that as long as the jar includes those extensions, it works.

One thing I needed to do is to register the different extensions. For
example:

```
siddhiManager.setExtension("map:create", classOf[CreateFunctionExtension])
```

Regards,

Salva



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/
  

Re: KafkaSource metrics

2021-05-25 Thread 陳樺威
Hi Ardhani,

Thanks for your kind reply.

Our team used the metrics you listed before, but they disappeared after we
migrated to the new KafkaSource.

We initialize the KafkaSource with the following code.
```

val consumer: KafkaSource[T] = KafkaSource.builder()
  .setProperties(properties)
  .setTopics(topic)
  .setValueOnlyDeserializer(deserializer)
  .setStartingOffsets(OffsetsInitializer.committedOffsets(OffsetResetStrategy.LATEST))
  .build()

env
  .fromSource(consumer, WatermarkStrategy.noWatermarks(), uid)
  .setParallelism(math.min(parallelism, env.getParallelism))
  .setMaxParallelism(parallelism)
  .name(uid).uid(uid)
  .rebalance

```

Oscar

Ardhani Narasimha wrote on Tue, May 25, 2021 at 12:08 AM:

> Use below respectively
>
> flink_taskmanager_job_task_operator_KafkaConsumer_bytes_consumed_rate -
> Consumer rate
> flink_taskmanager_job_task_operator_KafkaConsumer_records_lag_max -
> Consumer lag
> flink_taskmanager_job_task_operator_KafkaConsumer_commit_latency_max -
> commit latency
>
> unsure if reactive mode makes any difference.
> On Mon, May 24, 2021 at 7:44 PM 陳樺威  wrote:
>
>> Hello,
>>
>> Our team tries to test reactive mode and replace FlinkKafkaConsumer with
>> the new KafkaSource.
>> But we can’t find the KafkaSource metrics list. Does anyone have any
>> idea? In our case, we want to know the Kafka consume delay and consume rate.
>>
>> Thanks,
>> Oscar
>>


Re: Issue on creating and using a custom connector in Ververica

2021-05-25 Thread Ingo Bürk
Hi Natu,

the message is of course a bit unfortunate and misleading, but the issue
here isn't that multiple connectors are found, but that none are found. The
repository you linked to implements a connector using the old connector
stack, but Ververica Platform only supports the new stack, see [1].
From a quick look it also seems to be missing the entry in
META-INF/services which would be required for the factory discovery to work.
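
To illustrate why a missing META-INF/services entry leads to "none found", here is a minimal sketch using plain java.util.ServiceLoader. The interface `DemoFactory` and the class `UnregisteredFactory` are hypothetical stand-ins, not Flink classes; for a real connector on the new stack the services file would, as far as I know, be named after org.apache.flink.table.factories.Factory and list the factory class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.ServiceLoader;

public class FactoryDiscoveryDemo {

    /** Hypothetical stand-in for a connector factory interface. */
    public interface DemoFactory {
        String factoryIdentifier();
    }

    /** Present on the classpath, but not listed in any META-INF/services file. */
    public static class UnregisteredFactory implements DemoFactory {
        @Override
        public String factoryIdentifier() {
            return "demo";
        }
    }

    /** Returns the identifiers of all factories that ServiceLoader can discover. */
    static List<String> discover() {
        List<String> found = new ArrayList<>();
        for (DemoFactory factory : ServiceLoader.load(DemoFactory.class)) {
            found.add(factory.factoryIdentifier());
        }
        return found;
    }

    public static void main(String[] args) {
        // Without a services entry the implementation above is never discovered.
        System.out.println("discovered factories: " + discover());
    }
}
```

The implementation compiles and sits on the classpath, yet discovery returns an empty list until a services file naming it is packaged into the jar.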

In my personal GitHub profile I have an example[2] for a connector using
the new stack which also works in Ververica Platform (it's not for
Elasticsearch, though).

[1]
https://docs.ververica.com/user_guide/sql_development/connectors.html#developing-a-custom-connector-or-format
[2] https://github.com/Airblader/flink-connector-imap


Regards
Ingo

On Mon, May 24, 2021 at 1:57 PM Natu Lauchande  wrote:

> Good day Flink community,
>
> Apache Flink/Ververica Community Edition - Question
>
> I am trying to add a custom connector to Ververica community edition and
> keeps giving me the following error: "The jar contains multiple connector.
> Please choose one.", it doesn't allow me to choose more jars. I am testing
> with the following repo generated custom connectors:
> https://github.com/deadwind4/slink/tree/master/connector-es6
>
> My specific question is there anything specific missing from this repos
> that we should add to signal ververica about a custom record
>
>
> I am constantly receiving the following message attached ("The jar
> contains multiple connectors, please select one.") and i can't select any
> option.
>
>
> Thanks,
>
> Natu
>