Jobmanager stopped because of uncaught exception

2021-02-07 Thread Lei Wang
Flink standalone HA. Flink version 1.12.1 2021-02-08 13:57:50,550 ERROR org.apache.flink.runtime.util.FatalExitExceptionHandler [] - FATAL: Thread 'cluster-io-thread-30' produced an uncaught exception. Stopping the process... java.util.concurrent.RejectedExecutionException: Task java.util.c

Re: Jobmanager stopped because of uncaught exception

2021-02-08 Thread Lei Wang
ady been resolved in 1.12.2 (will > release soon). > BTW, I think it is unrelated to the aliyun oss info logs. > > [1]. https://issues.apache.org/jira/browse/FLINK-20992 > > > Best, > Yang > > Lei Wang wrote on Mon, Feb 8, 2021 at 2:22 PM: > >> Flink standalone HA. Fli

How to do code isolation if multiple jobs run on the same taskmanager process?

2021-03-10 Thread Lei Wang
Consider the following situation. Two jobs built in the same jar share some common code, for example, some static constant variables. Currently they are running on the same task manager process. I killed job1, changed the static variable to another value and then submitted it again. Does ano

Re: How to do code isolation if multiple jobs run on the same taskmanager process?

2021-03-11 Thread Lei Wang
ltiple times (1 per job). So there should be no interference. You > could verify it by logging the value of the constant and see it yourself. > > Best, > > Arvid > > On Thu, Mar 11, 2021 at 7:11 AM Lei Wang wrote: > >> Consider the following situation. >> >>
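A quick way to verify this yourself (a minimal sketch; the class names are illustrative): bake a static constant into the job jar and log both its value and the classloader that loaded it. In a session cluster each submitted job gets its own user-code classloader, so each job sees the constant from its own jar.

    // Shared class compiled into the job jar (a separate file in practice).
    class Constants {
        public static final String LABEL = "v1"; // change to "v2" before resubmitting
    }

    public class LabelLogger
            extends org.apache.flink.api.common.functions.RichMapFunction<String, String> {
        @Override
        public String map(String value) {
            // Prints the constant together with its classloader; two jobs on the
            // same TaskManager print different classloader instances.
            System.out.println(Constants.LABEL + " @ " + Constants.class.getClassLoader());
            return value;
        }
    }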

Task is always in created state after submitting an example job

2021-06-17 Thread Lei Wang
flink 1.11.2 on a single host. ./bin/start-cluster.sh and then ./bin/flink run examples/streaming/SocketWindowWordCount.jar --hostname localhost --port But on the jobmanager UI, the task is always in created state. There are available slots. Any insights on this? Thanks, Lei

Re: Task is always in created state after submitting an example job

2021-06-20 Thread Lei Wang
the Job Manager and Task Manager logs. Take a > look if Task Managers connected/registered in the Job Manager and if so, if > there were no problems when submitting the job. It seems like either there > are not enough slots, or slots are actually not available. > > Best, > Piotrek

Checkpoint size increasing even if I enable incremental checkpoint

2021-10-11 Thread Lei Wang
The checkpointed data size became bigger and bigger and the node CPU is very high when the job is doing checkpointing. But I have enabled incremental checkpointing: env.setStateBackend(new RocksDBStateBackend(checkpointDir, true)); I am using flink-1.11.2 and aliyun oss as ch
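For reference, a minimal sketch of how incremental checkpointing is enabled on Flink 1.11 (checkpoint path and interval are illustrative). Note that incremental mode bounds what is uploaded per checkpoint, not the total retained state, so the reported checkpointed size can still grow until RocksDB compaction removes obsolete SST files:

    import org.apache.flink.contrib.streaming.state.RocksDBStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class IncrementalCheckpointJob {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // 'true' enables incremental checkpoints: only new/changed SST files are uploaded.
            env.setStateBackend(new RocksDBStateBackend("oss://my-bucket/checkpoints", true));
            env.enableCheckpointing(60_000); // checkpoint every 60 s
            // ... build the job graph here ...
            env.execute("incremental-checkpoint-sketch");
        }
    }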

Kafka order info to MySQL discard middle status and guarantee final correctness

2022-02-27 Thread Lei Wang
Receive order messages from kafka; every message has a status field, the schema is just like this: orderId,status The messages will be inserted into MySQL. For each order there are many statuses and they change very frequently. In order to reduce stress on the database, we can discard some middle s
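One common pattern for this requirement (a sketch of one possible approach, not necessarily what the thread settled on): key by orderId, keep only the latest status in state, and flush it on a processing-time timer. Intermediate statuses arriving between flushes are overwritten rather than written to MySQL, while the final status is always flushed eventually:

    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    // Assumed shape of the kafka message from the thread.
    class Order {
        public String orderId;
        public String status;
    }

    public class LatestStatusFunction extends KeyedProcessFunction<String, Order, Order> {
        private static final long FLUSH_INTERVAL_MS = 10_000; // tune to the DB's tolerance
        private transient ValueState<Order> latest;

        @Override
        public void open(Configuration parameters) {
            latest = getRuntimeContext().getState(
                    new ValueStateDescriptor<>("latest-status", Order.class));
        }

        @Override
        public void processElement(Order order, Context ctx, Collector<Order> out) throws Exception {
            boolean timerAlreadySet = latest.value() != null;
            latest.update(order); // later statuses overwrite earlier ones
            if (!timerAlreadySet) {
                ctx.timerService().registerProcessingTimeTimer(
                        ctx.timerService().currentProcessingTime() + FLUSH_INTERVAL_MS);
            }
        }

        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<Order> out) throws Exception {
            Order last = latest.value();
            if (last != null) {
                out.collect(last); // only the latest status reaches MySQL
            }
            latest.clear();
        }
    }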

TaskMgr Metaspace becomes bigger and bigger after submitting new jobs

2022-04-24 Thread Lei Wang
I start a standalone session on a single server with only one taskMgr. The JVM metaspace will become bigger after submitting a new job. Even if I cancel the submitted job, the JVM metaspace will not decrease. After submitting about 15 times, the task manager was shut down because of OOM 2022-04-2

Optimize exact deduplication for tens of billions of records per day

2024-03-28 Thread Lei Wang
Use RocksDBBackend to store whether the element appeared within the last one day, here is the code: public class DedupFunction extends KeyedProcessFunction { private ValueState isExist; public void open(Configuration parameters) throws Exception { ValueStateDescriptor de
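The snippet above is cut off in the archive; below is a minimal completion of what such a function typically looks like (element type String and a one-day state TTL are assumptions standing in for the truncated parts):

    import org.apache.flink.api.common.state.StateTtlConfig;
    import org.apache.flink.api.common.state.ValueState;
    import org.apache.flink.api.common.state.ValueStateDescriptor;
    import org.apache.flink.api.common.time.Time;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
    import org.apache.flink.util.Collector;

    public class DedupFunction extends KeyedProcessFunction<String, String, String> {
        private transient ValueState<Boolean> isExist;

        @Override
        public void open(Configuration parameters) {
            ValueStateDescriptor<Boolean> desc =
                    new ValueStateDescriptor<>("isExist", Boolean.class);
            // Entries expire one day after being written, mirroring
            // "appeared within the last one day".
            desc.enableTimeToLive(StateTtlConfig.newBuilder(Time.days(1)).build());
            isExist = getRuntimeContext().getState(desc);
        }

        @Override
        public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
            if (isExist.value() == null) { // first occurrence of this key within a day
                isExist.update(true);
                out.collect(value);        // emit only the first occurrence
            }
        }
    }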

Re: Optimize exact deduplication for tens of billions of records per day

2024-03-29 Thread Lei Wang
hope this helps, > Peter > > On Fri, Mar 29, 2024, 09:08 Lei Wang wrote: > >> >> Use RocksDBBackend to store whether the element appeared within the last >> one day, here is the code: >> >> public class DedupFunction extends KeyedProcessFunction {

How to enable RocksDB native metrics?

2024-04-06 Thread Lei Wang
Using big state and want to do some performance tuning, how can I enable RocksDB native metrics? I am using Flink 1.14.4 Thanks, Lei

Re: How to enable RocksDB native metrics?

2024-04-07 Thread Lei Wang
; https://nightlies.apache.org/flink/flink-docs-release-1.14/docs/deployment/config/#rocksdb-native-metrics >> (RocksDB Native Metrics) >> >> >> Best, >> Zakelly >> >> On Sun, Apr 7, 2024 at 4:47 PM Lei Wang wrote: >> >>> >>> U
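For readers landing on this thread: the native metrics are plain configuration switches in flink-conf.yaml; a subset is shown below (each enabled metric adds some reporting overhead):

    state.backend.rocksdb.metrics.block-cache-capacity: true
    state.backend.rocksdb.metrics.block-cache-usage: true
    state.backend.rocksdb.metrics.estimate-num-keys: true
    state.backend.rocksdb.metrics.mem-table-flush-pending: true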

Re: How to enable RocksDB native metrics?

2024-04-07 Thread Lei Wang
he.org/flink/flink-docs-release-1.19/docs/deployment/config/#rocksdb-native-metrics> > > Sent from my iPhone > > On Apr 7, 2024, at 6:03 PM, Lei Wang wrote: > > I want to enable it only for specified jobs, how can I specify the > configurations on cmd line when sub

Re: How to enable RocksDB native metrics?

2024-04-10 Thread Lei Wang
cs> >> <https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/deployment/config/#rocksdb-native-metrics> >>

Re: How to enable RocksDB native metrics?

2024-04-10 Thread Lei Wang
ig/#rocksdb-native-metrics> >> >> Sent from my iPhone >> >> On Apr 7, 2024, at 6:03 PM, Lei Wang wrote: >> >> I want to en

Re: Optimize exact deduplication for tens of billions of records per day

2024-04-10 Thread Lei Wang
Hi Peter, I tried this; it improved performance significantly, but I don't know exactly why. As far as I know, the number of keys in RocksDB doesn't decrease. Any specific technical material about this? Thanks, Lei On Fri, Mar 29, 2024 at 9:49 PM Lei Wang wrote: > Perhaps

Re: How to enable RocksDB native metrics?

2024-04-11 Thread Lei Wang
old-styled CLI for YARN jobs where "-yD" instead of "-D" > should be used. > ------ > *From:* Lei Wang > *Sent:* Thursday, April 11, 2024 12:39 > *To:* Biao Geng > *Cc:* user > *Subject:* Re: How to enable RocksDB native metrics?

Re: How to enable RocksDB native metrics?

2024-04-11 Thread Lei Wang
Thanks very much, it finally works On Thu, Apr 11, 2024 at 8:27 PM Zhanghao Chen wrote: > Add a space between -yD and the param should do the trick. > > Best, > Zhanghao Chen > ------ > *From:* Lei Wang > *Sent:* Thursday, April 11, 2024 19:40
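Putting the thread together, the working invocation looks roughly like this (jar name and the chosen metrics are illustrative; "-yD" is for the old-style YARN CLI, plain "-D" elsewhere):

    ./bin/flink run -m yarn-cluster \
        -yD state.backend.rocksdb.metrics.block-cache-usage=true \
        -yD state.backend.rocksdb.metrics.block-cache-capacity=true \
        myJob.jar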

Why RocksDB metrics cache-usage is larger than cache-capacity

2024-04-11 Thread Lei Wang
I enable RocksDB native metrics and do some performance tuning. state.backend.rocksdb.block.cache-size is set to 128m, 4 slots for each TaskManager. The observed result for one specific parallel slot: state.backend.rocksdb.metrics.block-cache-capacity is about 14.5M state.backend.rocksdb.metric

Re: Why RocksDB metrics cache-usage is larger than cache-capacity

2024-04-23 Thread Lei Wang
u share the related rocksdb log which > may contain more detailed info? > > On Fri, Apr 12, 2024 at 12:49 PM Lei Wang wrote: > >> >> I enable RocksDB native metrics and do some performance tuning. >> >> state.backend.rocksdb.block.cache-size is set to 128m, 4 sl

Force to commit kafka offset when stopping a job.

2024-06-05 Thread Lei Wang
When stopping a flink job that consumes kafka messages, how to force it to commit the kafka offset? Thanks, Lei

Re: Force to commit kafka offset when stopping a job.

2024-06-06 Thread Lei Wang
avepoint [1]. Flink will > trigger a final offset commit on the final savepoint. > > [1] > https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/cli/#stopping-a-job-gracefully-creating-a-final-savepoint > > Best, > Zhanghao Chen > -------
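The graceful stop referenced above is issued from the CLI roughly like this (savepoint path and job id are illustrative):

    ./bin/flink stop --savepointPath /tmp/flink-savepoints <jobId>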

Re: Force to commit kafka offset when stopping a job.

2024-06-11 Thread Lei Wang
; Zhanghao Chen > ---------- > *From:* Lei Wang > *Sent:* Thursday, June 6, 2024 16:54 > *To:* Zhanghao Chen ; ruanhang1...@gmail.com < > ruanhang1...@gmail.com> > *Cc:* user > *Subject:* Re: Force to commit kafka offset when stop a job. > > Thanks Zhanghao && Hang. >

What is the default partition assignment strategy for KafkaSourceBuilder

2024-07-02 Thread Lei Wang
A simple flink task that consumes messages from a kafka topic and does some calculation. The number of partitions of the topic is 48; I set the parallelism to 48 as well and expect each parallel instance to consume one partition. But after submitting the task I found that there are 5 parallel instances consuming two partitions and 5 pa
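For context, the assignment is not round-robin across readers: the Kafka connector's KafkaSourceEnumerator chooses an owner per split with logic roughly like the sketch below (paraphrased; exact code varies by connector version). The start index depends on the topic name's hash, so the spread can look uneven, especially when several topics are consumed together:

    import org.apache.kafka.common.TopicPartition;

    // Paraphrased sketch of KafkaSourceEnumerator#getSplitOwner (version-dependent).
    static int getSplitOwner(TopicPartition tp, int numReaders) {
        // A topic-dependent start index, then partitions laid out consecutively.
        int startIndex = ((tp.topic().hashCode() * 31) & 0x7FFFFFFF) % numReaders;
        return (startIndex + tp.partition()) % numReaders;
    }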

KafkaSink self-defined RoundRobinPartitioner not able to discover new partitions

2024-07-30 Thread Lei Wang
I wrote a FlinkRoundRobinPartitioner that extends FlinkKafkaPartitioner and use it as follows: KafkaSink kafkaSink = KafkaSink.builder() .setBootstrapServers(sinkServers).setKafkaProducerConfig(props) .setDeliverGuarantee(DeliveryGuarantee.AT_LEAST_ONCE) .setRecordSerializer(KafkaRecordSerializationS
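For reference, a round-robin FlinkKafkaPartitioner of this shape typically looks like the sketch below (illustrative; whether new partitions show up in the 'partitions' argument depends on the producer refreshing its topic metadata):

    import java.util.concurrent.atomic.AtomicInteger;
    import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;

    public class FlinkRoundRobinPartitioner<T> extends FlinkKafkaPartitioner<T> {
        private final AtomicInteger counter = new AtomicInteger(0);

        @Override
        public int partition(T record, byte[] key, byte[] value, String targetTopic, int[] partitions) {
            // Mask the sign bit so the index stays non-negative after counter overflow.
            int next = counter.getAndIncrement() & Integer.MAX_VALUE;
            return partitions[next % partitions.length];
        }
    }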

Re: [Request Help] Flink StreamRecord granularity latency metrics

2024-07-31 Thread Lei Wang
Hi Yubin, We implement it in this manner. For every record, we define several time fields. When the record first enters the system, set one field to the current time. After several complex calculation operators, set another field to the current time. Just calculate the difference between the two values. Hop
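A minimal sketch of that approach (the TimedEvent type, helper names, and the 'source'/'processed' streams are illustrative):

    import org.apache.flink.streaming.api.datastream.DataStream;

    // Assumed event type carrying the per-stage time fields described above.
    class TimedEvent {
        public long enterTime;     // stamped when the record enters the system
        public long afterCalcTime; // stamped after the complex operators
        public String payload;
    }

    public class LatencyStamps {
        static DataStream<TimedEvent> stampEntry(DataStream<TimedEvent> in) {
            return in.map(e -> { e.enterTime = System.currentTimeMillis(); return e; });
        }

        static DataStream<TimedEvent> stampAndMeasure(DataStream<TimedEvent> in) {
            return in.map(e -> {
                e.afterCalcTime = System.currentTimeMillis();
                long latencyMs = e.afterCalcTime - e.enterTime; // log or report as a metric
                return e;
            });
        }
    }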

Re: KafkaSink self-defined RoundRobinPartitioner not able to discover new partitions

2024-07-31 Thread Lei Wang
's a bug in RoundRobinPartitioner and the distribution is still uneven https://issues.apache.org/jira/browse/KAFKA-9965 On Tue, Jul 30, 2024 at 4:20 PM Lei Wang wrote: > I wrote a FlinkRoundRobinPartitioner that extends FlinkKafkaPartitioner and use > it as follows: > &g

Easily deployed shared file system that flink can use

2024-08-12 Thread Lei Wang
We deploy flink in standalone mode. We don't use hadoop so there's no hdfs. Also it is deployed on-premises, and no cloud storage such as GFS, S3, or OSS can be accessed. Is there any other easily deployed file system that flink checkpoints/savepoints can use and share between task managers? Thank
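One low-footprint option that fits these constraints (my suggestion, not from the thread): an NFS export mounted at the same path on every node, addressed through the file:// scheme in flink-conf.yaml:

    # Every JobManager and TaskManager must see the same mount point.
    state.checkpoints.dir: file:///mnt/flink-shared/checkpoints
    state.savepoints.dir: file:///mnt/flink-shared/savepoints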

Re: Handling data skewness

2024-08-19 Thread Lei Wang
Hi Karthick, Take a look at the distribution of your keys to see if there are some keys that contribute most of the data. If the distribution is relatively uniform, try to use partitionCustom with a self-defined partition function instead of keyBy. The default partition function in flink implements a

Re: Handling data skewness

2024-08-19 Thread Lei Wang
You can just use Math.abs(key.hashCode()) % numPartitions Regards, Lei On Mon, Aug 19, 2024 at 5:41 PM Karthick wrote: > Thanks Lei, will check it out. Please suggest an algorithm which > solves the problem if any. > > On Mon, Aug 19, 2024 at 2:17 PM Lei Wang wrote: >
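One caveat with that expression (a note of mine, not from the thread): Math.abs(Integer.MIN_VALUE) is still negative, so a key whose hashCode() happens to be Integer.MIN_VALUE yields a negative index. Masking the sign bit avoids this, e.g. in a partitioner for partitionCustom:

    import org.apache.flink.api.common.functions.Partitioner;

    public class HashPartitioner implements Partitioner<String> {
        @Override
        public int partition(String key, int numPartitions) {
            // Mask the sign bit instead of Math.abs to handle Integer.MIN_VALUE.
            return (key.hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
    }
    // Usage sketch: stream.partitionCustom(new HashPartitioner(), keySelector);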