Re: Re: [DISCUSS] Removing deprecated methods from DataStream API

2020-08-18 Thread Till Rohrmann
Having looked at the proposed set of methods to remove, I've noticed that some of them are actually annotated with @Public. According to our stability guarantees, only major releases (1.0, 2.0, etc.) can break APIs with this annotation. Hence, I believe that we cannot simply remove them unless the c…

Re: [DISCUSS] FLIP-134: DataStream Semantics for Bounded Input

2020-08-18 Thread Dawid Wysakowicz
Hi all, @Klou Nice write-up. One comment I have: I would suggest using a different configuration parameter name. The way I understand the proposal, BATCH/STREAMING/AUTOMATIC affects not only the scheduling mode but the types of shuffles as well. How about `execution.mode`? Or `execution-runtime-…

Re: [DISCUSS] FLIP-134: DataStream Semantics for Bounded Input

2020-08-18 Thread Kostas Kloudas
Hi Yun and Dawid, Dawid is correct in that: ``` BATCH = pipelined scheduling with region failover + blocking keyBy shuffles (all pointwise shuffles pipelined) STREAM = eager scheduling with checkpointing + pipelined keyBy shuffles AUTOMATIC = choose based on sources (ALL bounded == BATCH, STREAMIN…
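A minimal sketch of how such a mode switch looks on the StreamExecutionEnvironment, as it eventually shipped in Flink 1.12 under the name `execution.runtime-mode`; the job body here is just a placeholder:

```java
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class RuntimeModeDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env =
                StreamExecutionEnvironment.getExecutionEnvironment();
        // AUTOMATIC inspects the sources: all bounded -> BATCH behavior
        // (blocking keyBy shuffles, region failover), any unbounded
        // -> STREAMING behavior (checkpointing, pipelined shuffles).
        env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

        env.fromElements(1, 2, 3).print();
        env.execute("runtime-mode-demo");
    }
}
```

The same choice can also be made per submission, e.g. `bin/flink run -Dexecution.runtime-mode=BATCH ...`, which keeps the program itself mode-agnostic.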

How to write a custom sink partitioner when using flinksql kafka-connector

2020-08-18 Thread wangl...@geekplus.com
CREATE TABLE kafka_sink_table ( warehouse_id INT, pack_task_order_id BIGINT, out_order_code STRING, pick_order_id BIGINT, end_time BIGINT ) WITH ( 'connector'='kafka', 'topic'='ods_wms_pack_task_order', 'properties.bootstrap.servers'='172.19.78.32:9092', 'format'='json' ); INSERT INTO ka…

Re: How to write a custom sink partitioner when using flinksql kafka-connector

2020-08-18 Thread Timo Walther
Hi Lei, you can check how the FlinkFixedPartitioner [1] or Tuple2FlinkPartitioner [2] are implemented. Since you are using SQL connectors of the newest generation, you should receive an instance of org.apache.flink.table.data.RowData in your partitioner. You can create a Maven project with a…

Re: Flink SQL UDAF com.esotericsoftware.kryo.KryoException: Encountered unregistered class ID

2020-08-18 Thread Timo Walther
Hi Forideal, luckily these problems will belong to the past in Flink 1.12, when UDAFs are updated to the new type system [1]. Lists will be natively supported, and registering custom KryoSerializers will work consistently as well. Until then, another workaround is to override getAccumulatorType() and def…
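A minimal sketch of that workaround under the pre-1.12 type system; the aggregate itself (a simple string collector) and all names here are made up for illustration:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.table.functions.AggregateFunction;

public class CollectStrings extends AggregateFunction<String, CollectStrings.Acc> {

    // Accumulator holding a List, which would otherwise fall back to Kryo.
    public static class Acc {
        public List<String> values = new ArrayList<>();
    }

    @Override
    public Acc createAccumulator() {
        return new Acc();
    }

    public void accumulate(Acc acc, String value) {
        acc.values.add(value);
    }

    @Override
    public String getValue(Acc acc) {
        return String.join(",", acc.values);
    }

    // The workaround: declare the accumulator type explicitly so the List
    // field gets a native ListTypeInfo instead of a generic Kryo fallback.
    @Override
    public TypeInformation<Acc> getAccumulatorType() {
        Map<String, TypeInformation<?>> fields = new HashMap<>();
        fields.put("values", Types.LIST(Types.STRING));
        return Types.POJO(Acc.class, fields);
    }
}
```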

Re: [DISCUSS] FLIP-134: DataStream Semantics for Bounded Input

2020-08-18 Thread David Anderson
Being able to optionally fire registered processing time timers at the end of a job would be interesting, and would help in (at least some of) the cases I have in mind. I don't have a better idea. David On Mon, Aug 17, 2020 at 8:24 PM Kostas Kloudas wrote: > Hi Kurt and David, > > Thanks a lot…

Re: Flink checkpoint recovery time

2020-08-18 Thread Yun Tang
Hi Zhinan, for the time to detect the failure, you could refer to the time when 'fullRestarts' increases. That could give you information about the time of the job failure. For the checkpoint recovery time, there are actually two parts: 1. The time to read the checkpoint meta in the JM. However, this…

Async IO with SQL API

2020-08-18 Thread Spurthi Chaganti
Hello folks, we are using the Flink 1.9 SQL API and we are NOT using the Blink planner. Our platform users express their Flink jobs as SQL queries. We currently have a use case of asynchronously looking up data from third parties for every event we read from the Kafka stream and populating additional fields whi…
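Since the 1.9 SQL API (without Blink) has no asynchronous lookup support, one hedged workaround is to leave SQL for the enrichment step and come back afterwards. Everything below — the table, the field layout, and `ThirdPartyClient` — is a made-up stand-in, not an established pattern from this thread:

```java
import java.util.Collections;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.java.StreamTableEnvironment;
import org.apache.flink.types.Row;

public class AsyncSqlEnrichment {

    // Stand-in for any client with a non-blocking lookup call.
    public static class ThirdPartyClient {
        public static String lookup(Object key) {
            return "extra-" + key;
        }
    }

    public static void enrich(StreamTableEnvironment tEnv, Table events) {
        // 1. Step out of SQL: materialize the query result as a DataStream.
        DataStream<Row> rows = tEnv.toAppendStream(events, Row.class);

        // 2. Run the third-party lookup through the async IO operator.
        DataStream<Row> enriched = AsyncDataStream.unorderedWait(
                rows,
                new AsyncFunction<Row, Row>() {
                    @Override
                    public void asyncInvoke(Row input, ResultFuture<Row> out) {
                        CompletableFuture
                                .supplyAsync(() -> ThirdPartyClient.lookup(input.getField(0)))
                                .thenAccept(extra -> {
                                    // Append the looked-up value as one extra field.
                                    Row result = new Row(input.getArity() + 1);
                                    for (int i = 0; i < input.getArity(); i++) {
                                        result.setField(i, input.getField(i));
                                    }
                                    result.setField(input.getArity(), extra);
                                    out.complete(Collections.singleton(result));
                                });
                    }
                },
                30, TimeUnit.SECONDS, 100);

        // 3. Step back into SQL: register the enriched stream as a table.
        // Note: a real job would also call .returns(new RowTypeInfo(...)) on
        // `enriched` so the SQL layer sees typed columns, not a generic Row.
        tEnv.registerDataStream("enriched_events", enriched);
    }
}
```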

Re: How to write a custom sink partitioner when using flinksql kafka-connector

2020-08-18 Thread Xingbo Huang
Hi Lei, if you want to write your custom partitioner, I think you can refer to the built-in FlinkFixedPartitioner [1]. [1] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka-base/src/main/java/org/apache/flink/streaming/connectors/kafka/partitioner/FlinkFixedPartitio…

Re: How to write a custom sink partitioner when using flinksql kafka-connector

2020-08-18 Thread Danny Chan
Hi, Lei ~ You may need to implement the abstract class FlinkKafkaPartitioner and then use the full class name as the param value of the option 'sink.partitioner'. FlinkFixedPartitioner [1] is a good example there. [1] https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-k…
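Putting the replies together, a minimal sketch of such a partitioner; the class name and the choice to route on `warehouse_id` (column 0 in Lei's DDL) are assumptions, not the only option:

```java
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner;
import org.apache.flink.table.data.RowData;

public class WarehousePartitioner extends FlinkKafkaPartitioner<RowData> {
    @Override
    public int partition(RowData record, byte[] key, byte[] value,
                         String targetTopic, int[] partitions) {
        // Keep all records of one warehouse on the same Kafka partition.
        // warehouse_id is the INT at position 0 of the sink table's schema.
        int warehouseId = record.getInt(0);
        return partitions[Math.floorMod(warehouseId, partitions.length)];
    }
}
```

It is then referenced by its fully qualified name in the DDL, e.g. 'sink.partitioner' = 'com.example.WarehousePartitioner', as Danny describes.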

Re: Format for timestamp type in Flink SQL

2020-08-18 Thread godfrey he
Hi Youngwoo, > 1. TIMESTAMP WITH LOCAL TIME ZONE Currently, the SQL client uses legacy types for the collect sink, which means `TIMESTAMP WITH LOCAL TIME ZONE` is not supported. You can refer to [1] to find the supported types, and there is a PR [2] to fix this. > 2. TIMESTAMP(3) WITH LOCAL TIME ZONE I…

Will the order of messages be guaranteed when using async IO?

2020-08-18 Thread wangl...@geekplus.com
I read Kafka messages and write them to MySQL. Writing to the database is the bottleneck when too many messages are sent to Kafka at high throughput, so I want to make the operator asynchronous. public void asyncInvoke(ObjectNode node, ResultFuture resultFuture) throws Exception {…
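To answer the subject line directly: it depends on which variant wires the operator in. A short sketch; `fn` stands for the AsyncFunction carrying the `asyncInvoke` quoted above:

```java
import java.util.concurrent.TimeUnit;

import com.fasterxml.jackson.databind.node.ObjectNode;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;

public class AsyncOrdering {
    public static DataStream<ObjectNode> apply(
            DataStream<ObjectNode> source,
            AsyncFunction<ObjectNode, ObjectNode> fn) {
        // orderedWait: results are emitted in the exact order the records
        // arrived, at the cost of buffering fast responses behind slow ones.
        return AsyncDataStream.orderedWait(source, fn, 60, TimeUnit.SECONDS, 100);

        // unorderedWait would emit each result as soon as its future
        // completes; order is then preserved only relative to watermarks:
        // return AsyncDataStream.unorderedWait(source, fn, 60, TimeUnit.SECONDS, 100);
    }
}
```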

Re: Flink checkpoint recovery time

2020-08-18 Thread Zhinan Cheng
Hi Yun, thanks a lot for your help. It seems hard to measure the checkpoint restore time currently. I do monitor the 'fullRestarts' metric and others like 'uptime' and 'downtime' to observe some information about failure recovery. Still some confusion: i) I found the time for the jobmanager to m…

The rpc invocation size exceeds the maximum akka framesize when the job was resubmitted.

2020-08-18 Thread Joshua Fan
Hi, we have a Flink job platform which resubmits a job when it fails, without platform-user involvement. Today a resubmit failed because of the error below; I changed akka.framesize, and the resubmit succeeded. My question is: nothing changed about the job, the jar, the program,…
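For reference, the knob Joshua mentions is `akka.framesize` (default `10485760b`, i.e. 10 MB). A hedged sketch of raising it programmatically; on a standing cluster it would normally go into `flink-conf.yaml` (`akka.framesize: 20971520b`) instead:

```java
import org.apache.flink.configuration.Configuration;

public class FrameSizeConfig {
    public static Configuration withLargerFrameSize() {
        Configuration conf = new Configuration();
        // Roughly double the 10 MB default; the trailing "b" means bytes.
        conf.setString("akka.framesize", "20971520b");
        return conf;
    }
}
```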