Emitting current state to a sink

2019-04-25 Thread Avi Levi
Hi,
We have a keyed pipeline with persisted state.
Is there a way to broadcast a command and collect all the values persisted
in the state?

The end result could be, for example, sending a fetch command to all operators
and emitting the results to some sink.

Why do we need it? From time to time we might want to check whether we are
missing keys or have additional keys, or simply emit the current state
to a table and query it.

I tried simply broadcasting a command and addressing the persisted state,
but that resulted in:
java.lang.NullPointerException: No key set. This method should not be
called outside of a keyed context.

Is there a good way to achieve that?
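
For reference, a minimal sketch of the kind of pipeline in question, using a
KeyedBroadcastProcessFunction. On the broadcast side there is no current key
(which is what produces the "No key set" NullPointerException when keyed state
is read directly); applyToKeyedState is one way to visit the state of every
registered key instead. The Event and FetchCommand types and the "count" state
are placeholder assumptions, not the actual job:

import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.co.KeyedBroadcastProcessFunction;
import org.apache.flink.util.Collector;

// Placeholder types for the sketch.
class Event { String key; }
class FetchCommand { }

public class EmitStateOnCommand
        extends KeyedBroadcastProcessFunction<String, Event, FetchCommand, Tuple2<String, Long>> {

    private final ValueStateDescriptor<Long> countDescriptor =
            new ValueStateDescriptor<>("count", Long.class);

    @Override
    public void processElement(Event event, ReadOnlyContext ctx,
                               Collector<Tuple2<String, Long>> out) throws Exception {
        // Regular keyed processing: update the per-key count.
        ValueState<Long> count = getRuntimeContext().getState(countDescriptor);
        Long current = count.value();
        count.update(current == null ? 1L : current + 1);
    }

    @Override
    public void processBroadcastElement(FetchCommand command, Context ctx,
                                        Collector<Tuple2<String, Long>> out) throws Exception {
        // Reading keyed state directly here fails with "No key set";
        // applyToKeyedState instead visits the state of every key that has one.
        ctx.applyToKeyedState(countDescriptor, (key, state) ->
                out.collect(Tuple2.of(key, state.value())));
    }
}

The emitted (key, value) pairs can then be routed to whatever sink should
receive the dump.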

Cheers
Avi


Re: taskmanager faild

2019-04-25 Thread Xintong Song
Hi naisili,

I don't see any attachment, screenshot, or textual description of the error in your email. Could you please check again?

Thank you~

Xintong Song



On Fri, Apr 26, 2019 at 10:46 AM naisili Yuan 
wrote:

> It's still a cluster stability issue. I found this error, and I'd like to ask whether it is a problem with my high-availability configuration, and whether not depending on ZooKeeper would be more stable.
> Hoping for a reply, thanks!
>
> naisili Yuan wrote on Mon, Apr 22, 2019 at 2:23 PM:
>
>> Sorry, I forgot to attach the screenshot.
>> My Flink standalone cluster went down. Looking at the logs, I saw the error in the screenshot.
>> I can't figure it out myself, and Google doesn't turn up a matching issue. Hoping for your help, thanks!
>>
>> Problem description: the Flink cluster running my jobs went down after two days because all TaskManager processes died; only one JobManager was left.
>>
>> Cluster environment: 5 CentOS 7 machines, 32 cores and 256 GB of memory each, 2 JobManagers, 5 TaskManagers, 32 slots per machine. The JobManagers use ZooKeeper for high availability.
>> Preliminary analysis: a ZooKeeper problem.
>> Also: I accidentally cleaned up the logs, so I can't paste the text.
>>
>> Xintong Song wrote on Mon, Apr 22, 2019 at 1:27 PM:
>>
>>> Hi naisili,
>>>
>>> This is the user-zh mailing list, so if you speak Chinese you can ask
>>> questions in Chinese. If you prefer using English, you can send emails to
>>> u...@flink.apache.org. Hope that helps you.
>>>
>>> BTW, I think you forgot to attach the screenshot.
>>>
>>> Thank you~
>>>
>>> Xintong Song
>>>
>>>
>>>
>>> On Mon, Apr 22, 2019 at 10:53 AM naisili Yuan 
>>> wrote:
>>>
>>> > I use a standalone Flink cluster, and I use ZooKeeper for the
>>> > JobManager HA.
>>> > The screenshot is the log from my TaskManager process going down; it shows an error.
>>> > I don't know why it failed, even after googling the error.
>>> > Asking for help, thanks.
>>> >
>>> >
>>> >
>>>
>>


Re: How to implement custom stream operator over a window? And after the Count-Min Sketch?

2019-04-25 Thread Rong Rong
Hi Felipe,

I am not sure the algorithm requires constructing a new extension of the
window operator. I think you can implement the CountMinSketch object
as an aggregator, e.g.:
1. The AggregateState (ACC) should be the aggregating
count-min-sketch 2-D hash array (plus a few other needed fields).
2. The accumulate method simply does the update.
3. getResult simply gets the frequency from the sketch.

Thus you will not need to use a customized ValueStateDescriptor.
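
For illustration, a minimal sketch of that shape as a Flink AggregateFunction.
It assumes a CountMinSketch class (for example the one linked as [3] further
down) exposing update, estimateCount and mergeWith methods; those method names
and the depth/width/seed values are assumptions, not a definitive implementation:

import org.apache.flink.api.common.functions.AggregateFunction;

public class CountMinSketchAggregate
        implements AggregateFunction<String, CountMinSketch, Long> {

    private final String queriedItem;

    public CountMinSketchAggregate(String queriedItem) {
        this.queriedItem = queriedItem;
    }

    @Override
    public CountMinSketch createAccumulator() {
        // Point 1: the ACC is the 2-D hash-array sketch plus its parameters.
        return new CountMinSketch(/* depth */ 5, /* width */ 2048, /* seed */ 42);
    }

    @Override
    public CountMinSketch add(String item, CountMinSketch acc) {
        acc.update(item);                      // point 2: accumulate just updates the sketch
        return acc;
    }

    @Override
    public Long getResult(CountMinSketch acc) {
        return acc.estimateCount(queriedItem); // point 3: read the estimated frequency
    }

    @Override
    public CountMinSketch merge(CountMinSketch a, CountMinSketch b) {
        return a.mergeWith(b);                 // cell-wise sum of the two hash arrays
    }
}

Used as, e.g., stream.keyBy(...).timeWindow(Time.minutes(1)).aggregate(new
CountMinSketchAggregate("some-item")), Flink then keeps the sketch itself as
the window's aggregating state.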

But I agree that maybe it is a good idea to support a class of use cases
that require approximate aggregate state (like HyperLogLog?); this
would be a good value-add in my opinion.
I think some further discussion is needed if we are going down that path.
Do you think the original FLINK-2147 JIRA ticket is a good
place to carry out that discussion? We can probably continue there or
create a new JIRA for discussion.

--
Rong

On Wed, Apr 24, 2019 at 1:32 AM Felipe Gutierrez <
felipe.o.gutier...@gmail.com> wrote:

> Hi Rong,
>
> thanks for your reply. I guess I already did something along the lines of what you
> told me. I have one example in this application [1], which uses
> this state [2] and computes a CountMinSketch [3].
>
> I am looking into how to implement my own operator over a window in order to
> have more fine-grained control over it and to learn from it, and hopefully
> to build a path towards contributing to Flink in the future [4].
>
> [1]
> https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/MultiSensorMultiStationsReadingMqtt2.java#L69
> [2]
> https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/examples/stream/MultiSensorMultiStationsReadingMqtt2.java#L182
> [3]
> https://github.com/felipegutierrez/explore-flink/blob/master/src/main/java/org/sense/flink/util/CountMinSketch.java
> [4] https://issues.apache.org/jira/browse/FLINK-2147
>
> Best,
> Felipe
> --
> -- Felipe Gutierrez
>
> -- skype: felipe.o.gutierrez
> -- https://felipeogutierrez.blogspot.com
>
>
> On Wed, Apr 24, 2019 at 2:06 AM Rong Rong  wrote:
>
>> Hi Felipe,
>>
>> At a quick glance, the answer can depend on what your window looks like (is
>> there any overlap, like a sliding window?) and how much data you would like to
>> process.
>>
>> In general, you can always buffer all the data into a ListState and apply
>> your window function by iterating through all those buffered elements [1],
>> provided that the data size is small enough to be held efficiently in the
>> state backend.
>> If the algorithm has some sort of pre-aggregation that can simplify the
>> buffering through an incremental, orderless aggregation, you can also think
>> about using [2].
>> With these two approaches, you do not necessarily need to implement your
>> own window operator (extending the window operator can be tricky), and you also
>> have access to the internal state [3].
>>
>> Hope this helps your exploration.
>>
>> Thanks,
>> Rong
>>
>> [1]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#processwindowfunction
>> [2]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#processwindowfunction-with-incremental-aggregation
>> [3]
>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/operators/windows.html#using-per-window-state-in-processwindowfunction
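
As a rough sketch of the buffering approach in [1]: a ProcessWindowFunction
receives all buffered elements of the window as an Iterable and can fold them
into whatever summary is needed (here just a count over placeholder String
elements; the per-element loop is where a Count-Min Sketch update would go):

import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

public class CountPerWindow
        extends ProcessWindowFunction<String, Long, String, TimeWindow> {

    @Override
    public void process(String key, Context ctx, Iterable<String> elements,
                        Collector<Long> out) {
        long count = 0;
        for (String ignored : elements) {   // iterate over the buffered window contents
            count++;
        }
        out.collect(count);
    }
}

It would be applied as stream.keyBy(...).timeWindow(...).process(new
CountPerWindow()); approach [2] replaces the buffering with an incremental
AggregateFunction, optionally combined with a ProcessWindowFunction for window
metadata.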
>>
>> On Tue, Apr 23, 2019 at 8:16 AM Felipe Gutierrez <
>> felipe.o.gutier...@gmail.com> wrote:
>>
>>> Hi,
>>>
>>> I want to implement my own operator that computes the Count-Min Sketch
>>> over a window in Flink. I then found this Jira issue [1], which is exactly
>>> what I want. I believe that I will have to work on my skills to arrive at a
>>> mature solution.
>>>
>>> So, the first thing that comes to my mind is to create my custom
>>> operator like the AggregateApplyWindowFunction [2].
>>> Through this I can create the summary of my data over a window.
>>>
>>> Also, I found this custom JoinOperator example by Till Rohrmann [3],
>>> on which I think I can base
>>> my implementation since it is done over a DataStream.
>>>
>>> What are your suggestions to me in order to start to implement a custom
>>> stream operator which computes Count-Min Sketch? Do you have any custom
>>> operator over window/keyBy that I can learn with the source code?
>>>
>>> ps.: Looking at the Blink source code, I have already implemented a custom
>>> Combiner [4]
>>> (map-combiner-reduce) operator.

Re: Exceptions when launching counts on a Flink DataSet concurrently

2019-04-25 Thread Juan Rodríguez Hortalá
Any thoughts on this?

On Sun, Apr 7, 2019, 6:56 PM Juan Rodríguez Hortalá <
juan.rodriguez.hort...@gmail.com> wrote:

> Hi,
>
> I have a very simple program using the local execution environment that
> throws NPEs and other exceptions related to concurrent access when launching
> a count for a DataSet from different threads. The program is
> https://gist.github.com/juanrh/685a89039e866c1067a6efbfc22c753e which is
> basically this:
>
> def doubleCollectConcurrent = {
>   val env = ExecutionEnvironment.createLocalEnvironment(3)
>   val xs = env.fromCollection(1 to 100).map{_+1}
>   implicit val ec = 
> ExecutionContext.fromExecutor(Executors.newFixedThreadPool(10))
>
>   val pendingActions = Seq.fill(10)(
> Future { println(s"xs.count = ${xs.count}") }
>   )
>   val pendingActionsFinished = Future.fold(pendingActions)(Unit){ (u1, u2) =>
> println("pending action finished")
> Unit
>   }
>   Await.result(pendingActionsFinished, 10 seconds)
>
>   ok
> }
>
>
> It looks like the issue is on OperatorTranslation.java at
> https://github.com/apache/flink/blob/master/flink-java/src/main/java/org/apache/flink/api/java/operators/OperatorTranslation.java#L51,
> when a sink is added to the sinks list while that list is being traversed.
> I have the impression that this is by design, so I'd like to confirm that
> this is the expected behaviour, and whether this is happening only for the
> local execution environment, or if this affects all execution environments
> implementations. Other related questions I have are:
>
>- Is this documented somewhere? I'm quite new to Flink, so I might
>have missed this. Is there any known workaround for concurrently launching
>counts and other sink computations on the same DataSet?
>- Is it safe to perform a sequence of calls to DataSet sink methods
>like count or collect on the same DataSet, as long as they are performed
>from the same thread? From my experience it looks like it is, but I'd like
>to get a confirmation if possible.
>
> This might be related to
> https://stackoverflow.com/questions/51035465/concurrentmodificationexception-in-flink
> but I'm not sure.
>
> Thanks a lot for your help.
>
> Greetings,
>
> Juan
>


RE: EXT :read only mode for Flink UI

2019-04-25 Thread Martin, Nick
AFAIK, there are no granular permissions like that built into Flink. Limiting 
access to the REST API seems like a good place to start. The web UI uses the 
API, but controlling it there means you’re locking down all means of access. 
The designers of the API were disciplined about what HTTP verbs were used, so 
allowing all GET requests and denying PUT/POST/DELETE/PATCH would mean read 
only access, and I think that would be straightforward to implement with an 
HTTP proxy.
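
As a rough illustration of that idea (an assumption on my part, not something
that ships with Flink): an HTTP proxy configuration, sketched here for nginx,
that forwards GET requests to the JobManager REST endpoint and rejects every
other method; host and port are placeholders.

# nginx sketch: read-only access to the Flink REST API / web UI
server {
    listen 8081;

    location / {
        # Allow only GET (HEAD is implicitly allowed too); everything else gets 403.
        limit_except GET {
            deny all;
        }
        proxy_pass http://flink-jobmanager:8081;
    }
}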

From: uday bhaskar [mailto:uday...@gmail.com]
Sent: Thursday, April 25, 2019 6:57 AM
To: user@flink.apache.org
Subject: EXT :read only mode for Flink UI

Hi

We are looking at running Flink on Kubernetes in Job cluster mode. As part of 
our plans we do not want to allow modifications to the job cluster once a job 
is running. For this we are looking at a "read-only" Flink UI, that does not 
allow users to cancel a job or submit a job.

My question is,
1. Is there such an option when we bring up a Flink cluster currently
2. If no, is this something we can contribute?

I can imagine another solution where the "cancel" and "submit job" options
mutate the job cluster.

Wanted to check what are the general guidelines on this.

Any pointers would be appreciated

Uday






Re: No zero ( 2 ) exit code on k8s StandaloneJobClusterEntryPoint when save point with cancel...

2019-04-25 Thread Vishal Santoshi
Here you go https://issues.apache.org/jira/browse/FLINK-12333

Again thanks for the prompt response



On Wed, Apr 24, 2019 at 11:06 AM Till Rohrmann  wrote:

> Good to hear. Could you create a documentation JIRA issue for this
> problem? Thanks a lot.
>
> Cheers,
> Till
>
> On Wed, Apr 24, 2019 at 4:58 PM Vishal Santoshi 
> wrote:
>
>> Verified, I think we just need to make sure that it is documented :)
>>
>> On Wed, Apr 24, 2019 at 9:47 AM Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> This makes total sense and actually is smart ( defensive ). Will test
>>> and report. I think though that this needs to be documented :)
>>>
>>> On Wed, Apr 24, 2019 at 6:03 AM Till Rohrmann 
>>> wrote:
>>>
 Hi Vishal,

 it seems that the following is happening: you triggered the cancel-with-savepoint
 command via the REST call. This command is an asynchronous
 operation which produces a result (the savepoint path). In order to deliver
 asynchronous results to the caller, Flink waits before shutting down until
 they are delivered or until it times out after 5 minutes. I assume that you
 don't request the savepoint path from Flink via the returned URL from the
 original request. This could happen either if you kill the CLI before it's
 done or if you have written your own method to trigger this operation.

 I guess we could add a flag for asynchronous operations which tells
 Flink that their results don't need to get delivered to some client. If you
 would like to have such a feature, then please open a JIRA issue for it.

 Cheers,
 Till

 On Wed, Apr 24, 2019 at 3:49 AM Vishal Santoshi <
 vishal.santo...@gmail.com> wrote:

> Anyone?
>
>
>
> I think there is some race condition. These are the TM logs. I am
> puzzled because in a larger pipe (there are about 32 slots on 8 replicas)
> it works.
>
>
>
>
> 2019-04-24 01:16:20,889 DEBUG
> org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager  -
> Releasing local state under allocation id 
> 5a853ef886e1c599f86b9503306fffd2.
>
> 2019-04-24 01:16:20,894 DEBUG
> org.apache.flink.runtime.taskexecutor.TaskExecutor- Close
> JobManager connection for job .
>
> org.apache.flink.util.FlinkException: Stopping JobMaster for job
> EventCountJob().
>
> at
> org.apache.flink.runtime.jobmaster.JobMaster.onStop(JobMaster.java:355)
>
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor$StartedState.terminate(AkkaRpcActor.java:504)
>
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleControlMessage(AkkaRpcActor.java:170)
>
> at
> org.apache.flink.runtime.rpc.akka.AkkaRpcActor.onReceive(AkkaRpcActor.java:142)
>
> at
> org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.onReceive(FencedAkkaRpcActor.java:40)
>
> at
> akka.actor.UntypedActor$$anonfun$receive$1.applyOrElse(UntypedActor.scala:165)
>
> at akka.actor.Actor$class.aroundReceive(Actor.scala:502)
>
> at akka.actor.UntypedActor.aroundReceive(UntypedActor.scala:95)
>
> at akka.actor.ActorCell.receiveMessage(ActorCell.scala:526)
>
> at akka.actor.ActorCell.invoke(ActorCell.scala:495)
>
> at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:257)
>
> at akka.dispatch.Mailbox.run(Mailbox.scala:224)
>
> at akka.dispatch.Mailbox.exec(Mailbox.scala:234)
>
> at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260)
>
> at
> scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339)
>
> at
> scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979)
>
> at
> scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107)
>
> 2019-04-24 01:16:20,895 INFO
> org.apache.flink.runtime.taskexecutor.JobLeaderService-
> Cannot reconnect to job  because it is not
> registered.
>
> 2019-04-24 01:16:21,053 DEBUG
> org.apache.flink.runtime.taskexecutor.TaskExecutor-
> Received heartbeat request from e61c2b7d992f151936e21db1ca0d.
>
> 2019-04-24 01:16:22,136 DEBUG
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  -
> Got ping response for sessionid: 0x25add5478fb2ec6 after 0ms
>
> 2019-04-24 01:16:31,052 DEBUG
> org.apache.flink.runtime.taskexecutor.TaskExecutor-
> Received heartbeat request from e61c2b7d992f151936e21db1ca0d.
>
> 2019-04-24 01:16:35,483 DEBUG
> org.apache.flink.shaded.zookeeper.org.apache.zookeeper.ClientCnxn  -
> Got ping response for sessionid: 0x25add5478fb2ec6 after 0ms
>
> On Tue, Apr 23, 2019 at 3:11 PM Vishal Santoshi <
> 

Re: assignTimestampsAndWatermarks not work after KeyedStream.process

2019-04-25 Thread an0
If my understanding is correct, then why does `assignTimestampsAndWatermarks` before
`keyBy` work? The `timeWindowAll` stream's input streams are task 1 and task
2, with task 2 idling, no matter whether `assignTimestampsAndWatermarks` is
before or after `keyBy`, because whether task 2 receives elements depends only
on the key distribution and has nothing to do with timestamp assignment, right?


(A) trips --> assignTimestampsAndWatermarks --> keyBy --+--> key 1 trips --------+--> timeWindowAll
                                                        \--> key 2 trips (idle) -/

(B) trips --> keyBy --+--> key 1 trips --> assignTimestampsAndWatermarks --------+--> timeWindowAll
                      \--> key 2 trips (idle) --> assignTimestampsAndWatermarks -/

How are things different between A and B from `timeWindowAll`'s perspective?

BTW, thanks for the webinar link, I'll check it later.

On 2019/04/25 08:30:20, Dawid Wysakowicz  wrote: 
> Hi,
> 
> Yes I think your explanation is correct. I can also recommend Seth's
> webinar where he talks about debugging Watermarks[1]
> 
> Best,
> 
> Dawid
> 
> [1]
> https://www.ververica.com/resources/webinar/webinar/debugging-flink-tutorial
> 
> On 22/04/2019 22:55, an0 wrote:
> > Thanks, I feel I'm getting closer to the truth. 
> >
> > So parallelism is the cause? Say my parallelism is 2. Does that mean I get
> > 2 tasks running after `keyBy` even if all elements have the same key and so go
> > to 1 downstream task (say task 1)? And is it the other task (task 2), with no
> > incoming data, that caused the `timeWindowAll` stream to be unable to progress?
> > Because both task 1 and task 2 are its input streams and one is idling, so
> > its event time cannot make progress?
> >
> > On 2019/04/22 01:57:39, Guowei Ma  wrote: 
> >> HI,
> >>
> >> BoundedOutOfOrdernessTimestampExtractors can send a WM at least after it
> >> receives an element.
> >>
> >> For after Keyby:
> >> Flink uses the HashCode of key and the parallelism of down stream to decide
> >> which subtask would receive the element. This means if your key is always
> >> same, all the sources will only send the elements to the same down stream
> >> task, for example only no. 3 BoundedOutOfOrdernessTimestampExtractor.
> >>
> >> For before Keyby:
> >> In your case, the Source and BoundedOutOfOrdernessTimestampExtractors would
> >> be chained together, which means every
> >> BoundedOutOfOrdernessTimestampExtractors will receive elements.
> >>
> >> Best,
> >> Guowei
> >>
> >>
> >> an0 wrote on Fri, Apr 19, 2019 at 10:41 PM:
> >>
> >>> Hi,
> >>>
> >>> First of all, thank you for the `shuffle()` tip. It works. However, I
> >>> still don't understand why it doesn't work without calling `shuffle()`.
> >>>
> >>> Why would not all BoundedOutOfOrdernessTimestampExtractors receive trips?
> >>> All the trips has keys and timestamps. As I said in my reply to Paul, I 
> >>> see
> >>> the same watermarks being extracted.
> >>>
> >>> How could calling `assignTimestampsAndWatermarks` before VS after `keyBy`
> >>> matter? My understanding is any specific window for a specific key always
> >>> receives the exactly same data, and the calling order of
> >>> `assignTimestampsAndWatermarks` and `keyBy` shouldn't affect that.
> >>>
> >>> To make `keyBy` as irrelevant as possible, I tried letting it always
> >>> return the same key so that there is only 1 keyed stream and it is exactly
> >>> the same as the original unkeyed stream. It still doesn't trigger windows:
> >>> ```java
> >>> DataStream trips = env.addSource(consumer);
> >>> KeyedStream userTrips = trips.keyBy(trip -> 0L);
> >>> DataStream featurizedUserTrips =
> >>> userTrips.map(trip -> trip).assignTimestampsAndWatermarks(new
> >>> BoundedOutOfOrdernessTimestampExtractor(Time.days(1)) {
> >>> @Override
> >>> public long extractTimestamp(Trip trip) {
> >>> return trip.endTime.getTime();
> >>> }
> >>> });
> >>> AllWindowedStream windowedUserTrips =
> >>> featurizedUserTrips.timeWindowAll(Time.days(7),
> >>> Time.days(1));
> >>> ```
> >>>
> >>> It makes no sense to me. Please help me understand why it doesn't work.
> >>> Thanks!
> >>>
> >>> On 2019/04/19 04:14:31, Guowei Ma  wrote:
>  Hi,
>  After keyby maybe only some of BoundedOutOfOrdernessTimestampExtractors
>  could receive the elements(trip). If that is the case
>  

Re: QueryableState startup regression in 1.8.0 ( migration from 1.7.2 )

2019-04-25 Thread Vishal Santoshi
Ditto that, setting queryable-state.enable to true works.
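
For anyone else hitting this, the flink-conf.yaml entry in question (the
flink-queryable-state-runtime jar also has to be on the classpath, as in the
lib/ listing further down):

# flink-conf.yaml
queryable-state.enable: true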

Thanks everyone.

On Thu, Apr 25, 2019 at 6:28 AM Dawid Wysakowicz 
wrote:

> Hi Vishal,
>
> As Guowei mentioned you have to enable the Queryable state. The default
> setting was changed in 1.8.0. There is an open JIRA[1] for changing the
> documentation accordingly.
>
> Best,
>
> Dawid
>
> [1] https://issues.apache.org/jira/browse/FLINK-12274
> On 25/04/2019 03:27, Guowei Ma wrote:
>
> You could try to set queryable-state.enable to true. And check again.
>
> Vishal Santoshi wrote on Thu, Apr 25, 2019 at 1:40 AM:
>
>> Any one ?
>>
>> On Wed, Apr 24, 2019 at 12:02 PM Vishal Santoshi <
>> vishal.santo...@gmail.com> wrote:
>>
>>> Hello folks,
>>>
>>> Following
>>> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html#querying-state
>>> for setting up the queryable state server and proxy, I have my classpath (the
>>> lib directory) with the required jar, but I do not see the mentioned
>>> log and of course am not able to set up the QS server/proxy. This has
>>> worked on 1.7.2 and I think I have everything as advised, see the logs
>>> below. I do not see the log "Started the Queryable State Proxy
>>> Server @ ...". Anyone with this issue...
>>>
>>>
>>>
>>> 2019-04-24 15:54:26,296 INFO  
>>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
>>>   - -Dtaskmanager.numberOfTaskSlots=1
>>>
>>> 2019-04-24 15:54:26,296 INFO  
>>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
>>>   - --configDir
>>>
>>> 2019-04-24 15:54:26,296 INFO  
>>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
>>>   - /usr/local/flink/conf
>>>
>>> 2019-04-24 15:54:26,296 INFO  
>>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
>>>   -  Classpath:
>>> /usr/local/flink/lib/flink-metrics-prometheus-1.8.0.jar:
>>> */usr/local/flink/lib/flink-queryable-state-runtime_2.11-1.8.0.jar*
>>> :/usr/local/flink/lib/hadoop.jar:/usr/local/flink/lib/jobs.jar:/usr/local/flink/lib/log4j-1.2.17.jar:/usr/local/flink/lib/slf4j-log4j12-1.7.15.jar:/usr/local/flink/lib/flink-dist_2.11-1.8.0.jar:::
>>>
>>> 2019-04-24 15:54:26,296 INFO  
>>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
>>>   -
>>> 
>>>
>>> 2019-04-24 15:54:26,298 INFO  
>>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
>>>   - Registered UNIX signal handlers for [TERM, HUP, INT]
>>>
>>> 2019-04-24 15:54:26,300 INFO  
>>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner
>>>   - Maximum number of open file descriptors is 65536.
>>>
>>> 2019-04-24 15:54:26,305 INFO
>>> org.apache.flink.configuration.GlobalConfiguration- Loading
>>> configuration property: state.backend.fs.checkpointdir,
>>> hdfs:///flink-checkpoints_k8s_test/prod
>>>
>>> 2
>>>
>>>
>>>
>>>
>>> --
> Best,
> Guowei
>
>


Re: Flink CLI

2019-04-25 Thread Oytun Tez
I had come across flink-deployer actually, but somehow didn't want to
"learn" it... (versus just a bunch of lines in a script)

At some time with more bandwidth, we should migrate to this one and
standardize flink-deployer (and later take this to mainstream Flink :P).

---
Oytun Tez

*M O T A W O R D*
The World's Fastest Human Translation Platform.
oy...@motaword.com — www.motaword.com


On Thu, Apr 25, 2019 at 3:14 AM Marc Rooding  wrote:

> Hi Steven, Oytun
>
> You may find the tool we open-sourced last year useful. It offers
> deploying and updating jobs with savepointing.
>
> You can find it on Github: https://github.com/ing-bank/flink-deployer
>
> There’s also a docker image available in Docker Hub.
>
> Marc
> On 24 Apr 2019, 17:29 +0200, Oytun Tez , wrote:
>
> Hi Steven,
>
> As far as I am aware:
> 1) There is no update call. Our build flow feels a little weird to us as well;
> it definitely requires scripting.
> 2) We are using the Flink management API remotely in our build flow to 1) get
> jobs, 2) savepoint them, 3) cancel them, etc. I am going to release a Python
> script for this soon.
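
As a small illustration of that flow (assuming the management API here means
the JobManager REST endpoint, reachable at the placeholder http://localhost:8081):
listing jobs as JSON via GET /jobs/overview instead of parsing the text output
of "flink list".

import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.HttpURLConnection;
import java.net.URL;

public class ListJobsViaRest {
    public static void main(String[] args) throws Exception {
        // Placeholder address of the JobManager REST endpoint.
        URL url = new URL("http://localhost:8081/jobs/overview");
        HttpURLConnection connection = (HttpURLConnection) url.openConnection();
        connection.setRequestMethod("GET");

        try (BufferedReader reader = new BufferedReader(
                new InputStreamReader(connection.getInputStream()))) {
            StringBuilder json = new StringBuilder();
            String line;
            while ((line = reader.readLine()) != null) {
                json.append(line);
            }
            // The response is JSON, e.g. {"jobs":[{"jid":"...","name":"...","state":"RUNNING",...}]}
            System.out.println(json);
        } finally {
            connection.disconnect();
        }
    }
}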
>
> ---
> Oytun Tez
>
> *M O T A W O R D*
> The World's Fastest Human Translation Platform.
> oy...@motaword.com — www.motaword.com
>
>
> On Wed, Apr 24, 2019 at 11:06 AM Steven Nelson 
> wrote:
>
>> Hello!
>>
>> I am working on automating our deployments to our Flink cluster. I had a
>> couple questions about the flink cli.
>>
>> 1) I thought there was an "update" command that would internally manage
>> the cancel with savepoint, upload new jar, restart from savepoint process.
>>
>> 2) Is there a way to get the Flink CLI to output its result in JSON
>> format? Right now I would need to parse the results of the "flink list"
>> command to get the job id, cancel the job with savepoint, parse the results
>> of that to get the savepoint filename, then restore using that. Parsing the
>> output seems brittle to me.
>>
>> Thoughts?
>> -Steve
>>
>>


Re: Taskmanager unable to rejoin job manager

2019-04-25 Thread Mar_zieh
Hello,

I want to run Flink on Apache Mesos with Marathon, and I have configured ZooKeeper
too. When I run "mesos-appmaster.sh", it shows me this error:

2019-04-25 13:53:18,160 INFO 
org.apache.flink.mesos.runtime.clusterframework.MesosResourceManager  -
Mesos resource manager started.
2019-04-25 13:53:23,176 WARN 
org.apache.flink.mesos.scheduler.ConnectionMonitor- Unable to
connect to Mesos; still trying...
2019-04-25 13:53:28,194 WARN 
org.apache.flink.mesos.scheduler.ConnectionMonitor- Unable to
connect to Mesos; still trying...
2019-04-25 13:53:33,214 WARN 
org.apache.flink.mesos.scheduler.ConnectionMonitor- Unable to
connect to Mesos; still trying...
2019-04-25 13:53:38,234 WARN 
org.apache.flink.mesos.scheduler.ConnectionMonitor- Unable to
connect to Mesos; still trying...


Would you please tell me how to solve this error?

Many thanks.



--
Sent from: http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/


read only mode for Flink UI

2019-04-25 Thread uday bhaskar
Hi

We are looking at running Flink on Kubernetes in Job cluster mode. As part
of our plans we do not want to allow modifications to the job cluster once
a job is running. For this we are looking at a "read-only" Flink UI, that
does not allow users to cancel a job or submit a job.

My question is,
1. Is there such an option when we bring up a Flink cluster currently
2. If no, is this something we can contribute?

I can imagine another solution where the "cancel" and "submit job" options
mutate the job cluster.

Wanted to check what are the general guidelines on this.

Any pointers would be appreciated

Uday


Re: Zeppelin

2019-04-25 Thread Jeff Zhang
Thanks Dawid,

Hi Sergey,

I am working on updating the Flink interpreter of Zeppelin to support Flink
1.9 (supposed to be released this summer).
For the current Flink interpreter of Zeppelin 0.9, I haven't verified it
against Flink 1.8. Could you show the full interpreter log? And what is
the size of your input file?



Dawid Wysakowicz wrote on Thu, Apr 25, 2019 at 6:31 PM:

> Hi Sergey,
>
> I am not very familiar with Zepellin. But I know Jeff (cc'ed) is working
> on integrating Flink with some notebooks. He might be able to help you.
>
> Best,
>
> Dawid
> On 25/04/2019 08:42, Smirnov Sergey Vladimirovich (39833) wrote:
>
> Hello,
>
>
>
> Trying to link Zeppelin 0.9 with Flink 1.8. It's a small dev cluster
> deployed in a standalone manner.
>
> Got the same error as described here
> https://stackoverflow.com/questions/54257671/runnning-a-job-in-apache-flink-standalone-mode-on-zeppelin-i-have-this-error-to
>
> Would appreciate any support in resolving that problem.
>
>
>
> Regards,
>
> Sergey
>
>
>
>

-- 
Best Regards

Jeff Zhang


Re: [Discuss] Add JobListener (hook) in flink job lifecycle

2019-04-25 Thread Jeff Zhang
Hi  Beckett,

Thanks for your feedback. See my comments inline.

>>> How do users specify the listener? *
What I propose is to register the JobListener in the ExecutionEnvironment. I don't
think we should make ClusterClient a public API.

>>> Where should the listener run? *
I don't think it is proper to run the listener in the JobMaster. The listener is
user code, and usually it depends on the user's other components. So running
it on the client side makes more sense to me.

>>> What should be reported to the Listener? *
I am open to adding other APIs to this JobListener. But for now, I am afraid
the ExecutionEnvironment is not aware of failover, so it is not possible to
report failover events.

>>> What can the listeners do on notifications? *
Do you mean to pass the JobGraph to these methods, like the following? (I am
afraid JobGraph is not a public and stable API; we should not expose it to
users.)

public interface JobListener {

void onJobSubmitted(JobGraph graph, JobID jobId);

void onJobExecuted(JobGraph graph, JobExecutionResult jobResult);

void onJobCanceled(JobGraph graph, JobID jobId, String savepointPath);
}
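
For illustration, registration could then look roughly like the following.
This is a sketch of the proposal only: env.addJobListener and the JobListener
interface above do not exist in Flink today.

ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

env.addJobListener(new JobListener() {
    @Override
    public void onJobSubmitted(JobGraph graph, JobID jobId) {
        System.out.println("submitted: " + jobId);
    }

    @Override
    public void onJobExecuted(JobGraph graph, JobExecutionResult jobResult) {
        System.out.println("finished in " + jobResult.getNetRuntime() + " ms");
    }

    @Override
    public void onJobCanceled(JobGraph graph, JobID jobId, String savepointPath) {
        System.out.println("canceled " + jobId + ", savepoint at " + savepointPath);
    }
});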


Becket Qin wrote on Thu, Apr 25, 2019 at 7:40 PM:

> Thanks for the proposal, Jeff. Adding a listener to allow users handle
> events during the job lifecycle makes a lot of sense to me.
>
> Here are my two cents.
>
> * How do user specify the listener? *
> It is not quite clear to me whether we consider ClusterClient as a public
> interface? From what I understand ClusterClient is not a public interface
> right now. In contrast, ExecutionEnvironment is the de facto interface for
> administrative work. After job submission, it is essentially bound to a job
> as an administrative handle. Given this current state, personally I feel
> acceptable to have the listener registered to the ExecutionEnvironment.
>
> * Where should the listener run? *
> If the listener runs on the client side, the client have to be always
> connected to the Flink cluster. This does not quite work if the Job is a
> streaming job. Should we provide the option to run the listener in
> JobMaster as well?
>
> * What should be reported to the Listener? *
> Besides the proposed APIs, does it make sense to also report events such
> as failover?
>
> * What can the listeners do on notifications? *
> If the listeners are expected to do anything on the job, should some
> helper class to manipulate the jobs be passed to the listener method?
> Otherwise users may not be able to easily take action.
>
> Thanks,
>
> Jiangjie (Becket) Qin
>
>
>
>
> On Wed, Apr 24, 2019 at 2:43 PM Jeff Zhang  wrote:
>
>> Hi Till,
>>
>> IMHO, allow adding hooks involves 2 steps.
>> 1. Provide hook interface, and call these hook in flink (ClusterClient)
>> at the right place. This should be done by framework (flink)
>> 2. Implement new hook implementation and add/register them into
>> framework(flink)
>>
>> What I am doing is step 1 which should be done by flink, step 2 is done
>> by users. But IIUC, your suggestion of using custom ClusterClient seems
>> mixing these 2 steps together. Say I'd like to add new hooks, I have to
>> implement a new custom ClusterClient, add new hooks and call them in the
>> custom ClusterClient at the right place.
>> This doesn't make sense to me. For a user who want to add hooks, he is
>> not supposed to understand the mechanism of ClusterClient, and should not
>> touch ClusterClient. What do you think ?
>>
>>
>>
>>
>>> Till Rohrmann wrote on Tue, Apr 23, 2019 at 4:24 PM:
>>
>>> I think we should not expose the ClusterClient configuration via the
>>> ExecutionEnvironment (env.getClusterClient().addJobListener) because this
>>> is effectively the same as exposing the JobListener interface directly on
>>> the ExecutionEnvironment. Instead I think it could be possible to provide a
>>> ClusterClient factory which is picked up from the Configuration or some
>>> other mechanism for example. That way it would not need to be exposed via
>>> the ExecutionEnvironment at all.
>>>
>>> Cheers,
>>> Till
>>>
>>> On Fri, Apr 19, 2019 at 11:12 AM Jeff Zhang  wrote:
>>>
 >>>  The ExecutionEnvironment is usually used by the user who writes
 the code and this person (I assume) would not be really interested in these
 callbacks.

 Usually ExecutionEnvironment is used by the user who write the code,
 but it doesn't needs to be created and configured by this person. e.g. in
 Zeppelin notebook, ExecutionEnvironment is created by Zeppelin, user just
 use ExecutionEnvironment to write flink program.  You are right that the
 end user would not be interested in these callback, but the third party
 library that integrate with zeppelin would be interested in these 
 callbacks.

 >>> In your case, it could be sufficient to offer some hooks for the
 ClusterClient or being able to provide a custom ClusterClient.

 Actually in my initial PR (https://github.com/apache/flink/pull/8190),
 I do pass JobListener to ClusterClient and invoke it there.
 

Re: State Migration with RocksDB MapState

2019-04-25 Thread Cliff Resnick
Great news! Thanks

On Thu, Apr 25, 2019, 2:59 AM Tzu-Li (Gordon) Tai 
wrote:

> Hi Cliff,
>
> Thanks for bringing this up again.
>
> I think it would make sense to at least move this forward by exclusively
> checking the schema of the user keys in MapState, and allowing value
> schema evolution.
> I'll comment on the JIRA about this, and also make it a blocker for 1.9.0
> to make sure it will be resolved by then.
>
> Concerning your question on GenericRecord:
> The actual schema resolution is still performed using the Avro schema, so
> you will still bump into the same issue.
>
> Best,
> Gordon
>
> On Wed, Apr 24, 2019 at 7:45 PM Cliff Resnick  wrote:
>
>> Hi Gordon,
>>
>> I noticed there has been no movement on this issue and I'm wondering if I
>> can find some way to work around this.
>> My MapState value is a case class container of Avro-generated
>> SpecificRecords. If one SpecificRecord changes I am stuck.
>>
>> From the issue it seems like the blocker is around evolving the MapState
>> key type. That may be a nasty problem, but my key type is stable and will
>> never change. What do you think the level of difficulty would be to add
>> support for evolving only the value?
>>
>> Also, if I use GenericRecord instead of SpecificRecord will the need for
>> schema evolution still be triggered? Or does it actually go down to the
>> avro schema rather than just the class serialVersionUID?
>>
>>
>>
>>
>>
>>
>> On Mon, Mar 18, 2019 at 1:10 AM Tzu-Li (Gordon) Tai 
>> wrote:
>>
>>> Hi Cliff,
>>>
>>> Thanks for bringing this up!
>>> AFAIK, this wouldn't be a long-term blocker. I've just opened a JIRA to
>>> track this [1].
>>>
>>> As explained in the JIRA ticket, the main reason this is disallowed in
>>> the initial support for state schema evolution was due to how migration was
>>> implemented in the RocksDB state backend.
>>> Technically speaking, enabling this in the future is definitely possible.
>>>
>>> Cheers,
>>> Gordon
>>>
>>> [1]  https://issues.apache.org/jira/browse/FLINK-11947
>>>
>>> On Mon, Mar 18, 2019 at 11:20 AM Cliff Resnick  wrote:
>>>
 After trying out state migration in 1.8 rc2 I ran into this hard stop
 below. The comment does not give an indication why rocksdb map state cannot
 be migrated, and I'm wondering what the status is, since we do need this
 functionality and would like to know if this is a long-term blocker or not.
 Anyone know?


 https://github.com/apache/flink/blob/953a5ffcbdae4115f7d525f310723cf8770779df/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java#L542

>>>


Re: [Discuss] Add JobListener (hook) in flink job lifecycle

2019-04-25 Thread Becket Qin
Thanks for the proposal, Jeff. Adding a listener to allow users handle
events during the job lifecycle makes a lot of sense to me.

Here are my two cents.

* How do users specify the listener? *
It is not quite clear to me whether we consider ClusterClient a public
interface. From what I understand, ClusterClient is not a public interface
right now. In contrast, ExecutionEnvironment is the de facto interface for
administrative work. After job submission, it is essentially bound to a job
as an administrative handle. Given this current state, personally I find it
acceptable to have the listener registered to the ExecutionEnvironment.

* Where should the listener run? *
If the listener runs on the client side, the client has to be always
connected to the Flink cluster. This does not quite work if the job is a
streaming job. Should we provide the option to run the listener in the
JobMaster as well?

* What should be reported to the Listener? *
Besides the proposed APIs, does it make sense to also report events such as
failover?

* What can the listeners do on notifications? *
If the listeners are expected to do anything on the job, should some helper
class to manipulate the jobs be passed to the listener method? Otherwise
users may not be able to easily take action.

Thanks,

Jiangjie (Becket) Qin




On Wed, Apr 24, 2019 at 2:43 PM Jeff Zhang  wrote:

> Hi Till,
>
> IMHO, allow adding hooks involves 2 steps.
> 1. Provide hook interface, and call these hook in flink (ClusterClient) at
> the right place. This should be done by framework (flink)
> 2. Implement new hook implementation and add/register them into
> framework(flink)
>
> What I am doing is step 1 which should be done by flink, step 2 is done by
> users. But IIUC, your suggestion of using custom ClusterClient seems mixing
> these 2 steps together. Say I'd like to add new hooks, I have to implement
> a new custom ClusterClient, add new hooks and call them in the custom
> ClusterClient at the right place.
> This doesn't make sense to me. For a user who want to add hooks, he is not
> supposed to understand the mechanism of ClusterClient, and should not touch
> ClusterClient. What do you think ?
>
>
>
>
> Till Rohrmann wrote on Tue, Apr 23, 2019 at 4:24 PM:
>
>> I think we should not expose the ClusterClient configuration via the
>> ExecutionEnvironment (env.getClusterClient().addJobListener) because this
>> is effectively the same as exposing the JobListener interface directly on
>> the ExecutionEnvironment. Instead I think it could be possible to provide a
>> ClusterClient factory which is picked up from the Configuration or some
>> other mechanism for example. That way it would not need to be exposed via
>> the ExecutionEnvironment at all.
>>
>> Cheers,
>> Till
>>
>> On Fri, Apr 19, 2019 at 11:12 AM Jeff Zhang  wrote:
>>
>>> >>>  The ExecutionEnvironment is usually used by the user who writes the
>>> code and this person (I assume) would not be really interested in these
>>> callbacks.
>>>
>>> Usually ExecutionEnvironment is used by the user who write the code, but
>>> it doesn't needs to be created and configured by this person. e.g. in
>>> Zeppelin notebook, ExecutionEnvironment is created by Zeppelin, user just
>>> use ExecutionEnvironment to write flink program.  You are right that the
>>> end user would not be interested in these callback, but the third party
>>> library that integrate with zeppelin would be interested in these callbacks.
>>>
>>> >>> In your case, it could be sufficient to offer some hooks for the
>>> ClusterClient or being able to provide a custom ClusterClient.
>>>
>>> Actually in my initial PR (https://github.com/apache/flink/pull/8190),
>>> I do pass JobListener to ClusterClient and invoke it there.
>>> But IMHO, ClusterClient is not supposed be a public api for users.
>>> Instead JobClient is the public api that user should use to control job. So
>>> adding hooks to ClusterClient directly and provide a custom ClusterClient
>>> doesn't make sense to me. IIUC, you are suggesting the following approach
>>>  env.getClusterClient().addJobListener(jobListener)
>>> but I don't see its benefit compared to this.
>>>  env.addJobListener(jobListener)
>>>
>>> Overall, I think adding hooks is orthogonal with fine grained job
>>> control. And I agree that we should refactor the flink client component,
>>> but I don't think it would affect the JobListener interface. What do you
>>> think ?
>>>
>>>
>>>
>>>
>>> Till Rohrmann wrote on Thu, Apr 18, 2019 at 8:57 PM:
>>>
 Thanks for starting this discussion Jeff. I can see the need for
 additional hooks for third party integrations.

 The thing I'm wondering is whether we really need/want to expose a
 JobListener via the ExecutionEnvironment. The ExecutionEnvironment is
 usually used by the user who writes the code and this person (I assume)
 would not be really interested in these callbacks. If he would, then one
 should rather think about a better programmatic job control where the
 

Re: Job Startup Arguments

2019-04-25 Thread Chesnay Schepler
The passed job arguments can not be queried via the REST API. When 
submitting jobs through the CLI these parameters never arrive at the 
cluster; in case of REST API submission they are immediately discarded 
after the submission has finished.


On 25/04/2019 12:25, Dawid Wysakowicz wrote:

Hi Steve,

As far as I know, this information is not available in REST API, but it
would be good to double check with Chesnay(cc'ed). You can see the
complete list of available REST commands here[1].

Best,

Dawid

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.8/monitoring/rest_api.html#api

On 24/04/2019 21:10, Steven Nelson wrote:

Hello!

Is there a way (via the REST API) to see the parameters used to start
a job?

-Steve





Re: kafka corrupt record exception

2019-04-25 Thread Dominik Wosiński
Hey,
Sorry for such a delay, but I had missed this message. Technically, you
could have a Kafka broker installed in, say, version 1.0.0 while using
FlinkKafkaConsumer08, which could create issues.
I'm not sure you can automate the process of skipping corrupted messages,
as you would have to write a consumer that allows skipping corrupted
messages. This may be a good idea for Flink to think about, though.

On the other hand, if you have many corrupted messages, this may mean that
the problem lies elsewhere in your pipeline (the Kafka producers before
Flink).


Re: TM occasionally hang in deploying state in Flink 1.5

2019-04-25 Thread Dawid Wysakowicz
Hi,

Feel free to open a JIRA for this issue. By the way, have you
investigated what the root cause of the hang is?

Best,

Dawid

On 25/04/2019 08:55, qi luo wrote:
> Hello,
>
> This issue occurred again and we dumped the TM thread. It indeed hung
> on socket read to download jar from Blob server:
>
> "DataSource (at createInput(ExecutionEnvironment.java:548)
> (our.code)) (1999/2000)" #72 prio=5 os_prio=0 tid=0x7fb9a1521000
> nid=0xa0994 runnable [0x7fb97cfbf000]
>    java.lang.Thread.State: RUNNABLE
>         at java.net.SocketInputStream.socketRead0(Native Method)
>         at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
>         at java.net.SocketInputStream.read(SocketInputStream.java:171)
>         at java.net.SocketInputStream.read(SocketInputStream.java:141)
>         at org.apache.flink.runtime.blob.BlobInputStream.read(BlobInputStream.java:152)
>         at org.apache.flink.runtime.blob.BlobInputStream.read(BlobInputStream.java:140)
>         at org.apache.flink.runtime.blob.BlobClient.downloadFromBlobServer(BlobClient.java:170)
>         at org.apache.flink.runtime.blob.AbstractBlobCache.getFileInternal(AbstractBlobCache.java:181)
>         at org.apache.flink.runtime.blob.PermanentBlobCache.getFile(PermanentBlobCache.java:206)
>         at org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerTask(BlobLibraryCacheManager.java:120)
>         - locked <0x00078ab60ba8> (a java.lang.Object)
>         at org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:893)
>         at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585)
>         at java.lang.Thread.run(Thread.java:748)
>
> I checked the latest master code. There’s still no socket timeout in
> Blob client. Should I create an issue to add this timeout?
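
For illustration, a plain java.net.Socket sketch of what such a timeout would
do (not the actual BlobClient code; host, port, and the timeout values are
placeholders): with SO_TIMEOUT set, a stalled read fails with a
SocketTimeoutException instead of hanging forever, so the caller can retry.

import java.io.InputStream;
import java.net.InetSocketAddress;
import java.net.Socket;

public class TimeoutDownloadSketch {
    public static void main(String[] args) throws Exception {
        try (Socket socket = new Socket()) {
            socket.connect(new InetSocketAddress("blob-server-host", 6124), 10_000); // connect timeout (ms)
            socket.setSoTimeout(30_000); // read timeout (ms): read() throws SocketTimeoutException when idle
            InputStream in = socket.getInputStream();
            byte[] buffer = new byte[4096];
            int read;
            while ((read = in.read(buffer)) != -1) {
                // consume the downloaded bytes ...
            }
        }
    }
}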
>
> Regards,
> Qi 
>
>> On Apr 19, 2019, at 7:49 PM, qi luo > > wrote:
>>
>> Hi all,
>>
>> We use Flink 1.5 batch and start thousands of jobs per day.
>> Occasionally we observed some stuck jobs, due to some TM hang in
>> “DEPLOYING” state. 
>>
>> On checking TM log, it shows that it stuck in downloading jars in
>> BlobClient:
>>
>> 
>> ...
>> INFO org.apache.flink.runtime.taskexecutor.TaskExecutor - Received
>> task DataSource (at createInput(ExecutionEnvironment.java:548)
>> (our.code)) (184/2000). INFO
>> org.apache.flink.runtime.taskmanager.Task - DataSource (at
>> createInput(ExecutionEnvironment.java:548) (our.code)) (184/2000)
>> switched from CREATED to DEPLOYING. INFO
>> org.apache.flink.runtime.taskmanager.Task - Creating FileSystem
>> stream leak safety net for task DataSource (at
>> createInput(ExecutionEnvironment.java:548) (our.code)) (184/2000)
>> [DEPLOYING] INFO org.apache.flink.runtime.taskmanager.Task - Loading
>> JAR files for task DataSource (at
>> createInput(ExecutionEnvironment.java:548) (our.code)) (184/2000)
>> [DEPLOYING]. INFO org.apache.flink.runtime.blob.BlobClient -
>> Downloading
>> 19e65c0caa41f264f9ffe4ca2a48a434/p-3ecd6341bf97d5512b14c93f6c9f51f682b6db26-37d5e69d156ee00a924c1ebff0c0d280
>> from some-host-ip-port
>> no more logs...
>> 
>>
>> It seems that the TM is calling BlobClient to download jars from
>> JM/BlobServer. Under hood it’s calling Socket.connect() and then
>> Socket.read() to retrieve results. 
>>
>> Should we add timeout in socket operations in BlobClient to resolve
>> this issue?
>>
>> Thanks,
>> Qi
>




Re: Flink Control Stream

2019-04-25 Thread Dominik Wosiński
Thanks for help Till,

I thought so, but I wanted to be sure.
Best Regards,
Dom.


Re: Zeppelin

2019-04-25 Thread Dawid Wysakowicz
Hi Sergey,

I am not very familiar with Zeppelin. But I know Jeff (cc'ed) is working
on integrating Flink with some notebooks. He might be able to help you.

Best,

Dawid

On 25/04/2019 08:42, Smirnov Sergey Vladimirovich (39833) wrote:
>
> Hello,
>
>  
>
> Trying to link Zeppelin 0.9 with Flink 1.8. It's a small dev cluster
> deployed in a standalone manner.
>
> Got the same error as described here
> https://stackoverflow.com/questions/54257671/runnning-a-job-in-apache-flink-standalone-mode-on-zeppelin-i-have-this-error-to
>
> Would appreciate any support in resolving that problem.
>
>  
>
> Regards,
>
> Sergey
>
>  
>




Re: QueryableState startup regression in 1.8.0 ( migration from 1.7.2 )

2019-04-25 Thread Dawid Wysakowicz
Hi Vishal,

As Guowei mentioned you have to enable the Queryable state. The default
setting was changed in 1.8.0. There is an open JIRA[1] for changing the
documentation accordingly.

Best,

Dawid

[1] https://issues.apache.org/jira/browse/FLINK-12274

On 25/04/2019 03:27, Guowei Ma wrote:
> You could try to set queryable-state.enable to true. And check again. 
>
> Vishal Santoshi wrote on Thu, Apr 25, 2019 at 1:40 AM:
>
> Any one ?
>
> On Wed, Apr 24, 2019 at 12:02 PM Vishal Santoshi
> mailto:vishal.santo...@gmail.com>> wrote:
>
> Hello folks, 
>
>
> Following
> https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/queryable_state.html#querying-state
> for setting up the queryable state server and proxy, I have my
> classpath (the lib directory) that has the required jar,
> but I do not see the mentioned log and of course am not able
> to set up the QS server/proxy. This has worked on 1.7.2 and I
> think I have everything as advised, see the logs below. I do
> not see the log "Started the Queryable State Proxy Server
> @ ...". Anyone with this issue...
>
>
>
> 2019-04-24 15:54:26,296 INFO 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner      
> -     -Dtaskmanager.numberOfTaskSlots=1
>
> 2019-04-24 15:54:26,296 INFO 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner      
> -     --configDir
>
> 2019-04-24 15:54:26,296 INFO 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner      
> -     /usr/local/flink/conf
>
> 2019-04-24 15:54:26,296 INFO 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner      
> -  Classpath:
> 
> /usr/local/flink/lib/flink-metrics-prometheus-1.8.0.jar:*/usr/local/flink/lib/flink-queryable-state-runtime_2.11-1.8.0.jar*:/usr/local/flink/lib/hadoop.jar:/usr/local/flink/lib/jobs.jar:/usr/local/flink/lib/log4j-1.2.17.jar:/usr/local/flink/lib/slf4j-log4j12-1.7.15.jar:/usr/local/flink/lib/flink-dist_2.11-1.8.0.jar:::
>
> 2019-04-24 15:54:26,296 INFO 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner      
> -
> 
> 
>
> 2019-04-24 15:54:26,298 INFO 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner      
> - Registered UNIX signal handlers for [TERM, HUP, INT]
>
> 2019-04-24 15:54:26,300 INFO 
> org.apache.flink.runtime.taskexecutor.TaskManagerRunner      
> - Maximum number of open file descriptors is 65536.
>
> 2019-04-24 15:54:26,305 INFO 
> org.apache.flink.configuration.GlobalConfiguration           
> - Loading configuration property:
> state.backend.fs.checkpointdir,
> hdfs:///flink-checkpoints_k8s_test/prod
>
> 2
>
>
>
>
>
> -- 
> Best,
> Guowei




Re: Job Startup Arguments

2019-04-25 Thread Dawid Wysakowicz
Hi Steve,

As far as I know, this information is not available in REST API, but it
would be good to double check with Chesnay(cc'ed). You can see the
complete list of available REST commands here[1].

Best,

Dawid

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.8/monitoring/rest_api.html#api

On 24/04/2019 21:10, Steven Nelson wrote:
> Hello!
>
> Is there a way (via the REST API) to see the parameters used to start
> a job?
>
> -Steve





Re: Getting JobExecutionException: Could not set up JobManager when trying to upload new version

2019-04-25 Thread Dawid Wysakowicz
Hi Avi,

Just as some additional explanation: the UID of an operator is the way we map
state to the corresponding operator. This allows loading a savepoint with a
changed DAG as long as the UIDs stay the same. As you said, this explains
why you got the exception when you changed the uid of some of the operators.
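
For illustration, a minimal sketch (a hypothetical pipeline, not the job from
the quoted mail) of pinning operator uids so that savepoint state keeps mapping
to the same operators across code changes:

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class UidPinningSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("a", "b", "c").uid("static-source")
           .keyBy(new KeySelector<String, String>() {
               @Override
               public String getKey(String value) {
                   return value;
               }
           })
           // Stand-in for a stateful operator: changing "stateful-mapper" in a later
           // version orphans its state, which surfaces as "the operator is not
           // available in the new program" unless --allowNonRestoredState is passed.
           .map(new RichMapFunction<String, String>() {
               @Override
               public String map(String value) {
                   return value.toUpperCase();
               }
           }).uid("stateful-mapper")
           .print().uid("print-sink");

        env.execute("uid-pinning-sketch");
    }
}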

Best,

Dawid

On 24/04/2019 07:06, Avi Levi wrote:
> Might be useful for someone, Regarding this issue. it seems that
> changing the uid of the operator made this mess .
>
> On Tue, Apr 16, 2019 at 6:31 PM Avi Levi  > wrote:
>
> I am trying to upload a new version of the code but I am getting
> the exception below. The schema of the state was not changed for a
> while . what can be the reason for that (also attached the log
> file) ?
>
>
> 2019-04-16 15:14:11.112
> [flink-akka.actor.default-dispatcher-1138] ERROR
> org.apache.flink.runtime.dispatcher.StandaloneDispatcher  -
> Failed to submit job 693a02204ef5816f91ea3b135f544a7f.
> java.lang.RuntimeException:
> org.apache.flink.runtime.client.JobExecutionException: Could
> not set up JobManager
> at
> 
> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:36)
> at
> 
> java.util.concurrent.CompletableFuture$AsyncSupply.run(CompletableFuture.java:1590)
> at akka.dispatch.TaskInvocation.run(AbstractDispatcher.scala:39)
> at
> 
> akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:415)
> at java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:289)
> at
> 
> java.util.concurrent.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1056)
> at
> java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1692)
> at
> 
> java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:157)
> Caused by:
> org.apache.flink.runtime.client.JobExecutionException: Could
> not set up JobManager
> at
> 
> org.apache.flink.runtime.jobmaster.JobManagerRunner.(JobManagerRunner.java:176)
> at
> 
> org.apache.flink.runtime.dispatcher.Dispatcher$DefaultJobManagerRunnerFactory.createJobManagerRunner(Dispatcher.java:1058)
> at
> 
> org.apache.flink.runtime.dispatcher.Dispatcher.lambda$createJobManagerRunner$5(Dispatcher.java:308)
> at
> 
> org.apache.flink.util.function.CheckedSupplier.lambda$unchecked$0(CheckedSupplier.java:34)
> ... 7 common frames omitted
> Caused by: java.lang.IllegalStateException: Failed to rollback
> to checkpoint/savepoint
> gs://bv-flink-state/dev/state/savepoint-7cbaf2-48f14797.
> Cannot map checkpoint/savepoint state for operator
> 3cfeb06db0484d5556a7de8db2025f09 to the new program, because
> the operator is not available in the new program. If you want
> to allow to skip this, you can set the --allowNonRestoredState
> option on the CLI.
> at
> 
> org.apache.flink.runtime.checkpoint.Checkpoints.loadAndValidateCheckpoint(Checkpoints.java:205)
> at
> 
> org.apache.flink.runtime.checkpoint.CheckpointCoordinator.restoreSavepoint(CheckpointCoordinator.java:1103)
> at
> 
> org.apache.flink.runtime.jobmaster.JobMaster.tryRestoreExecutionGraphFromSavepoint(JobMaster.java:1241)
> at
> 
> org.apache.flink.runtime.jobmaster.JobMaster.createAndRestoreExecutionGraph(JobMaster.java:1165)
> at
> 
> org.apache.flink.runtime.jobmaster.JobMaster.(JobMaster.java:296)
> at
> 
> org.apache.flink.runtime.jobmaster.JobManagerRunner.(JobManagerRunner.java:157)
> ... 10 common frames omitted
> 2019-04-16 15:14:11.242
> [flink-akka.actor.default-dispatcher-1138] ERROR
> org.apache.flink.runtime.webmonitor.handlers.JarRunHandler  -
> Exception occurred in REST handler:
> org.apache.flink.runtime.client.JobSubmissionException: Failed
> to submit job.
> 2019-04-16 15:14:11.947
> [flink-akka.actor.default-dispatcher-1155] ERROR
> org.apache.flink.runtime.rest.handler.job.JobDetailsHandler  -
> Exception occurred in REST handler: Job
> e43ed04cdedd73f9bb9836e87142afbf not found
>
>
> Thanks for your help.
>
> Cheers
> Avi
>




Re: Watermark for each key?

2019-04-25 Thread Till Rohrmann
Your proposal could probably also be implemented by using Flink's support
for allowed lateness when defining a window [1]. It is based on basically the
same idea: there might be some elements which violate the watermark
semantics and which need to be handled separately.

[1]
https://ci.apache.org/projects/flink/flink-docs-stable/dev/event_time.html#late-elements
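
For reference, a minimal sketch of what [1] looks like in code. The
(deviceId, timestamp) tuple input, the one-hour window, and the 30-day lateness
bound are placeholder assumptions; anything later than the allowed lateness is
routed to a side output instead of being dropped silently.

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.AscendingTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

public class LateDeviceDataSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);

        // (deviceId, eventTimestampMillis) -- placeholder input
        DataStream<Tuple2<String, Long>> readings = env
            .fromElements(Tuple2.of("device-1", 1_000L), Tuple2.of("device-2", 2_000L))
            .assignTimestampsAndWatermarks(new AscendingTimestampExtractor<Tuple2<String, Long>>() {
                @Override
                public long extractAscendingTimestamp(Tuple2<String, Long> reading) {
                    return reading.f1;
                }
            });

        final OutputTag<Tuple2<String, Long>> late = new OutputTag<Tuple2<String, Long>>("late") {};

        SingleOutputStreamOperator<Long> countsPerDevice = readings
            .keyBy(new KeySelector<Tuple2<String, Long>, String>() {
                @Override
                public String getKey(Tuple2<String, Long> reading) {
                    return reading.f0;
                }
            })
            .timeWindow(Time.hours(1))
            .allowedLateness(Time.days(30))   // tolerate devices reporting up to 30 days late
            .sideOutputLateData(late)         // anything even later ends up in the side output
            .process(new ProcessWindowFunction<Tuple2<String, Long>, Long, String, TimeWindow>() {
                @Override
                public void process(String deviceId, Context ctx,
                                    Iterable<Tuple2<String, Long>> elements, Collector<Long> out) {
                    long count = 0;
                    for (Tuple2<String, Long> ignored : elements) {
                        count++;
                    }
                    out.collect(count);       // stand-in for the real per-device statistics
                }
            });

        countsPerDevice.print();
        countsPerDevice.getSideOutput(late).print();   // handle the "too late" readings separately
        env.execute("late-device-data-sketch");
    }
}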

Cheers,
Till

On Thu, Apr 25, 2019 at 10:21 AM Congxian Qiu 
wrote:

> There was someone working in IoT asking me whether Flink supports per-key
> watermarks as well.
>
> I'm not sure if we can do the statistics by manipulating raw state:
> we create a single state for every single key, and when receiving an element for a key, we
> extract the timestamp and check whether we need to send some result
> downstream (like the trigger action in a window); we can also tolerate
> data that comes in late.
>
>
> Best, Congxian
> On Apr 25, 2019, 01:58 +0800, Lasse Nedergaard ,
> wrote:
>
> Thanks Till
>
> What about this workaround:
> after the watermark assignment I split the stream into elements that fit
> within the watermark (s1) and those that don't (s2). s1 I process with the
> Table API with a windowed aggregate using the watermark, and s2 I handle with an
> unbounded, non-windowed aggregate with IdleStateRetentionTime so the state is
> removed when my devices are up to date again. I then merge the two outputs
> and continue.
> By doing this I handle 99% of the data as standard and only keep state for the late
> data.
>
> Make sense? And would it work?
>
> Med venlig hilsen / Best regards
> Lasse Nedergaard
>
>
> Den 24. apr. 2019 kl. 19.00 skrev Till Rohrmann :
>
> Hi Lasse,
>
> at the moment this is not supported out of the box by Flink. The community
> thought about this feature but so far did not implement it. Unfortunately,
> I'm also not aware of an easy workaround one could do in the user code
> space.
>
> Cheers,
> Till
>
> On Wed, Apr 24, 2019 at 3:26 PM Lasse Nedergaard <
> lassenederga...@gmail.com> wrote:
>
>> Hi.
>>
>> We work with IoT data and have cases where an IoT device delays its data
>> transfer when it cannot get network access. We would like to use the Table
>> API's windowed aggregate functions per device to calculate some statistics,
>> but for windowed aggregate functions to work we need to assign a watermark,
>> and that watermark is global across all devices. We can set an allowed
>> lateness, but we can't set it to months.
>> So what we need is a watermark per device (key) so that the windowed
>> aggregate works on the timestamps delivered by that device rather than the
>> global watermark.
>> Is that possible, or has anyone considered this feature?
>>
>> Best
>>
>> Lasse Nedergaard
>>
>>
>


Re: assignTimestampsAndWatermarks not work after KeyedStream.process

2019-04-25 Thread Dawid Wysakowicz
Hi,

Yes I think your explanation is correct. I can also recommend Seth's
webinar where he talks about debugging Watermarks[1]

Best,

Dawid

[1]
https://www.ververica.com/resources/webinar/webinar/debugging-flink-tutorial
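
For completeness, a minimal sketch of the workaround Guowei suggests further down in
this thread, assuming the Trip type from the quoted code: shuffle() redistributes the
records so that every parallel BoundedOutOfOrdernessTimestampExtractor receives
elements and can advance its watermark, which in turn lets the downstream
timeWindowAll make progress:

```java
DataStream<Trip> trips = env.addSource(consumer);

SingleOutputStreamOperator<Trip> withTimestamps = trips
    .keyBy(trip -> trip.userId)
    .map(trip -> trip)                       // stand-in for the keyed processing
    .shuffle()                               // make sure all extractors see data
    .assignTimestampsAndWatermarks(
        new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
            @Override
            public long extractTimestamp(Trip trip) {
                return trip.endTime.getTime();
            }
        });

AllWindowedStream<Trip, TimeWindow> windowed =
    withTimestamps.timeWindowAll(Time.days(7), Time.days(1));
```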

On 22/04/2019 22:55, an0 wrote:
> Thanks, I feel I'm getting closer to the truth. 
>
> So parallelism is the cause? Say my parallelism is 2. Does that mean I get 2 
> tasks running after `keyBy` even if all elements have the same key and so go to one 
> downstream task (say task 1)? And is it the other task (task 2), with no incoming 
> data, that keeps the `timeWindowAll` stream from progressing? Because both 
> task 1 and task 2 are its input streams and one is idle, so its event time 
> cannot make progress?
>
> On 2019/04/22 01:57:39, Guowei Ma  wrote: 
>> HI,
>>
>> A BoundedOutOfOrdernessTimestampExtractor can only send a WM after it has
>> received at least one element.
>>
>> For assigning after keyBy:
>> Flink uses the hash code of the key and the parallelism of the downstream
>> operator to decide which subtask receives an element. This means that if your
>> key is always the same, all the sources will send their elements to the same
>> downstream task, for example only the no. 3
>> BoundedOutOfOrdernessTimestampExtractor.
>>
>> For assigning before keyBy:
>> In your case, the source and the BoundedOutOfOrdernessTimestampExtractors are
>> chained together, which means every BoundedOutOfOrdernessTimestampExtractor
>> receives elements.
>>
>> Best,
>> Guowei
>>
>>
>> an0  于2019年4月19日周五 下午10:41写道:
>>
>>> Hi,
>>>
>>> First of all, thank you for the `shuffle()` tip. It works. However, I
>>> still don't understand why it doesn't work without calling `shuffle()`.
>>>
>>> Why would not all BoundedOutOfOrdernessTimestampExtractors receive trips?
>>> All the trips has keys and timestamps. As I said in my reply to Paul, I see
>>> the same watermarks being extracted.
>>>
>>> How could calling `assignTimestampsAndWatermarks` before vs. after `keyBy`
>>> matter? My understanding is any specific window for a specific key always
>>> receives the exactly same data, and the calling order of
>>> `assignTimestampsAndWatermarks` and `keyBy` shouldn't affect that.
>>>
>>> To make `keyBy` as irrelevant as possible, I tried letting it always
>>> return the same key so that there is only 1 keyed stream and it is exactly
>>> the same as the original unkeyed stream. It still doesn't trigger windows:
>>> ```java
>>> DataStream<Trip> trips = env.addSource(consumer);
>>> KeyedStream<Trip, Long> userTrips = trips.keyBy(trip -> 0L);
>>> DataStream<Trip> featurizedUserTrips = userTrips
>>>     .map(trip -> trip)
>>>     .assignTimestampsAndWatermarks(
>>>         new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
>>>             @Override
>>>             public long extractTimestamp(Trip trip) {
>>>                 return trip.endTime.getTime();
>>>             }
>>>         });
>>> AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
>>>     featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
>>> ```
>>>
>>> It makes no sense to me. Please help me understand why it doesn't work.
>>> Thanks!
>>>
>>> On 2019/04/19 04:14:31, Guowei Ma  wrote:
 Hi,
 After keyBy, maybe only some of the BoundedOutOfOrdernessTimestampExtractors
 receive the elements (trips). If that is the case, an extractor that does not
 receive any element will not send a WM, and therefore the timeWindowAll
 operator cannot be triggered.
 You could add a shuffle() before the assignTimestampsAndWatermarks in your
 second case and check whether the window is triggered. If it is triggered,
 you could check the distribution of the elements generated by the source.

 Best,
 Guowei


 an0...@gmail.com  于2019年4月19日周五 上午4:10写道:

> I don't think it is the watermark. I see the same watermarks from the two
> versions of the code.
>
> The processing on the keyed stream doesn't change event time at all. I can
> simply change my code to use `map` on the keyed stream to return the input
> data unchanged, so that the window operator receives exactly the same data.
> The only difference is when I do `assignTimestampsAndWatermarks`. The result
> is the same: `assignTimestampsAndWatermarks` before `keyBy` works:
> ```java
> DataStream<Trip> trips = env
>     .addSource(consumer)
>     .assignTimestampsAndWatermarks(
>         new BoundedOutOfOrdernessTimestampExtractor<Trip>(Time.days(1)) {
>             @Override
>             public long extractTimestamp(Trip trip) {
>                 return trip.endTime.getTime();
>             }
>         });
> KeyedStream userTrips = trips.keyBy(trip -> trip.userId);
> DataStream<Trip> featurizedUserTrips = userTrips.map(trip -> trip);
> AllWindowedStream<Trip, TimeWindow> windowedUserTrips =
>     featurizedUserTrips.timeWindowAll(Time.days(7), Time.days(1));
> ```
>
> `assignTimestampsAndWatermarks` after `keyBy` doesn't work:
> ```java
> DataStream<Trip> trips = env.addSource(consumer);
> KeyedStream userTrips = 

Re: Watermark for each key?

2019-04-25 Thread Congxian Qiu
Someone working in IoT also asked me whether Flink supports per-key
watermarks.

I wonder whether we could do this ourselves by manipulating raw state: keep a
separate state entry for every key, and when an element for that key arrives,
extract its timestamp and decide whether to emit a result downstream (similar
to a window trigger firing). That would also let us tolerate late-arriving
data.
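
A rough sketch of that idea with a KeyedProcessFunction (Reading, Stats and the
field/method names are illustrative assumptions; flushing is driven purely by the
timestamps seen for each device, not by the global watermark):

```java
import java.util.Iterator;
import java.util.Map;

import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class PerDeviceWindow extends KeyedProcessFunction<String, Reading, Stats> {

    private static final long WINDOW_SIZE_MS = 60 * 60 * 1000L;      // 1 hour windows
    private static final long MAX_OUT_OF_ORDER_MS = 5 * 60 * 1000L;  // per-device bound

    private ValueState<Long> maxTimestamp;       // highest event timestamp seen per device
    private MapState<Long, Stats> openWindows;   // window start -> partial aggregate

    @Override
    public void open(Configuration parameters) {
        maxTimestamp = getRuntimeContext().getState(
                new ValueStateDescriptor<>("max-ts", Long.class));
        openWindows = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("windows", Long.class, Stats.class));
    }

    @Override
    public void processElement(Reading r, Context ctx, Collector<Stats> out) throws Exception {
        long windowStart = r.timestamp - (r.timestamp % WINDOW_SIZE_MS);
        // Stats.add is a hypothetical aggregation that also handles a null accumulator.
        openWindows.put(windowStart, Stats.add(openWindows.get(windowStart), r));

        long previousMax = maxTimestamp.value() == null ? Long.MIN_VALUE : maxTimestamp.value();
        long newMax = Math.max(previousMax, r.timestamp);
        maxTimestamp.update(newMax);

        // The device's own "watermark": max timestamp seen for this key minus the bound.
        long perKeyWatermark = newMax - MAX_OUT_OF_ORDER_MS;

        // Fire every window whose end this device's own watermark has already passed.
        Iterator<Map.Entry<Long, Stats>> it = openWindows.iterator();
        while (it.hasNext()) {
            Map.Entry<Long, Stats> window = it.next();
            if (window.getKey() + WINDOW_SIZE_MS <= perKeyWatermark) {
                out.collect(window.getValue());
                it.remove();
            }
        }
    }
}
```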


Best, Congxian
On Apr 25, 2019, 01:58 +0800, Lasse Nedergaard , 
wrote:
> Thanks Till
>
> What about this workaround.
> If, after the watermark assignment, I split the stream into elements that
> fit within the watermark (s1) and those that don't (s2): I process s1 with
> the Table API using a windowed aggregate based on the watermark, and handle
> s2 with an unbounded, non-windowed aggregate with IdleStateRetentionTime so
> state is removed once my devices are up to date again. I then merge the two
> outputs and continue.
> By doing this I handle 99% of the data the standard way and only keep state
> for the late data.
>
> Make sense? And would it work?
>
> Med venlig hilsen / Best regards
> Lasse Nedergaard
>
>
> Den 24. apr. 2019 kl. 19.00 skrev Till Rohrmann :
>
> > Hi Lasse,
> >
> > at the moment this is not supported out of the box by Flink. The community 
> > thought about this feature but so far did not implement it. Unfortunately, 
> > I'm also not aware of an easy workaround one could do in the user code 
> > space.
> >
> > Cheers,
> > Till
> >
> > > On Wed, Apr 24, 2019 at 3:26 PM Lasse Nedergaard 
> > >  wrote:
> > > > Hi.
> > > >
> > > > We work with IoT data and have cases where an IoT device delays its data 
> > > > transfer when it cannot get network access. We would like to use the Table 
> > > > API's windowed aggregate functions per device to calculate some 
> > > > statistics, but for windowed aggregate functions to work we need to 
> > > > assign a watermark, and that watermark is global across all devices. We 
> > > > can set an allowed lateness, but we can't set it to months.
> > > > So what we need is a watermark per device (key) so that the windowed 
> > > > aggregate works on the timestamps delivered by that device rather than 
> > > > the global watermark.
> > > > Is that possible, or has anyone considered this feature?
> > > >
> > > > Best
> > > >
> > > > Lasse Nedergaard
> > > >


Re: Constant backpressure on flink job

2019-04-25 Thread Dawid Wysakowicz
Hi Monika,

I would start with identifying the operator that causes backpressure.
More information how to monitor backpressure you can find here in the
docs[1]. You might also be interested in Seth's (cc'ed) webinar[2],
where he also talks how to debug backpressure.

Best,

Dawid

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.8/monitoring/back_pressure.html#monitoring-back-pressure

[2]
https://www.ververica.com/resources/webinar/webinar/debugging-flink-tutorial
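
If you prefer polling over the web UI, a small fragment that queries the monitoring
REST API for back pressure samples; the host, port, job ID and vertex ID are
placeholders you would take from your own cluster:

```java
// Returns JSON such as {"status":"ok","backpressure-level":"high","subtasks":[...]}
// for the sampled operator; the same data backs the web UI's back pressure tab.
URL url = new URL("http://jobmanager-host:8081/jobs/" + jobId
        + "/vertices/" + vertexId + "/backpressure");
HttpURLConnection conn = (HttpURLConnection) url.openConnection();
conn.setRequestMethod("GET");
try (BufferedReader reader = new BufferedReader(
        new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
    reader.lines().forEach(System.out::println);
}
```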

On 22/04/2019 17:44, Monika Hristova wrote:
> Hello,
>  
> We are experiencing regular backpressure (at least once a week). I
> would like to ask how we can fix it.
>  
> Currently our configurations are:
> *vi /usr/lib/flink/conf/flink-conf.yaml*
> # Settings applied by Cloud Dataproc initialization action
> jobmanager.rpc.address: bonusengine-prod-m-0
> jobmanager.heap.mb: 4096
> jobmanager.rpc.port: 6123
> taskmanager.heap.mb: 4096
> taskmanager.memory.preallocate: false
> taskmanager.numberOfTaskSlots: 8
> #taskmanager.network.numberOfBuffers: 21952 # legacy deprecated
> taskmanager.network.memory.fraction: 0.9
> taskmanager.network.memory.min: 67108864
> taskmanager.network.memory.max: 1073741824
> taskmanager.memory.segment-size: 65536
> parallelism.default: 52
> web.port: 8081
> web.timeout: 12
> heartbeat.interval: 1
> heartbeat.timeout: 10
> yarn.application-attempts: 10
> high-availability: zookeeper
> high-availability.zookeeper.quorum:
> bonusengine-prod-m-0:2181,bonusengine-prod-m-1:2181,bonusengine-prod-m-2:2181
> high-availability.zookeeper.path.root: /flink
> #high-availability.zookeeper.storageDir: hdfs:///flink/recovery #
> legacy deprecated
> high-availability.storageDir: hdfs:///flink/recovery
> flink.partition-discovery.interval-millis=6
> fs.hdfs.hadoopconf: /etc/hadoop/conf
> state.backend: rocksdb
> state.checkpoints.dir: hdfs:///bonusengine/checkpoints/
> state.savepoints.dir: hdfs:///bonusengine/savepoints
> metrics.reporters: stsd
> metrics.reporter.stsd.class:
> org.apache.flink.metrics.statsd.StatsDReporter
> metrics.reporter.stsd.host: 127.0.0.1
> metrics.reporter.stsd.port: 8125
> zookeeper.sasl.disable: true
> yarn.reallocate-failed: true
> yarn.maximum-failed-containers: 32
> web.backpressure.refresh-interval: 6
> web.backpressure.num-samples: 100
> web.backpressure.delay-between-samples: 50
>  
> with Hadoop HA cluster: masters -> 8 vCPUs, 7.2 GB and slaves -> 16
> vCPUs, 60 GB with yarn configuration(*see attached file*)
>  
> We have one yarn session started where 8 jobs are run. Three of them
> are consuming the same source (kafka) which is causing the
> backpressure, but only one of them experiences backpressure. The state
> of the job is 20 something MB and the checkpoint is configured as follows:
> checkpointing.interval=30 # makes sure value in ms of
> progress happens between checkpoints
> checkpointing.pause_between_checkpointing=24 # checkpoints
> have to complete within value in ms or are discarded
> checkpointing.timeout=6 # allows given number of checkpoints
> to be in progress at the same time
> checkpointing.max_concurrent_checkpoints=1 # enables/disables
> externalized checkpoints which are retained after job cancellation
> checkpointing.externalized_checkpoints.enabled=true
>  
> The checkpointing pause was increased and the timeout was reduced on
> multiple occasions because the job kept failing, unable to recover from
> backpressure. RocksDB is configured as the state backend. The problem keeps
> reproducing even with a one-minute timeout. Also I would like to point
> out that the ideal checkpointing interval for that job would be 2 minutes.
> I would like to ask what might be the problem, or at least how to trace it?
>  
> Best Regards,
> Monika Hristova
>
> Get Outlook for Android 
>




Re: Create Custom Sink for DataSet

2019-04-25 Thread Dawid Wysakowicz
Hi Soheil,

The equivalent of DataStream's SinkFunction in DataSet API is the
mentioned OutputFormat. You can implement the OutputFormat.
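
A hedged sketch of such a custom OutputFormat (deliberately not the built-in
JDBCOutputFormat); the connection details, table and column names are made up:

```java
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.SQLException;

import org.apache.flink.api.common.io.RichOutputFormat;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;

public class MySqlOutputFormat extends RichOutputFormat<Tuple2<Integer, String>> {

    private transient Connection connection;
    private transient PreparedStatement statement;

    @Override
    public void configure(Configuration parameters) {
        // nothing to configure in this sketch
    }

    @Override
    public void open(int taskNumber, int numTasks) throws IOException {
        try {
            // one connection per parallel task
            connection = DriverManager.getConnection(
                    "jdbc:mysql://localhost:3306/mydb", "user", "password");
            statement = connection.prepareStatement(
                    "INSERT INTO my_table (id, name) VALUES (?, ?)");
        } catch (SQLException e) {
            throw new IOException("Could not open JDBC connection", e);
        }
    }

    @Override
    public void writeRecord(Tuple2<Integer, String> record) throws IOException {
        try {
            statement.setInt(1, record.f0);
            statement.setString(2, record.f1);
            statement.executeUpdate();
        } catch (SQLException e) {
            throw new IOException("Could not write record " + record, e);
        }
    }

    @Override
    public void close() throws IOException {
        try {
            if (statement != null) {
                statement.close();
            }
            if (connection != null) {
                connection.close();
            }
        } catch (SQLException e) {
            throw new IOException("Could not close JDBC resources", e);
        }
    }
}
```

It would then be used like dataSet.output(new MySqlOutputFormat()).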

Best,

Dawid

On 21/04/2019 20:01, Soheil Pourbafrani wrote:
> Hi, using the DataStream API I could create a custom sink
> like class RichMySqlSink extends RichSinkFunction and define my
> desired behavior for inserting data into a MySQL table. But in the
> DataSet API I can only find the output method for sinking data, and it
> accepts only an OutputFormat. In addition, I don't want to use
> the built-in Flink JDBCOutputFormat.
>
> So is there any way to create my own Sink Class for inserting data
> into mysql?




Re: Apache Flink - CEP vs SQL detecting patterns

2019-04-25 Thread Dawid Wysakowicz
Hi,

Unfortunately those are just ignored. The timed out partial matches are
not emitted.
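
For comparison, in the DataStream CEP API the timed-out partial matches can be
routed to a side output; a rough sketch (the Event type and the pattern itself are
illustrative):

```java
Pattern<Event, ?> pattern = Pattern.<Event>begin("start")
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event e) {
            return "A".equals(e.type);
        }
    })
    .followedBy("end")
    .where(new SimpleCondition<Event>() {
        @Override
        public boolean filter(Event e) {
            return "B".equals(e.type);
        }
    })
    .within(Time.minutes(10));

OutputTag<String> timedOutTag = new OutputTag<String>("timed-out") {};

SingleOutputStreamOperator<String> matches = CEP.pattern(events, pattern).select(
    timedOutTag,
    new PatternTimeoutFunction<Event, String>() {
        @Override
        public String timeout(Map<String, List<Event>> partialMatch, long timeoutTimestamp) {
            return "timed out at " + timeoutTimestamp + ": " + partialMatch;
        }
    },
    new PatternSelectFunction<Event, String>() {
        @Override
        public String select(Map<String, List<Event>> match) {
            return "matched: " + match;
        }
    });

DataStream<String> timedOut = matches.getSideOutput(timedOutTag);
```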

Best,

Dawid

On 20/04/2019 19:49, M Singh wrote:
> Dawid:
>
> So, what happens when there is a timeout - is there any value/field in
> the resulting data stream that indicates that this was a timeout ?
>
> Thanks
>
> On Tuesday, April 16, 2019, 10:12:58 AM EDT, Dawid Wysakowicz
>  wrote:
>
>
> Hi Mans,
>
> Yes you are right. That feature is not available in SQL, as there is
> no such feature in SQL standard. The only addition to SQL standard we
> introduce so far is the WITHIN clause. We might introduce the timed
> out patterns some time in the future, but personally I am not aware of
> such plans.
>
> Best,
>
> Dawid
>
> On 12/04/2019 22:40, M Singh wrote:
> Hi:
>
> I am looking at the CEP documentation and there is a way to
> access patterns which have timed out. But I could not find a similar
> capability in the Table and SQL interface for detecting patterns. I am
> assuming that the CEP interface is more comprehensive and complete
> than the SQL/Table interface.
>
> Please let me know if I have missed anything.
>
> Thanks
>
> Mans




Re: RichAsyncFunction Timer Service

2019-04-25 Thread Dawid Wysakowicz
Hi Mike,

I think the reason there is no access to TimerService in an async
function is that, as it is asynchronous, there are no guarantees
about when and where (at which stage of the pipeline) the function will
actually be executed. This characteristic doesn't align with
TimerService and timely callbacks.
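
That said, the async I/O API does give you a per-record timeout hook, which covers
some timer use cases; a hedged sketch (Request, Result and the lookup client are
illustrative assumptions):

```java
DataStream<Result> results = AsyncDataStream.unorderedWait(
    requests,
    new RichAsyncFunction<Request, Result>() {
        @Override
        public void asyncInvoke(Request req, ResultFuture<Result> future) {
            // client is a hypothetical non-blocking lookup client returning a CompletableFuture
            client.lookup(req).whenComplete((resp, err) ->
                future.complete(Collections.singleton(new Result(resp))));
        }

        @Override
        public void timeout(Request req, ResultFuture<Result> future) {
            // decide what to emit when the per-request timeout below expires
            future.complete(Collections.singleton(Result.empty()));
        }
    },
    30, TimeUnit.SECONDS,   // per-request timeout enforced by the framework
    100);                   // max number of in-flight requests
```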

Best,

Dawid

On 19/04/2019 17:41, Mikhail Pryakhin wrote:
> Hello, Flink community!
>
> It happens that I need to access a timer service in a
> RichAsyncFunction implementation. I know it's normally accomplished
> via StreamingRuntimeContext instance available in a RichFunction, but
> unfortunately, RichAsyncFunction extending RichFunction overrides
> “setRuntimeContext” method [1] wrapping a RuntimeContext instance
> passed as the method argument into a RichAsyncFunctionRuntimeContext
> instance [2]. This RichAsyncFunction specific RuntimeContext
> implementation is private [2] which makes it infeasible to gain access
> to a wrapped original RuntimeContext thus making it impossible to
> leverage timer service in RichAsyncFunction implementations. Just
> curious is there any reason for that? Can we make this implementation
> public or somehow share a wrapped instance?
>
> Many thanks in advance!
>
> [1]
> https://github.com/apache/flink/blob/c96a4d7afe379a291cc538ca36af896df8dc2127/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java#L76
> [2]
> https://github.com/apache/flink/blob/c96a4d7afe379a291cc538ca36af896df8dc2127/flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/async/RichAsyncFunction.java#L100
>
>
>
> Kind Regards,
> Mike Pryakhin
>




Re: kafka partitions, data locality

2019-04-25 Thread Dawid Wysakowicz
Hi Smirnov,

Actually there is a way to tell Flink that data is already partitioned.
You can try the reinterpretAsKeyedStream[1] method. I must warn you
though this is an experimental feature.

Best,

Dawid

[1]
https://ci.apache.org/projects/flink/flink-docs-release-1.8/dev/stream/experimental.html#experimental-features
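
A small sketch of how it is used (the Transaction type and fields are illustrative);
note the documented caveat that the data must already be partitioned exactly the way
Flink's keyBy would partition it by that key:

```java
DataStream<Transaction> transactions = env.addSource(kafkaConsumer);

// Tell Flink the stream is already partitioned by clientId, so no shuffle is inserted.
KeyedStream<Transaction, String> byClient =
    DataStreamUtils.reinterpretAsKeyedStream(transactions, t -> t.clientId);

DataStream<Transaction> maxPerClient = byClient
    .timeWindow(Time.minutes(10), Time.minutes(1))
    .maxBy("amount");
```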

On 19/04/2019 11:48, Smirnov Sergey Vladimirovich (39833) wrote:
>
> Hi Ken,
>
>  
>
> It’s a bad story for us: even for a small window we have dozens of
> thousands of events per job, with 10x or more at peak. And the
> number of jobs is known to be high. So instead of N operations (our
> producer/consumer mechanism), with the shuffle/resorting (the current Flink
> behaviour) it becomes N*ln(N) - a tenfold loss of execution speed!
>
> To all: what should my next step be? Contribute to Apache Flink? File an issue in the backlog?
>
>  
>
>  
>
> With best regards,
>
> Sergey
>
> *From:*Ken Krugler [mailto:kkrugler_li...@transpac.com]
> *Sent:* Wednesday, April 17, 2019 9:23 PM
> *To:* Smirnov Sergey Vladimirovich (39833) 
> *Subject:* Re: kafka partitions, data locality
>
>  
>
> Hi Sergey,
>
>  
>
> As you surmised, once you do a keyBy/max on the Kafka topic, to group
> by clientId and find the max, then the topology will have a
> partition/shuffle to it.
>
>  
>
> This is because Flink doesn’t know that client ids don’t span Kafka
> partitions.
>
>  
>
> I don’t know of any way to tell Flink that the data doesn’t need to be
> shuffled. There was a discussion
> 
>  about
> adding a keyByWithoutPartitioning a while back, but I don’t think that
> support was ever added.
>
>  
>
> A simple ProcessFunction with MapState (clientId -> max) should allow
> you do to the same thing without too much custom code. In order to
> support windowing, you’d use triggers to flush state/emit results.
>
>  
>
> — Ken
>
>  
>
>  
>
> On Apr 17, 2019, at 2:33 AM, Smirnov Sergey Vladimirovich (39833)
> <s.smirn...@tinkoff.ru> wrote:
>
>  
>
> Hello,
>
>  
>
> We are planning to use Apache Flink as a core component of our new
> streaming system for internal processes (finance, banking
> business), based on Apache Kafka.
>
> So we started some research with Apache Flink, and one of the
> questions that arose during that work is how Flink handles data
> locality.
>
> I'll try to explain: suppose we have a Kafka topic with some kind
> of events, and these events are grouped by topic partition so that the
> handler (or job worker) consuming messages from a partition
> has all the information necessary for further processing.
>
> As an example, say we have clients' payment transactions in a Kafka
> topic. We group by clientId (transactions with the same clientId
> go to the same Kafka topic partition) and the task is to find the
> max transaction per client in sliding windows. In terms of
> map/reduce there is no need to shuffle data between all topic
> consumers; maybe it's worth doing within each consumer to gain
> some speedup by increasing the number of executors over each
> partition's data.
>
> And my question is how Flink works in this case. Does it shuffle
> all the data, or does it have some settings to avoid these extra,
> unnecessary shuffle/sorting operations?
>
> Thanks in advance!
>
>  
>
>  
>
> With best regards,
>
> Sergey Smirnov
>
>  
>
> --
>
> Ken Krugler
> +1 530-210-6378
> http://www.scaleunlimited.com
> Custom big data solutions & training
> Flink, Solr, Hadoop, Cascading & Cassandra
>
>  
>




Re: Identify orphan records after joining two streams

2019-04-25 Thread Dawid Wysakowicz
Hi Averell,

I think your original solution is the right one, given your
requirements. I don't think it is over complicated.

As for the memory concerns, there is no built-in mechanism for
backpressure/alignment based on event time. The community did take that
into consideration when discussing the new source interface though[1].
But as Hequn already mentioned if you use the RocksDBStateBackend, the
amount of space is limited by the disk space. Moreover you could add
some safety timer, that would fire every x minutes and clear the oldest
entries.

Best,

Dawid


[1]
https://lists.apache.org/thread.html/70484d6aa4b8e7121181ed8d5857a94bfb7d5a76334b9c8fcc59700c@%3Cdev.flink.apache.org%3E
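
For reference, a rough sketch of that kind of solution: a CoProcessFunction, applied
after connect(...).keyBy(...), that buffers the first record per key, emits the joined
result when the other side arrives, and registers a processing-time safety timer to
clear stale entries (A, B, Joined and the interval are illustrative):

```java
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

public class JoinWithCleanup extends CoProcessFunction<A, B, Joined> {

    private static final long CLEANUP_INTERVAL_MS = 30 * 60 * 1000L; // safety interval

    private ValueState<A> leftState;
    private ValueState<B> rightState;

    @Override
    public void open(Configuration parameters) {
        leftState = getRuntimeContext().getState(new ValueStateDescriptor<>("left", A.class));
        rightState = getRuntimeContext().getState(new ValueStateDescriptor<>("right", B.class));
    }

    @Override
    public void processElement1(A left, Context ctx, Collector<Joined> out) throws Exception {
        B right = rightState.value();
        if (right != null) {
            out.collect(new Joined(left, right));
            rightState.clear();
        } else {
            leftState.update(left);
            ctx.timerService().registerProcessingTimeTimer(
                    ctx.timerService().currentProcessingTime() + CLEANUP_INTERVAL_MS);
        }
    }

    @Override
    public void processElement2(B right, Context ctx, Collector<Joined> out) throws Exception {
        A left = leftState.value();
        if (left != null) {
            out.collect(new Joined(left, right));
            leftState.clear();
        } else {
            rightState.update(right);
            ctx.timerService().registerProcessingTimeTimer(
                    ctx.timerService().currentProcessingTime() + CLEANUP_INTERVAL_MS);
        }
    }

    @Override
    public void onTimer(long timestamp, OnTimerContext ctx, Collector<Joined> out) throws Exception {
        // anything still buffered for this key is an orphan candidate:
        // emit it to a side output here if needed, then clear the state
        leftState.clear();
        rightState.clear();
    }
}
```

It would be applied roughly as left.connect(right).keyBy(leftKey, rightKey).process(new JoinWithCleanup()).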

On 19/04/2019 05:15, Averell wrote:
> Thank you Hecheng.
>
> I just tried to use the Table API as you suggested, and it almost worked (it
> worked, with the two issues below):
> - I only get the output when my event-time watermark passes the end
> of the tumbling window. But, because I know that there are at most 2 records
> per window (one from each stream), I would like to collect my output record
> as soon as I have received two input records. With the low-level API, I believe I
> can do this with a Trigger. Can I achieve a similar result with the Table API?
> - In the UDAggF documentation, I saw a recommendation to use Java instead of
> Scala. Does this apply to the low-level API functions as well?
>
> Thanks and best regards,
> Averell
>
>
>
> --
> Sent from: 
> http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/





Re: Flink CLI

2019-04-25 Thread Marc Rooding
Hi Steven, Oytun

You may find the tool we open-sourced last year useful. It offers deploying and 
updating jobs with savepointing.

You can find it on Github: https://github.com/ing-bank/flink-deployer

There’s also a docker image available in Docker Hub.

Marc
On 24 Apr 2019, 17:29 +0200, Oytun Tez , wrote:
> Hi Steven,
>
> As much as I am aware,
> 1) No update call. Our build flow feels a little weird to us as well; it 
> definitely requires scripting.
> 2) We are using the Flink management API remotely in our build flow to 1) get 
> jobs, 2) savepoint them, 3) cancel them, etc. I am going to release a Python 
> script for this soon.
>
> ---
> Oytun Tez
>
> M O T A W O R D
> The World's Fastest Human Translation Platform.
> oy...@motaword.com — www.motaword.com
>
>
> > On Wed, Apr 24, 2019 at 11:06 AM Steven Nelson  
> > wrote:
> > > Hello!
> > >
> > > I am working on automating our deployments to our Flink cluster. I had a 
> > > couple questions about the flink cli.
> > >
> > > 1) I thought there was an "update" command that would internally manage 
> > > the cancel with savepoint, upload new jar, restart from savepoint process.
> > >
> > > 2) Is there a way to get the Flink CLI to output its result in JSON 
> > > format? Right now I would need to parse the results of the "flink list" 
> > > command to get the job id, cancel the job with savepoint, parse the 
> > > results of that to get the savepoint filename, then restore using that. 
> > > Parsing the output seems brittle to me.
> > >
> > > Thoughts?
> > > -Steve
> > >


Re: TM occasionally hang in deploying state in Flink 1.5

2019-04-25 Thread qi luo
Hello,

This issue occurred again and we took a thread dump of the TM. It was indeed hanging on a 
socket read while downloading a jar from the Blob server:

"DataSource (at createInput(ExecutionEnvironment.java:548) (our.code)) 
(1999/2000)" #72 prio=5 os_prio=0 tid=0x7fb9a1521000 nid=0xa0994 runnable 
[0x7fb97cfbf000]
   java.lang.Thread.State: RUNNABLE
at java.net.SocketInputStream.socketRead0(Native Method)
at java.net.SocketInputStream.socketRead(SocketInputStream.java:116)
at java.net.SocketInputStream.read(SocketInputStream.java:171)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at 
org.apache.flink.runtime.blob.BlobInputStream.read(BlobInputStream.java:152)
at 
org.apache.flink.runtime.blob.BlobInputStream.read(BlobInputStream.java:140)
at 
org.apache.flink.runtime.blob.BlobClient.downloadFromBlobServer(BlobClient.java:170)
at 
org.apache.flink.runtime.blob.AbstractBlobCache.getFileInternal(AbstractBlobCache.java:181)
at 
org.apache.flink.runtime.blob.PermanentBlobCache.getFile(PermanentBlobCache.java:206)
at 
org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager.registerTask(BlobLibraryCacheManager.java:120)
- locked <0x00078ab60ba8> (a java.lang.Object)
at 
org.apache.flink.runtime.taskmanager.Task.createUserCodeClassloader(Task.java:893)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:585)
at java.lang.Thread.run(Thread.java:748)

I checked the latest master code. There’s still no socket timeout in Blob 
client. Should I create an issue to add this timeout?
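
For illustration, the kind of change being proposed would amount to a read timeout on
the BlobClient socket, e.g. (variable names are placeholders):

```java
Socket socket = new Socket();
socket.connect(new InetSocketAddress(blobServerHost, blobServerPort), connectTimeoutMillis);
// With SO_TIMEOUT set, the blocking SocketInputStream.read() in the stack trace above
// would throw a SocketTimeoutException instead of blocking forever.
socket.setSoTimeout(readTimeoutMillis);
```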

Regards,
Qi 

> On Apr 19, 2019, at 7:49 PM, qi luo  wrote:
> 
> Hi all,
> 
> We use Flink 1.5 batch and start thousands of jobs per day. Occasionally we 
> observed some stuck jobs, due to some TM hang in “DEPLOYING” state. 
> 
> On checking TM log, it shows that it stuck in downloading jars in BlobClient:
> 
> 
> ...
> INFO  org.apache.flink.runtime.taskexecutor.TaskExecutor   - Received 
> task DataSource (at createInput(ExecutionEnvironment.java:548) (our.code)) 
> (184/2000).
> INFO  org.apache.flink.runtime.taskmanager.Task - 
> DataSource (at createInput(ExecutionEnvironment.java:548) (our.code)) 
> (184/2000) switched from CREATED to DEPLOYING.
> INFO  org.apache.flink.runtime.taskmanager.Task - 
> Creating FileSystem stream leak safety net for task DataSource (at 
> createInput(ExecutionEnvironment.java:548) (our.code)) (184/2000) [DEPLOYING]
> INFO  org.apache.flink.runtime.taskmanager.Task - Loading 
> JAR files for task DataSource (at createInput(ExecutionEnvironment.java:548) 
> (our.code)) (184/2000) [DEPLOYING].
> INFO  org.apache.flink.runtime.blob.BlobClient  - 
> Downloading 
> 19e65c0caa41f264f9ffe4ca2a48a434/p-3ecd6341bf97d5512b14c93f6c9f51f682b6db26-37d5e69d156ee00a924c1ebff0c0d280
>  from some-host-ip-port
> 
> no more logs...
> 
> 
> It seems that the TM is calling BlobClient to download jars from 
> JM/BlobServer. Under the hood it’s calling Socket.connect() and then
> Socket.read() to retrieve results. 
> 
> Should we add timeout in socket operations in BlobClient to resolve this 
> issue?
> 
> Thanks,
> Qi



Re: State Migration with RocksDB MapState

2019-04-25 Thread Tzu-Li (Gordon) Tai
Hi Cliff,

Thanks for bringing this up again.

I think it would make sense to at least move this forward by exclusively
checking the schema for user keys in MapState, and allowing value
schema evolution.
I'll comment on the JIRA about this, and also make it a blocker for 1.9.0
to make sure it will be resolved by then.

Concerning your question on GenericRecord:
The actual schema resolution is still performed using the Avro schema, so
you will still bump into the same issue.

Best,
Gordon

On Wed, Apr 24, 2019 at 7:45 PM Cliff Resnick  wrote:

> Hi Gordon,
>
> I noticed there has been no movement on this issue and I'm wondering if I
> can find some way to work around this.
> My MapState value is a case class container of Avro-generated
> SpecificRecords. If one SpecificRecord changes I am stuck.
>
> From the issue it seems like the blocker is around evolving the MapState
> key type.  That may be a nasty problem, but my key type is stable and will
> never change. What do you think the level of difficulty would be to add
> support for evolving only the value?
>
> Also, if I use GenericRecord instead of SpecificRecord will the need for
> schema evolution still be triggered? Or does it actually go down to the
> avro schema rather than just the class serialVersionUID?
>
>
>
>
>
>
> On Mon, Mar 18, 2019 at 1:10 AM Tzu-Li (Gordon) Tai 
> wrote:
>
>> Hi Cliff,
>>
>> Thanks for bringing this up!
>> AFAIK, this wouldn't be a long-term blocker. I've just opened a JIRA to
>> track this [1].
>>
>> As explained in the JIRA ticket, the main reason this is disallowed in
>> the initial support for state schema evolution was due to how migration was
>> implemented in the RocksDB state backend.
>> Technically speaking, enabling this in the future is definitely possible.
>>
>> Cheers,
>> Gordon
>>
>> [1]  https://issues.apache.org/jira/browse/FLINK-11947
>>
>> On Mon, Mar 18, 2019 at 11:20 AM Cliff Resnick  wrote:
>>
>>> After trying out state migration in 1.8 rc2 I ran into this hard stop
>>> below. The comment does not give an indication why rocksdb map state cannot
>>> be migrated, and I'm wondering what the status is, since we do need this
>>> functionality and would like to know if this is a long-term blocker or not.
>>> Anyone know?
>>>
>>>
>>> https://github.com/apache/flink/blob/953a5ffcbdae4115f7d525f310723cf8770779df/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBKeyedStateBackend.java#L542
>>>
>>


Zeppelin

2019-04-25 Thread Smirnov Sergey Vladimirovich (39833)
Hello,

Trying to link Zeppelin 0.9 with Flink 1.8. It's a small dev cluster deployed 
in standalone mode.
I got the same error as described here: 
https://stackoverflow.com/questions/54257671/runnning-a-job-in-apache-flink-standalone-mode-on-zeppelin-i-have-this-error-to
I would appreciate any help resolving this problem.

Regards,
Sergey



Query - External SSL setup

2019-04-25 Thread L Jainkeri, Suman (Nokia - IN/Bangalore)
Hi,

I am trying to authenticate Flink using NGINX. The documentation mentions 
deploying a "side car proxy"; here is the link to the section of the document 
I am referring to: 
https://ci.apache.org/projects/flink/flink-docs-release-1.7/ops/security-ssl.html#external--rest-connectivity
I see that Flink users have already deployed proxies like Envoy 
or NGINX with MOD_AUTH.
Could I please get an example of how an NGINX proxy has been used for this SSL setup?

Thank you & regards,
Suman