Re: Subpar performance of temporal joins with RocksDB backend

2021-07-09 Thread Adrian Bednarz
I didn’t tweak any RocksDB knobs. The only thing we did was to increase managed memory to 12 GB, which, according to the documentation, was supposed to help RocksDB. The rest stays at the defaults. Incremental checkpointing was enabled as well, but disabling it made no difference in performance.
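For reference, a configuration along these lines would be a flink-conf.yaml fragment like the following sketch (the 12 GB figure is the value mentioned above; the other keys spell out Flink 1.13 behavior explicitly, and the checkpoint path is a placeholder):

```yaml
# State backend setup as described above (Flink 1.13 keys)
state.backend: rocksdb
state.backend.incremental: true              # was enabled; disabling it made no difference
state.backend.rocksdb.memory.managed: true   # default: RocksDB draws from managed memory
taskmanager.memory.managed.size: 12g         # the increased managed memory
state.checkpoints.dir: hdfs:///flink/checkpoints   # placeholder path
```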

Re: Running Flink Dataset jobs Sequentially

2021-07-09 Thread Ken Krugler
FWIW I had to do something similar in the past. My solution was to…
1. Create a custom reader that added the source directory to the input data (so I had a Tuple2
2. Create a job that reads from all source directories, using HadoopInputFormat for text
3. Constrain the parallelism of this initial
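Steps 1–3 above could be sketched roughly as follows, assuming the (since-deprecated) DataSet API. This is an untested sketch, not Ken's actual code: the directory paths and the use of `readTextFile` instead of a custom reader are illustrative assumptions.

```java
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;

public class PerDirectoryRead {
    public static void main(String[] args) throws Exception {
        ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        String[] sourceDirs = {"/data/in/dir1", "/data/in/dir2"}; // hypothetical paths

        DataSet<Tuple2<String, String>> all = null;
        for (String dir : sourceDirs) {
            // Step 1: tag each record with the directory it came from.
            // Step 3: constrain the parallelism of the initial read.
            DataSet<Tuple2<String, String>> tagged = env
                .readTextFile(dir)
                .map(line -> Tuple2.of(dir, line))
                .returns(Types.TUPLE(Types.STRING, Types.STRING))
                .setParallelism(1);
            // Step 2: one job reading from all source directories.
            all = (all == null) ? tagged : all.union(tagged);
        }

        all.print();
    }
}
```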

Re: Subpar performance of temporal joins with RocksDB backend

2021-07-09 Thread Maciej Bryński
Hi Adrian, Could you share your state backend configuration? Regards, Maciek Fri, 9 Jul 2021 at 19:09 Adrian Bednarz wrote: > > Hello, > > We are experimenting with lookup joins in Flink 1.13.0. Unfortunately, we > unexpectedly hit significant performance degradation when changing the stat

Subpar performance of temporal joins with RocksDB backend

2021-07-09 Thread Adrian Bednarz
Hello, We are experimenting with lookup joins in Flink 1.13.0. Unfortunately, we unexpectedly hit significant performance degradation when changing the state backend to RocksDB. We performed tests with two tables: fact table TXN and dimension table CUSTOMER with the following schemas: TXN: |--
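The kind of query under test presumably resembles the following processing-time temporal (lookup) join in Flink 1.13 syntax. The column names and the `proc_time` attribute are placeholders, not taken from the original post:

```sql
-- Hypothetical lookup join: enrich each transaction with customer attributes
SELECT
  t.txn_id,
  t.amount,
  c.customer_name
FROM TXN AS t
JOIN CUSTOMER FOR SYSTEM_TIME AS OF t.proc_time AS c
  ON t.customer_id = c.customer_id;
```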

Re: How to convert flat JSON to nested complex JSON in Flink SQL?

2021-07-09 Thread Caizhi Weng
Hi! You can define your sink with the following schema: CREATE TABLE kafka_sink ( employee ROW<id STRING, name STRING> ) WITH ( 'connector' = 'kafka', 'format' = 'json' -- other properties... ); You can also insert into this sink with the following SQL: INSERT INTO kafka_sink SELECT ROW(id, name) FROM kafka_so
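Spelled out end to end (the DDL above is truncated in the archive), a minimal sketch could look like the following; the topic names and omitted connector properties are assumptions:

```sql
CREATE TABLE kafka_source (
  id   STRING,
  name STRING
) WITH (
  'connector' = 'kafka',
  'topic' = 'input',           -- assumed topic name
  'format' = 'json'
  -- bootstrap servers, group id, etc. omitted
);

CREATE TABLE kafka_sink (
  -- nested structure: {"employee": {"id": ..., "name": ...}}
  employee ROW<id STRING, name STRING>
) WITH (
  'connector' = 'kafka',
  'topic' = 'output',          -- assumed topic name
  'format' = 'json'
  -- bootstrap servers, etc. omitted
);

INSERT INTO kafka_sink
SELECT ROW(id, name) FROM kafka_source;
```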

How to convert flat JSON to nested complex JSON in Flink SQL?

2021-07-09 Thread 1095193...@qq.com
Hi community, I receive JSON messages from Kafka, convert the flat JSON to nested JSON, and send it back to Kafka. Message received from Kafka: {"id":"001","name":"wang"} Message sent back to Kafka: {"employee":{"id":"001","name":"wang"}} How to do it in Flink SQL? 1095193...@qq.com

Kafka Consumer Retries Failing

2021-07-09 Thread Rahul Patwari
Hi, We have a Flink 1.11.1 streaming pipeline in production which reads from Kafka. The Kafka server version is 2.5.0 (Confluent 5.5.0); the Kafka client version is 2.4.1 - {"component":"org.apache.kafka.common.utils.AppInfoParser$AppInfo","message":"Kafka version: 2.4.1","method":""} Occasionally
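The knobs that usually govern retry and reconnect behavior in the 2.4.x consumer are plain Kafka client properties, passed through to the Flink Kafka connector. The values below are the client defaults, shown for orientation rather than as a recommendation:

```properties
# Kafka consumer client retry/backoff settings (kafka-clients 2.4.x defaults)
retry.backoff.ms=100
reconnect.backoff.ms=50
reconnect.backoff.max.ms=1000
request.timeout.ms=30000
```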

Re: Job Recovery Time on TM Lost

2021-07-09 Thread Till Rohrmann
Gen is right with his explanation of why dead TM discovery can be faster with Flink < 1.12. Concerning flaky TaskManager connections: 2.1 I think the problem is that the receiving TM does not know the container ID of the sending TM; it only knows its address. But this is something one could impr
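For reference, the heartbeat settings that bound how quickly a dead TaskManager is detected are the following flink-conf.yaml keys (Flink defaults shown; lowering them trades faster detection against sensitivity to GC pauses and transient network hiccups):

```yaml
heartbeat.interval: 10000   # ms between heartbeat requests (default)
heartbeat.timeout: 50000    # ms until an unresponsive TM is considered dead (default)
```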