The Flink checkpoint time interval is not normal
The Flink app is running on Flink 1.5.4 on YARN; the checkpoint should be triggered every 5 min... [image: image.png]
Re: Any examples of invoking the Flink REST API POST method?
Hi,

This thread may help you:
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/cancel-with-savepoint-404-Not-Found-td19227.html

Also, for cancelling a Flink-on-YARN job, "yarn-cancel" works well while "cancel" does not. The Python code below for triggering a savepoint works well:

json = {"cancel-job": False}
r = requests.post(url, json=json)

Gary Yao wrote on Mon, Nov 12, 2018 at 5:33 PM:

> Hi Henry,
>
> What you see in the API documentation is a schema definition, not a sample
> request. The request body should be:
>
> {
>   "target-directory": "hdfs:///flinkDsl",
>   "cancel-job": false
> }
>
> Let me know if that helps.
>
> Best,
> Gary
>
> On Mon, Nov 12, 2018 at 7:15 AM vino yang wrote:
>
>> Hi Henry,
>>
>> Maybe Gary can help you; pinging him for you.
>>
>> Thanks, vino.
>>
>> 徐涛 wrote on Mon, Nov 12, 2018 at 12:45 PM:
>>
>>> Hi Experts,
>>> I am trying to trigger a savepoint from the Flink REST API on version 1.6.
>>> The documentation shows that I need to pass a JSON request body:
>>>
>>> {
>>>   "type" : "object",
>>>   "id" : "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:savepoints:SavepointTriggerRequestBody",
>>>   "properties" : {
>>>     "target-directory" : { "type" : "string" },
>>>     "cancel-job" : { "type" : "boolean" }
>>>   }
>>> }
>>>
>>> So I send the following JSON:
>>>
>>> {
>>>   "type": "object",
>>>   "id": "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:savepoints:SavepointTriggerRequestBody",
>>>   "properties": {
>>>     "target-directory": "hdfs:///flinkDsl",
>>>     "cancel-job": false
>>>   }
>>> }
>>>
>>> And I use okhttp to send the request:
>>>
>>> val MEDIA_TYPE_JSON = MediaType.parse("application/json; charset=utf-8")
>>> val body = RequestBody.create(MEDIA_TYPE_JSON, postBody)
>>> val request = new Request.Builder()
>>>   .url(url)
>>>   .post(body)
>>>   .build()
>>> client.newCall(request).execute()
>>>
>>> But I get this error: {"errors":["Request did not match expected format SavepointTriggerRequestBody."]}
>>> Would anyone give an example of how to invoke the POST REST API of Flink?
>>> Thanks a lot.
>>>
>>> Best,
>>> Henry
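For reference, a minimal Scala/okhttp sketch of the corrected call, following Gary's answer (the host, port, and job id in the URL are placeholders, not values from the thread):

import okhttp3.{MediaType, OkHttpClient, Request, RequestBody}

object TriggerSavepoint {
  def main(args: Array[String]): Unit = {
    val client = new OkHttpClient()
    val mediaType = MediaType.parse("application/json; charset=utf-8")
    // Send only the field values; posting the schema definition itself
    // yields "Request did not match expected format SavepointTriggerRequestBody."
    val postBody = """{"target-directory": "hdfs:///flinkDsl", "cancel-job": false}"""
    val request = new Request.Builder()
      .url("http://<jobmanager-host>:8081/jobs/<job-id>/savepoints") // placeholders
      .post(RequestBody.create(mediaType, postBody))
      .build()
    val response = client.newCall(request).execute()
    println(response.body().string())
  }
}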
Re: cannot resolve constructor when invoking the FlinkKafkaProducer011 constructor in Scala
IDEA only shows the "cannot resolve constructor" error message, without any other details.

Dawid Wysakowicz wrote on Wed, Oct 10, 2018 at 5:55 PM:

> Hi,
>
> What is the exact error message you are getting?
>
> Best,
>
> Dawid
>
> On 10/10/18 11:51, 远远 wrote:
>
> Invoking the FlinkKafkaProducer011 constructor in Scala:
>
> val producer = new FlinkKafkaProducer011[PVEvent.Entity](
>   appConf.getPvEventTopic,
>   new PvEventSerializeSchema,
>   producerProps,
>   Optional.of(FlinkRebalancePartitioner[PVEvent.Entity]))
>
> And the constructor is:
>
> /**
>  * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
>  * the topic. It accepts a keyed {@link KeyedSerializationSchema} and possibly a custom
>  * {@link FlinkKafkaPartitioner}.
>  *
>  * If a partitioner is not provided, written records will be partitioned by the attached key of each
>  * record (as determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If written records do not
>  * have a key (i.e., {@link KeyedSerializationSchema#serializeKey(Object)} returns {@code null}), they
>  * will be distributed to Kafka partitions in a round-robin fashion.
>  *
>  * @param defaultTopicId The default topic to write data to
>  * @param serializationSchema A serializable serialization schema for turning user objects into a
>  *                            kafka-consumable byte[] supporting key/value messages
>  * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only
>  *                       required argument.
>  * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
>  *                          If a partitioner is not provided, records will be partitioned by the key of each record
>  *                          (determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If the keys
>  *                          are {@code null}, then records will be distributed to Kafka partitions in a
>  *                          round-robin fashion.
>  */
> public FlinkKafkaProducer011(
>         String defaultTopicId,
>         KeyedSerializationSchema<IN> serializationSchema,
>         Properties producerConfig,
>         Optional<FlinkKafkaPartitioner<IN>> customPartitioner) {
>     this(
>         defaultTopicId,
>         serializationSchema,
>         producerConfig,
>         customPartitioner,
>         Semantic.AT_LEAST_ONCE,
>         DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
> }
>
> But it does not compile, and IDEA shows a "cannot resolve constructor" error.
> When I invoke another constructor without the Java 8 Optional parameter, there
> is no error. Is this because of the Java 8 Optional parameter? What should I do?
cannot resolve constructor when invoking the FlinkKafkaProducer011 constructor in Scala
Invoking the FlinkKafkaProducer011 constructor in Scala:

val producer = new FlinkKafkaProducer011[PVEvent.Entity](
  appConf.getPvEventTopic,
  new PvEventSerializeSchema,
  producerProps,
  Optional.of(FlinkRebalancePartitioner[PVEvent.Entity]))

And the constructor is:

/**
 * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
 * the topic. It accepts a keyed {@link KeyedSerializationSchema} and possibly a custom
 * {@link FlinkKafkaPartitioner}.
 *
 * If a partitioner is not provided, written records will be partitioned by the attached key of each
 * record (as determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If written records do not
 * have a key (i.e., {@link KeyedSerializationSchema#serializeKey(Object)} returns {@code null}), they
 * will be distributed to Kafka partitions in a round-robin fashion.
 *
 * @param defaultTopicId The default topic to write data to
 * @param serializationSchema A serializable serialization schema for turning user objects into a
 *                            kafka-consumable byte[] supporting key/value messages
 * @param producerConfig Configuration properties for the KafkaProducer. 'bootstrap.servers.' is the only
 *                       required argument.
 * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
 *                          If a partitioner is not provided, records will be partitioned by the key of each record
 *                          (determined by {@link KeyedSerializationSchema#serializeKey(Object)}). If the keys
 *                          are {@code null}, then records will be distributed to Kafka partitions in a
 *                          round-robin fashion.
 */
public FlinkKafkaProducer011(
        String defaultTopicId,
        KeyedSerializationSchema<IN> serializationSchema,
        Properties producerConfig,
        Optional<FlinkKafkaPartitioner<IN>> customPartitioner) {
    this(
        defaultTopicId,
        serializationSchema,
        producerConfig,
        customPartitioner,
        Semantic.AT_LEAST_ONCE,
        DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
}

But it does not compile, and IDEA shows a "cannot resolve constructor" error. When I invoke another constructor without the Java 8 Optional parameter, there is no error. Is this because of the Java 8 Optional parameter? What should I do?
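One likely cause, sketched below as a guess from the snippet: Optional.of is given the bare name FlinkRebalancePartitioner[PVEvent.Entity] rather than an instance, and Scala's overload resolution also copes better when the Optional is explicitly typed. PVEvent, PvEventSerializeSchema, FlinkRebalancePartitioner, appConf, and producerProps are the poster's own identifiers:

import java.util.Optional
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner

// Instantiate the partitioner with `new` and pin the Optional's type so the
// 4-argument constructor overload can be resolved unambiguously.
val partitioner: Optional[FlinkKafkaPartitioner[PVEvent.Entity]] =
  Optional.of(new FlinkRebalancePartitioner[PVEvent.Entity]())

val producer = new FlinkKafkaProducer011[PVEvent.Entity](
  appConf.getPvEventTopic,
  new PvEventSerializeSchema,
  producerProps,
  partitioner)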
Re: [DISCUSS] Dropping flink-storm?
+1, it's time to drop it 😂

Zhijiang (wangzhijiang999) wrote on Sat, Sep 29, 2018 at 1:53 PM:

> Very much agree with dropping it. +1
>
> ------------------------------------------------------------------
> From: Jeff Carter
> Sent: Saturday, Sep 29, 2018, 10:18
> To: dev
> Cc: chesnay; Till Rohrmann; user
> Subject: Re: [DISCUSS] Dropping flink-storm?
>
> +1 to drop it.
>
> On Fri, Sep 28, 2018, 7:25 PM Hequn Cheng wrote:
>
>> Hi,
>>
>> +1 to drop it. It seems that few people use it.
>>
>> Best, Hequn
>>
>> On Fri, Sep 28, 2018 at 10:22 PM Chesnay Schepler wrote:
>>
>>> I'm very much in favor of dropping it.
>>>
>>> Flink has been continually growing in terms of features, and IMO we've
>>> reached the point where we should cull some of the more obscure ones.
>>> flink-storm, while interesting from a theoretical standpoint, offers too
>>> little value.
>>>
>>> Note that the bolt/spout wrapper parts of the project are still compatible;
>>> it's only topologies that aren't working.
>>>
>>> IMO compatibility layers only add value if they ease the migration to
>>> Flink APIs.
>>> * bolt/spout wrappers do this, but they will continue to work even if we
>>>   drop it
>>> * topologies don't do this, so I'm not interested in them.
>>>
>>> On 28.09.2018 15:22, Till Rohrmann wrote:
>>>> Hi everyone,
>>>>
>>>> I would like to discuss how to proceed with Flink's Storm
>>>> compatibility layer, flink-storm.
>>>>
>>>> While working on removing Flink's legacy mode, I noticed that some
>>>> parts of flink-storm rely on the legacy Flink client. In fact, at the
>>>> moment flink-storm does not work together with Flink's new distributed
>>>> architecture.
>>>>
>>>> I'm also wondering how many people are actually using Flink's Storm
>>>> compatibility layer and whether it would be worth porting it.
>>>>
>>>> I see two options for how to proceed:
>>>>
>>>> 1) Commit to maintaining flink-storm and port it to Flink's new architecture
>>>> 2) Drop flink-storm
>>>>
>>>> I doubt that we can contribute it to Apache Bahir [1], because once we
>>>> remove the legacy mode, this module will no longer work with any newer
>>>> Flink versions.
>>>>
>>>> Therefore, I would like to hear your opinion on this, and in particular
>>>> whether you are using or planning to use flink-storm in the future.
>>>>
>>>> [1] https://github.com/apache/bahir-flink
>>>>
>>>> Cheers,
>>>> Till
Fwd: why the same Sliding ProcessTime TimeWindow is triggered twice
-- Forwarded message --
From: 远远
Date: Sun, Sep 16, 2018 at 4:08 PM
Subject: Re: why the same Sliding ProcessTime TimeWindow is triggered twice
To:

Hi,

The Flink version I tested is 1.4.2, and I just ran the test code with a local environment in IDEA; all the settings are in the test code. My OS is Deepin (a Debian-based Linux) 15.7. I tried again, and the output is as follows:

now ===> 2018-09-16 16:06:09
start ===> 2018-09-16 16:05:10
end ===> 2018-09-16 16:06:10
max ===> 2018-09-16 16:06:09
TimeWindow{start=1537085110000, end=1537085170000}
aggreation
now ===> 2018-09-16 16:06:09
start ===> 2018-09-16 16:05:10
end ===> 2018-09-16 16:06:10
max ===> 2018-09-16 16:06:09
TimeWindow{start=1537085110000, end=1537085170000}
aggreation
now ===> 2018-09-16 16:06:16
start ===> 2018-09-16 16:05:15
end ===> 2018-09-16 16:06:15
max ===> 2018-09-16 16:06:14
TimeWindow{start=1537085115000, end=1537085175000}
aggreation
now ===> 2018-09-16 16:06:19
start ===> 2018-09-16 16:05:20
end ===> 2018-09-16 16:06:20
max ===> 2018-09-16 16:06:19
TimeWindow{start=1537085120000, end=1537085180000}
aggreation
now ===> 2018-09-16 16:06:20
start ===> 2018-09-16 16:05:20
end ===> 2018-09-16 16:06:20
max ===> 2018-09-16 16:06:19
TimeWindow{start=1537085120000, end=1537085180000}
aggreation
now ===> 2018-09-16 16:06:24
start ===> 2018-09-16 16:05:25
end ===> 2018-09-16 16:06:25
max ===> 2018-09-16 16:06:24
TimeWindow{start=1537085125000, end=1537085185000}
aggreation
now ===> 2018-09-16 16:06:24
start ===> 2018-09-16 16:05:25
end ===> 2018-09-16 16:06:25
max ===> 2018-09-16 16:06:24
TimeWindow{start=1537085125000, end=1537085185000}
aggreation
now ===> 2018-09-16 16:06:25
start ===> 2018-09-16 16:05:25
end ===> 2018-09-16 16:06:25
max ===> 2018-09-16 16:06:24
TimeWindow{start=1537085125000, end=1537085185000}
aggreation
now ===> 2018-09-16 16:06:29
start ===> 2018-09-16 16:05:30
end ===> 2018-09-16 16:06:30
max ===> 2018-09-16 16:06:29
TimeWindow{start=1537085130000, end=1537085190000}
aggreation
now ===> 2018-09-16 16:06:29
start ===> 2018-09-16 16:05:30
end ===> 2018-09-16 16:06:30
max ===> 2018-09-16 16:06:29
TimeWindow{start=1537085130000, end=1537085190000}
aggreation
now ===> 2018-09-16 16:06:30
start ===> 2018-09-16 16:05:30
end ===> 2018-09-16 16:06:30
max ===> 2018-09-16 16:06:29
TimeWindow{start=1537085130000, end=1537085190000}
aggreation
now ===> 2018-09-16 16:06:36
start ===> 2018-09-16 16:05:35
end ===> 2018-09-16 16:06:35
max ===> 2018-09-16 16:06:34
TimeWindow{start=1537085135000, end=1537085195000}

Xingcan Cui wrote on Sun, Sep 16, 2018 at 3:55 PM:

> Hi,
>
> I've tested your code in my local environment and everything worked fine.
> It's a little weird to see your output like that. I wonder if you could
> give more information about your environment, e.g., your Flink version and
> execution settings.
>
> Thanks,
> Xingcan
>
> On Sep 16, 2018, at 3:19 PM, 远远 wrote:
>
> Hi everyone,
> Today I tested a Sliding ProcessTime TimeWindow and printed some of its
> metrics. I found the same sliding window printed twice, as follows:
>
> now ===> 2018-09-16 15:11:44
> start ===> 2018-09-16 15:10:45
> end ===> 2018-09-16 15:11:45
> max ===> 2018-09-16 15:11:44
> TimeWindow{start=1537081845000, end=1537081905000}
> aggreation
> now ===> 2018-09-16 15:11:45
> start ===> 2018-09-16 15:10:45
> end ===> 2018-09-16 15:11:45
> max ===> 2018-09-16 15:11:44
> TimeWindow{start=1537081845000, end=1537081905000}
> aggreation
>
> But when I do a sum operation inside the window, it does not happen.
> I want to know why. Thanks.
> My test code is:
>
> object SlidingProcessTimeWindowTest {
>
>   def main(args: Array[String]): Unit = {
>     val env = StreamExecutionEnvironment.getExecutionEnvironment
>     env.addSource((context: SourceContext[String]) => {
>       while (true) context.collect(new Random().nextInt(100) + ":FRI")
>     })
>       .keyBy(s => s.endsWith("FRI"))
>       .timeWindow(Time.minutes(1), Time.seconds(5))
>       .apply((e, w, iter, coll: Collector[String]) => {
>         println("now ===> " + convert(DateTime.now().getMillis))
>         println("start ===> " + convert(w.getStart))
>         println("end ===> " + convert(w.getEnd))
>         println("max ===> " + convert(w.maxTimestamp()))
>         println(w)
>         //var reduce: Long = 0
>         //for (e <- iter) {
>         //  reduce += e.substring(0, e.length - 4).toInt
>         //}
>         //println("reduce ==> " + reduce)
>         coll.collect("aggreation")
>       }).setParallelism(1).print().setParallelism(1)
>
>     env.execute()
>   }
>
>   def convert(time: Long): String = {
>     new DateTime(time).toString("yyyy-MM-dd HH:mm:ss")
>   }
> }
why the same Sliding ProcessTime TimeWindow is triggered twice
Hi everyone,
Today I tested a Sliding ProcessTime TimeWindow and printed some of its metrics. I found the same sliding window printed twice, as follows:

now ===> 2018-09-16 15:11:44
start ===> 2018-09-16 15:10:45
end ===> 2018-09-16 15:11:45
max ===> 2018-09-16 15:11:44
TimeWindow{start=1537081845000, end=1537081905000}
aggreation
now ===> 2018-09-16 15:11:45
start ===> 2018-09-16 15:10:45
end ===> 2018-09-16 15:11:45
max ===> 2018-09-16 15:11:44
TimeWindow{start=1537081845000, end=1537081905000}
aggreation

But when I do a sum operation inside the window, it does not happen. I want to know why. Thanks.

My test code is:

object SlidingProcessTimeWindowTest {

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.addSource((context: SourceContext[String]) => {
      while (true) context.collect(new Random().nextInt(100) + ":FRI")
    })
      .keyBy(s => s.endsWith("FRI"))
      .timeWindow(Time.minutes(1), Time.seconds(5))
      .apply((e, w, iter, coll: Collector[String]) => {
        println("now ===> " + convert(DateTime.now().getMillis))
        println("start ===> " + convert(w.getStart))
        println("end ===> " + convert(w.getEnd))
        println("max ===> " + convert(w.maxTimestamp()))
        println(w)
        //var reduce: Long = 0
        //for (e <- iter) {
        //  reduce += e.substring(0, e.length - 4).toInt
        //}
        //println("reduce ==> " + reduce)
        coll.collect("aggreation")
      }).setParallelism(1).print().setParallelism(1)

    env.execute()
  }

  def convert(time: Long): String = {
    new DateTime(time).toString("yyyy-MM-dd HH:mm:ss")
  }
}
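For completeness, a self-contained sketch of the sum variant the poster mentions (the commented-out reduce re-enabled), with the imports the snippet leaves implicit; the object name SlidingWindowSumTest is an assumption for illustration:

import scala.util.Random
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.functions.source.SourceFunction.SourceContext
import org.apache.flink.streaming.api.windowing.time.Time
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

object SlidingWindowSumTest {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    // Infinite source, as in the original test: random "<n>:FRI" strings.
    env.addSource((context: SourceContext[String]) => {
      while (true) context.collect(Random.nextInt(100) + ":FRI")
    })
      .keyBy(s => s.endsWith("FRI"))
      .timeWindow(Time.minutes(1), Time.seconds(5))
      .apply((key: Boolean, w: TimeWindow, iter: Iterable[String], out: Collector[String]) => {
        // Strip the 4-character ":FRI" suffix and sum the numeric prefixes per window.
        var sum: Long = 0
        for (e <- iter) sum += e.substring(0, e.length - 4).toInt
        out.collect(w + " sum ==> " + sum)
      }).setParallelism(1).print().setParallelism(1)

    env.execute()
  }
}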
Kafka consumer cannot auto-commit
I found that the Kafka consumer does not auto-commit when I tested the Kudu async client with Flink async I/O today.

- I did not enable checkpointing, and the job uses processing time.
- The consumer start strategy set on the connector is setStartFromEarliest().

The consumer config printed in the console is as follows:

auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [node104.bigdata.dmp.local.com:9092, node105.bigdata.dmp.local.com:9092, node106.bigdata.dmp.local.com:9092]
check.crcs = true
client.id =
connections.max.idle.ms = 540000
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
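As background: with checkpointing disabled, the Flink Kafka consumer delegates offset committing to the internal Kafka client's periodic auto-commit, so enable.auto.commit and auto.commit.interval.ms in the consumer properties are what control it. A minimal sketch of the two usual setups (the topic name, group id, and FlinkKafkaConsumer011 connector version are assumptions, not values from this post):

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

object AutoCommitSketch {
  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment

    // Setup A: no checkpointing; the Kafka client auto-commits periodically.
    val props = new Properties()
    props.setProperty("bootstrap.servers", "node104.bigdata.dmp.local.com:9092")
    props.setProperty("group.id", "test-group")           // assumed group id
    props.setProperty("enable.auto.commit", "true")
    props.setProperty("auto.commit.interval.ms", "5000")

    val consumer = new FlinkKafkaConsumer011[String](
      "test-topic", new SimpleStringSchema(), props)      // assumed topic
    consumer.setStartFromEarliest()

    // Setup B: enable checkpointing instead; offsets are then committed
    // back to Kafka on completed checkpoints, not by the client's timer.
    // env.enableCheckpointing(60000)
    // consumer.setCommitOffsetsOnCheckpoints(true)

    env.addSource(consumer).print()
    env.execute()
  }
}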