The flink checkpoint time interval is not normal

2018-11-16 Thread
the Flink app is running on Flink 1.5.4 on YARN; the checkpoint should be triggered every 5
min...

[image: image.png]
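
A hedged sketch (not from the thread) of how a 5-minute checkpoint interval is typically
configured programmatically on the StreamExecutionEnvironment; note that a slow checkpoint or a
configured minimum pause between checkpoints can stretch the effective interval:

import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(5 * 60 * 1000L)  // ask for a checkpoint every 5 minutes
// optional: allow back-to-back checkpoints; a non-zero pause delays the next trigger
env.getCheckpointConfig.setMinPauseBetweenCheckpoints(0L)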


Re: Any examples of invoking the Flink REST API POST method?

2018-11-12 Thread
Hi,
http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/cancel-with-savepoint-404-Not-Found-td19227.html
may help you.
For cancelling a job with Flink on YARN, "yarn-cancel" works well, while "cancel" does not.
The Python code below for triggering a savepoint works well:

import requests  # assuming `url` points at the savepoint trigger endpoint, e.g. /jobs/<jobid>/savepoints

json = {"cancel-job": False}
r = requests.post(url, json=json)
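
For reference, a hedged Scala sketch (not from the original thread) of the same savepoint
trigger using okhttp, with the corrected request body that Gary describes below; the host,
port, and job id are placeholders:

import okhttp3.{MediaType, OkHttpClient, Request, RequestBody}

val client = new OkHttpClient()
val mediaType = MediaType.parse("application/json; charset=utf-8")
// Only the documented fields go into the body -- not the schema wrapper from the docs.
val postBody = """{"target-directory": "hdfs:///flinkDsl", "cancel-job": false}"""
// Placeholder endpoint: POST /jobs/<jobid>/savepoints on the REST port (8081 by default).
val url = "http://<jobmanager-host>:8081/jobs/<jobid>/savepoints"
val request = new Request.Builder()
  .url(url)
  .post(RequestBody.create(mediaType, postBody))
  .build()
val response = client.newCall(request).execute()
println(response.body().string())  // on success this should contain the savepoint trigger id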


Gary Yao wrote on Mon, Nov 12, 2018 at 5:33 PM:

> Hi Henry,
>
> What you see in the API documentation is a schema definition and not a
> sample
> request. The request body should be:
>
> {
> "target-directory": "hdfs:///flinkDsl",
> "cancel-job": false
> }
>
> Let me know if that helps.
>
> Best,
> Gary
>
> On Mon, Nov 12, 2018 at 7:15 AM vino yang  wrote:
>
>> Hi Henry,
>>
>> Maybe Gary can help you; I'll ping him for you.
>>
>> Thanks, vino.
>>
徐涛 wrote on Mon, Nov 12, 2018 at 12:45 PM:
>>
>>> Hi Experts,
>>> I am trying to trigger a savepoint via the Flink REST API on version 1.6;
>>> the documentation shows that I need to pass a JSON request body:
>>> {
>>>  "type" : "object”,
>>>   "id" :
>>> "urn:jsonschema:org:apache:flink:runtime:rest:messages:job:savepoints:SavepointTriggerRequestBody”,
>>>  "properties" : {
>>>  "target-directory" : { "type" : "string" },
>>>  "cancel-job" : { "type" : "boolean" }
>>>  }
>>> }
>>> So I sent the following JSON:
>>> {
>>> "type":"object”,
>>>
>>> "id":"urn:jsonschema:org:apache:flink:runtime:rest:messages:job:savepoints:SavepointTriggerRequestBody”,
>>> "properties”:{
>>> "target-directory":"hdfs:///flinkDsl”,
>>> "cancel-job”:false
>>> }
>>> }
>>>
>>> And I used okhttp to send the request:
>>> val MEDIA_TYPE_JSON = MediaType.parse("application/json; charset=utf-8")
>>> val body = RequestBody.create(MEDIA_TYPE_JSON, postBody)
>>> val request = new Request.Builder()
>>>   .url(url)
>>>   .post(body)
>>>   .build()
>>> client.newCall(request).execute()
>>>
>>>
>>> but I get this error: {"errors":["Request did not match expected format
>>> SavepointTriggerRequestBody."]}
>>> Could anyone give an example of how to invoke Flink's POST REST API?
>>> Thanks a lot.
>>>
>>> Best
>>> Henry
>>>
>>


Re: cannot resolve constructor when invoking the FlinkKafkaProducer011 constructor in Scala

2018-10-10 Thread
IDEA only shows the "cannot resolve constructor" error message, without any other
error message.

Dawid Wysakowicz wrote on Wed, Oct 10, 2018 at 5:55 PM:

> Hi,
>
> what is the exact error message you are getting?
>
> Best,
>
> Dawid
>
> On 10/10/18 11:51, 远远 wrote:
>
> Invoking the FlinkKafkaProducer011 constructor in Scala:
>
> val producer = new FlinkKafkaProducer011[PVEvent.Entity](
>   appConf.getPvEventTopic,
>   new PvEventSerializeSchema,
>   producerProps,
>   Optional.of(FlinkRebalancePartitioner[PVEvent.Entity]))
>
> and the constructor is:
>
> /**
>  * Creates a FlinkKafkaProducer for a given topic. The sink produces its input to
>  * the topic. It accepts a keyed {@link KeyedSerializationSchema} and possibly a custom
>  * {@link FlinkKafkaPartitioner}.
>  *
>  * If a partitioner is not provided, written records will be partitioned by the attached key
>  * of each record (as determined by {@link KeyedSerializationSchema#serializeKey(Object)}).
>  * If written records do not have a key (i.e., {@link KeyedSerializationSchema#serializeKey(Object)}
>  * returns {@code null}), they will be distributed to Kafka partitions in a round-robin fashion.
>  *
>  * @param defaultTopicId The default topic to write data to
>  * @param serializationSchema A serializable serialization schema for turning user objects into
>  *                            a kafka-consumable byte[] supporting key/value messages
>  * @param producerConfig Configuration properties for the KafkaProducer.
>  *                       'bootstrap.servers.' is the only required argument.
>  * @param customPartitioner A serializable partitioner for assigning messages to Kafka partitions.
>  *                          If a partitioner is not provided, records will be partitioned by the
>  *                          key of each record (determined by
>  *                          {@link KeyedSerializationSchema#serializeKey(Object)}). If the keys
>  *                          are {@code null}, then records will be distributed to Kafka partitions
>  *                          in a round-robin fashion.
>  */
> public FlinkKafkaProducer011(
>   String defaultTopicId,
>   KeyedSerializationSchema<IN> serializationSchema,
>   Properties producerConfig,
>   Optional<FlinkKafkaPartitioner<IN>> customPartitioner) {
>    this(
>   defaultTopicId,
>   serializationSchema,
>   producerConfig,
>   customPartitioner,
>   Semantic.AT_LEAST_ONCE,
>   DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
> }
>
> but it does not compile, and IDEA shows a "cannot resolve constructor" error.
> When I invoke another constructor without the Java 8 Optional parameter, there is
> no error. Is this because of the Java 8 Optional parameter? What should I do?
>
>
>


cannot resolve constructor when invoking the FlinkKafkaProducer011 constructor in Scala

2018-10-10 Thread
Invoking the FlinkKafkaProducer011 constructor in Scala:

val producer = new FlinkKafkaProducer011[PVEvent.Entity](
  appConf.getPvEventTopic,
  new PvEventSerializeSchema,
  producerProps,
  Optional.of(FlinkRebalancePartitioner[PVEvent.Entity]))

and the constructor is:

/**
 * Creates a FlinkKafkaProducer for a given topic. The sink produces
its input to
 * the topic. It accepts a keyed {@link KeyedSerializationSchema} and
possibly a custom {@link FlinkKafkaPartitioner}.
 *
 * If a partitioner is not provided, written records will be
partitioned by the attached key of each
 * record (as determined by {@link
KeyedSerializationSchema#serializeKey(Object)}). If written records do
not
 * have a key (i.e., {@link
KeyedSerializationSchema#serializeKey(Object)} returns {@code null}),
they
 * will be distributed to Kafka partitions in a round-robin fashion.
 *
 * @param defaultTopicId The default topic to write data to
 * @param serializationSchema A serializable serialization schema for
turning user objects into a kafka-consumable byte[] supporting
key/value messages
 * @param producerConfig Configuration properties for the
KafkaProducer. 'bootstrap.servers.' is the only required argument.
 * @param customPartitioner A serializable partitioner for assigning
messages to Kafka partitions.
 *  If a partitioner is not provided, records
will be partitioned by the key of each record
 *  (determined by {@link
KeyedSerializationSchema#serializeKey(Object)}). If the keys
 *  are {@code null}, then records will be
distributed to Kafka partitions in a
 *  round-robin fashion.
 */
public FlinkKafkaProducer011(
  String defaultTopicId,
  KeyedSerializationSchema<IN> serializationSchema,
  Properties producerConfig,
  Optional<FlinkKafkaPartitioner<IN>> customPartitioner) {
   this(
  defaultTopicId,
  serializationSchema,
  producerConfig,
  customPartitioner,
  Semantic.AT_LEAST_ONCE,
  DEFAULT_KAFKA_PRODUCERS_POOL_SIZE);
}

but it does not compile, and IDEA shows a "cannot resolve constructor" error.
When I invoke another constructor without the Java 8 Optional parameter, there is no
error. Is this because of the Java 8 Optional parameter? What should I do?
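
A hedged sketch of one possible fix, assuming the root cause is that Scala infers
Optional[FlinkRebalancePartitioner[...]] (your own FlinkKafkaPartitioner subclass) while the
constructor expects Optional[FlinkKafkaPartitioner[...]]; Java generics are invariant, so the
types do not line up and the constructor cannot be resolved. Instantiating the partitioner with
`new` and pinning the Optional to the partitioner's supertype makes the intended overload
resolvable:

import java.util.Optional
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011
import org.apache.flink.streaming.connectors.kafka.partitioner.FlinkKafkaPartitioner

// upcast to the expected element type before wrapping in Optional
val partitioner: Optional[FlinkKafkaPartitioner[PVEvent.Entity]] =
  Optional.of(new FlinkRebalancePartitioner[PVEvent.Entity]())

val producer = new FlinkKafkaProducer011[PVEvent.Entity](
  appConf.getPvEventTopic,
  new PvEventSerializeSchema,
  producerProps,
  partitioner)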


Re: [DISCUSS] Dropping flink-storm?

2018-09-28 Thread
+1, it's time to drop it 😂

Zhijiang (wangzhijiang999) wrote on Sat, Sep 29, 2018 at 1:53 PM:

> I very much agree with dropping it. +1
>
> --
> From: Jeff Carter
> Sent: Saturday, Sep 29, 2018, 10:18
> To: dev
> Cc: chesnay; Till Rohrmann;
> user
> Subject: Re: [DISCUSS] Dropping flink-storm?
>
> +1 to drop it.
>
> On Fri, Sep 28, 2018, 7:25 PM Hequn Cheng  wrote:
>
> > Hi,
> >
> > +1 to drop it. It seems that few people use it.
> >
> > Best, Hequn
> >
> > On Fri, Sep 28, 2018 at 10:22 PM Chesnay Schepler 
> > wrote:
> >
> > > I'm very much in favor of dropping it.
> > >
> > > Flink has been continually growing in terms of features, and IMO we've
> > > reached the point where we should cull some of the more obscure ones.
>
> > > flink-storm, while interesting from a theoretical standpoint, offers too
> > > little value.
> > >
>
> > > Note that the bolt/spout wrapper parts of the module are still compatible,
> > > it's only topologies that aren't working.
> > >
> > > IMO compatibility layers only add value if they ease the migration to
> > > Flink APIs.
>
> > > * bolt/spout wrappers do this, but they will continue to work even if we
> > > drop it
> > > * topologies don't do this, so I'm not interested in them.
> > >
> > > On 28.09.2018 15:22, Till Rohrmann wrote:
> > > > Hi everyone,
> > > >
> > > > I would like to discuss how to proceed with Flink's storm
> > > > compatibility layer flink-storm.
> > > >
> > > > While working on removing Flink's legacy mode, I noticed that some
> > > > parts of flink-storm rely on the legacy Flink client. In fact, at the
>
> > > > moment flink-storm does not work together with Flink's new distributed
> > > > architecture.
> > > >
> > > > I'm also wondering how many people are actually using Flink's Storm
> > > > compatibility layer and whether it would be worth porting it.
> > > >
> > > > I see two options how to proceed:
> > > >
> > > > 1) Commit to maintain flink-storm and port it to Flink's new
> > architecture
> > > > 2) Drop flink-storm
> > > >
>
> > > > I doubt that we can contribute it to Apache Bahir [1], because once we
>
> > > > remove the legacy mode, this module will no longer work with all newer
> > > > Flink versions.
> > > >
>
> > > > Therefore, I would like to hear your opinion on this and in particular
> > > > if you are using or planning to use flink-storm in the future.
> > > >
> > > > [1] https://github.com/apache/bahir-flink
> > > >
> > > > Cheers,
> > > > Till
> > >
> > >
> > >
> >
>
>
>


Fwd: why is the same sliding processing-time TimeWindow triggered twice

2018-09-16 Thread
-- Forwarded message -
From: 远远 
Date: Sun, Sep 16, 2018, 4:08 PM
Subject: Re: why is the same sliding processing-time TimeWindow triggered twice
To: 


Hi, the Flink version I tested is 1.4.2, and I just ran the test code with a
local environment in IDEA; all the settings are in the test code.
My OS is Deepin (Debian-based Linux) 15.7...

I tried again, and the output is as follows:
now   ===> 2018-09-16 16:06:09
start ===> 2018-09-16 16:05:10
end   ===> 2018-09-16 16:06:10
max   ===> 2018-09-16 16:06:09
TimeWindow{start=1537085110000, end=1537085170000}
aggreation
now   ===> 2018-09-16 16:06:09
start ===> 2018-09-16 16:05:10
end   ===> 2018-09-16 16:06:10
max   ===> 2018-09-16 16:06:09
TimeWindow{start=1537085110000, end=1537085170000}
aggreation
now   ===> 2018-09-16 16:06:16
start ===> 2018-09-16 16:05:15
end   ===> 2018-09-16 16:06:15
max   ===> 2018-09-16 16:06:14
TimeWindow{start=1537085115000, end=1537085175000}
aggreation
now   ===> 2018-09-16 16:06:19
start ===> 2018-09-16 16:05:20
end   ===> 2018-09-16 16:06:20
max   ===> 2018-09-16 16:06:19
TimeWindow{start=1537085120000, end=1537085180000}
aggreation
now   ===> 2018-09-16 16:06:20
start ===> 2018-09-16 16:05:20
end   ===> 2018-09-16 16:06:20
max   ===> 2018-09-16 16:06:19
TimeWindow{start=1537085120000, end=1537085180000}
aggreation
now   ===> 2018-09-16 16:06:24
start ===> 2018-09-16 16:05:25
end   ===> 2018-09-16 16:06:25
max   ===> 2018-09-16 16:06:24
TimeWindow{start=1537085125000, end=1537085185000}
aggreation
now   ===> 2018-09-16 16:06:24
start ===> 2018-09-16 16:05:25
end   ===> 2018-09-16 16:06:25
max   ===> 2018-09-16 16:06:24
TimeWindow{start=1537085125000, end=1537085185000}
aggreation
now   ===> 2018-09-16 16:06:25
start ===> 2018-09-16 16:05:25
end   ===> 2018-09-16 16:06:25
max   ===> 2018-09-16 16:06:24
TimeWindow{start=1537085125000, end=1537085185000}
aggreation
now   ===> 2018-09-16 16:06:29
start ===> 2018-09-16 16:05:30
end   ===> 2018-09-16 16:06:30
max   ===> 2018-09-16 16:06:29
TimeWindow{start=1537085130000, end=1537085190000}
aggreation
now   ===> 2018-09-16 16:06:29
start ===> 2018-09-16 16:05:30
end   ===> 2018-09-16 16:06:30
max   ===> 2018-09-16 16:06:29
TimeWindow{start=1537085130000, end=1537085190000}
aggreation
now   ===> 2018-09-16 16:06:30
start ===> 2018-09-16 16:05:30
end   ===> 2018-09-16 16:06:30
max   ===> 2018-09-16 16:06:29
TimeWindow{start=1537085130000, end=1537085190000}
aggreation
now   ===> 2018-09-16 16:06:36
start ===> 2018-09-16 16:05:35
end   ===> 2018-09-16 16:06:35
max   ===> 2018-09-16 16:06:34
TimeWindow{start=1537085135000, end=1537085195000}


Xingcan Cui wrote on Sun, Sep 16, 2018 at 3:55 PM:

> Hi,
>
> I’ve tested your code in my local environment and everything worked fine.
> It’s a little weird to see your output like that. I wonder if you could
> give more information about your environment, e.g., your flink version and
> execution settings.
>
> Thanks,
> Xingcan
>
> On Sep 16, 2018, at 3:19 PM, 远远  wrote:
>
> Hi everyone,
> today I tested a sliding processing-time TimeWindow, printing some of its properties. I
> found the same sliding window printed twice, as follows:
>
> now   ===> 2018-09-16 15:11:44
> start ===> 2018-09-16 15:10:45
> end   ===> 2018-09-16 15:11:45
> max   ===> 2018-09-16 15:11:44
> TimeWindow{start=1537081845000, end=1537081905000}
> aggreation
> now   ===> 2018-09-16 15:11:45
> start ===> 2018-09-16 15:10:45
> end   ===> 2018-09-16 15:11:45
> max   ===> 2018-09-16 15:11:44
> TimeWindow{start=1537081845000, end=1537081905000}
> aggreation
>
> But when I add a sum operation, it does not happen. I want to know why?
> Thanks.
>
> my test code is:
>
> object SlidingProcessTimeWindowTest {
>
> def main(args: Array[String]): Unit = {
> val env = StreamExecutionEnvironment.getExecutionEnvironment
> env.addSource((context: SourceContext[String]) => {while(true) 
> context.collect(new Random().nextInt(100) + ":FRI")})
> .keyBy(s => s.endsWith("FRI"))
> .timeWindow(Time.minutes(1), Time.seconds(5))
> .apply((e, w, iter, coll: Collector[String]) => {
> println("now   ===> " + convert(DateTime.now().getMillis))
> println("start ===> " + convert(w.getStart))
> println("end   ===> " + convert(w.getEnd))
> println("max   ===> " + convert(w.maxTimestamp()))
> println(w)
> //var reduce: Long = 0
> //for(e <- iter){
> //reduce += e.substring(0, e.length - 4).toInt
> //}
> //println("reduce ==> " + reduce)
> coll.collect("aggreation")
> }).setParallelism(1).print().setParallelism(1)
>
> env.execute()
> }
>
> def convert(time: Long): String = {
> new DateTime(time).toString("yyyy-MM-dd HH:mm:ss")
> }
> }
>
>
>


why is the same sliding processing-time TimeWindow triggered twice

2018-09-16 Thread
Hi everyone,
today I tested a sliding processing-time TimeWindow, printing some of its properties. I
found the same sliding window printed twice, as follows:

now   ===> 2018-09-16 15:11:44
start ===> 2018-09-16 15:10:45
end   ===> 2018-09-16 15:11:45
max   ===> 2018-09-16 15:11:44
TimeWindow{start=1537081845000, end=1537081905000}
aggreation
now   ===> 2018-09-16 15:11:45
start ===> 2018-09-16 15:10:45
end   ===> 2018-09-16 15:11:45
max   ===> 2018-09-16 15:11:44
TimeWindow{start=1537081845000, end=1537081905000}
aggreation

But when I add a sum operation, it does not happen. I want to know why?
Thanks.

my test code is:

object SlidingProcessTimeWindowTest {

def main(args: Array[String]): Unit = {
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.addSource((context: SourceContext[String]) => {while(true)
context.collect(new Random().nextInt(100) + ":FRI")})
.keyBy(s => s.endsWith("FRI"))
.timeWindow(Time.minutes(1), Time.seconds(5))
.apply((e, w, iter, coll: Collector[String]) => {
println("now   ===> " + convert(DateTime.now().getMillis))
println("start ===> " + convert(w.getStart))
println("end   ===> " + convert(w.getEnd))
println("max   ===> " + convert(w.maxTimestamp()))
println(w)
//var reduce: Long = 0
//for(e <- iter){
//reduce += e.substring(0, e.length - 4).toInt
//}
//println("reduce ==> " + reduce)
coll.collect("aggreation")
}).setParallelism(1).print().setParallelism(1)

env.execute()
}

def convert(time: Long): String = {
new DateTime(time).toString("yyyy-MM-dd HH:mm:ss")
}
}
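
A hedged diagnostic sketch (not the answer, just a way to narrow the cause down): tagging each
firing with the key and the subtask index shows whether the duplicated output comes from
different parallel subtasks of the window operator or from repeated firings of the same
subtask. RichWindowFunction exposes the runtime context for this:

import org.apache.flink.streaming.api.scala.function.RichWindowFunction
import org.apache.flink.streaming.api.windowing.windows.TimeWindow
import org.apache.flink.util.Collector

class DebugWindowFunction extends RichWindowFunction[String, String, Boolean, TimeWindow] {
  override def apply(key: Boolean, window: TimeWindow,
                     input: Iterable[String], out: Collector[String]): Unit = {
    // print which parallel subtask fired this window, and for which key
    val subtask = getRuntimeContext.getIndexOfThisSubtask
    println(s"subtask=$subtask key=$key window=$window elements=${input.size}")
    out.collect("aggreation")
  }
}

// usage: replace the lambda in the test with
// .timeWindow(Time.minutes(1), Time.seconds(5)).apply(new DebugWindowFunction)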


kafka consumer can not auto commit

2018-08-22 Thread
I found that the Kafka consumer does not auto-commit offsets when I tested the Kudu async
client with Flink async I/O today.
- I did not enable checkpointing, and I use processing time.
- The consumer start strategy I set in the connector is: setStartFromEarliest()

The consumer config printed in the console is as follows:

auto.commit.interval.ms = 5000
auto.offset.reset = latest
bootstrap.servers = [node104.bigdata.dmp.local.com:9092,
node105.bigdata.dmp.local.com:9092, node106.bigdata.dmp.local.com:9092]
check.crcs = true
client.id =
connections.max.idle.ms = 54
enable.auto.commit = true
exclude.internal.topics = true
fetch.max.bytes = 52428800
fetch.max.wait.ms = 500
fetch.min.bytes = 1
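
A hedged sketch (topic name, group id, and the 0.11 consumer class are placeholders): with the
Flink Kafka consumer, offsets are committed back to Kafka on completed checkpoints when
checkpointing is enabled, while without checkpointing the connector falls back to the Kafka
client's periodic auto-commit settings. If the periodic commits are not showing up, enabling
checkpointing is one way to get offsets committed:

import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer011

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.enableCheckpointing(5000)  // offsets are committed on each completed checkpoint

val props = new Properties()
props.setProperty("bootstrap.servers", "node104.bigdata.dmp.local.com:9092")
props.setProperty("group.id", "my-group")  // placeholder group id

val consumer = new FlinkKafkaConsumer011[String]("my-topic", new SimpleStringSchema(), props)
consumer.setStartFromEarliest()
consumer.setCommitOffsetsOnCheckpoints(true)  // the default when checkpointing is enabled

env.addSource(consumer).print()
env.execute()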