Re: Issue with Flink UI for Flink 1.14.0

2022-01-21 Thread Chesnay Schepler
While FLINK-24550 was indeed fixed, a similar bug was unfortunately also 
introduced (https://issues.apache.org/jira/browse/FLINK-25732).


On 20/01/2022 21:18, Peter Westermann wrote:


Just tried this again with Flink 1.14.3 since 
https://issues.apache.org/jira/browse/FLINK-24550 is listed as fixed. 
I am running into similar errors when calling the /v1/jobs/overview 
endpoint (without any running jobs):


{"errors":["Internal server error.","side:\norg.apache.flink.runtime.rpc.akka.exceptions.AkkaRpcException: 
Failed to serialize the result for RPC call : 
requestMultipleJobDetails.\n\tat 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.serializeRemoteResultAndVerifySize(AkkaRpcActor.java:417)\n\tat 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.lambda$sendAsyncResponse$2(AkkaRpcActor.java:373)\n\tat 
java.base/java.util.concurrent.CompletableFuture.uniHandle(CompletableFuture.java:930)\n\tat 
java.base/java.util.concurrent.CompletableFuture.uniHandleStage(CompletableFuture.java:946)\n\tat 
java.base/java.util.concurrent.CompletableFuture.handle(CompletableFuture.java:2266)\n\tat 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.sendAsyncResponse(AkkaRpcActor.java:365)\n\tat 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:332)\n\tat 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:217)\n\tat 
org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:78)\n\tat 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:163)\n\tat 
akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:24)\n\tat 
akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:20)\n\tat 
scala.PartialFunction.applyOrElse(PartialFunction.scala:123)\n\tat 
scala.PartialFunction.applyOrElse$(PartialFunction.scala:122)\n\tat 
akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:20)\n\tat 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:171)\n\tat 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)\n\tat 
scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:172)\n\tat 
akka.actor.Actor.aroundReceive(Actor.scala:537)\n\tat 
akka.actor.Actor.aroundReceive$(Actor.scala:535)\n\tat 
akka.actor.AbstractActor.aroundReceive(AbstractActor.scala:220)\n\tat 
akka.actor.ActorCell.receiveMessage(ActorCell.scala:580)\n\tat 
akka.actor.ActorCell.invoke(ActorCell.scala:548)\n\tat 
akka.dispatch.Mailbox.processMailbox(Mailbox.scala:270)\n\tat 
akka.dispatch.Mailbox.run(Mailbox.scala:231)\n\tat 
akka.dispatch.Mailbox.exec(Mailbox.scala:243)\n\tat 
java.base/java.util.concurrent.ForkJoinTask.doExec(ForkJoinTask.java:290)\n\tat 
java.base/java.util.concurrent.ForkJoinPool$WorkQueue.topLevelExec(ForkJoinPool.java:1020)\n\tat 
java.base/java.util.concurrent.ForkJoinPool.scan(ForkJoinPool.java:1656)\n\tat 
java.base/java.util.concurrent.ForkJoinPool.runWorker(ForkJoinPool.java:1594)\n\tat 
java.base/java.util.concurrent.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:183)\nCaused 
by: java.io.NotSerializableException: java.util.HashMap$Values\n\tat 
java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1185)\n\tat 
java.base/java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1553)\n\tat 
java.base/java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1510)\n\tat 
java.base/java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1433)\n\tat 
java.base/java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1179)\n\tat 
java.base/java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:349)\n\tat 
org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:632)\n\tat 
org.apache.flink.runtime.rpc.akka.AkkaRpcSerializedValue.valueOf(AkkaRpcSerializedValue.java:66)\n\tat 
org.apache.flink.runtime.rpc.akka.AkkaRpcActor.serializeRemoteResultAndVerifySize(AkkaRpcActor.java:400)\n\t... 
30 more\n\nEnd of exception on server side>"]}
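The stack trace above bottoms out in java.io.NotSerializableException: java.util.HashMap$Values. HashMap.values() returns a live inner view class that does not implement Serializable, so it cannot be written out as an RPC result. A minimal, self-contained sketch of the failure and of the usual fix (copying the view into a concrete serializable collection, which is essentially what the Flink-side patch has to do before serializing the requestMultipleJobDetails result):

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.NotSerializableException;
import java.io.ObjectOutputStream;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Map;

public class ValuesSerializationDemo {
    private static void serialize(Object o) throws IOException {
        try (ObjectOutputStream oos = new ObjectOutputStream(new ByteArrayOutputStream())) {
            oos.writeObject(o);
        }
    }

    public static void main(String[] args) throws IOException {
        Map<String, String> jobs = new HashMap<>();
        jobs.put("2297fdac52fa7191afee9ec4ff11c805", "RUNNING");

        try {
            // HashMap.values() is a view object (java.util.HashMap$Values),
            // not a Serializable collection -> this throws.
            serialize(jobs.values());
        } catch (NotSerializableException e) {
            System.out.println("failed: " + e.getMessage()); // prints "failed: java.util.HashMap$Values"
        }

        // Copying into an ArrayList (which is Serializable) succeeds.
        serialize(new ArrayList<>(jobs.values()));
        System.out.println("copy serialized fine");
    }
}
```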


Peter Westermann

Team Lead – Realtime Analytics

peter.westerm...@genesys.com 


*From: *Dawid Wysakowicz 
*Date: *Thursday, October 14, 2021 at 10:00 AM
*To: *Peter Westermann , 
user@flink.apache.org 

*Subject: *Re: Issue with Flink UI for Flink 1.14.0

I am afraid it is a bug in Flink 1.14. I created a ticket for it: 
FLINK-24550 [1]. I believe we should pick it up soonish. Thanks for 
reporting the issue!


Best,

Dawid

[1] https://issues.apache.org/jira/browse/FLINK-24550

On 13/10/2021 20:32, Peter Westermann wrote:

Hello,

I just started testing Flink 1.14.0 and noticed some weird
behavior. This is for a Flink cluster with ZooKeeper for HA and
two job managers (one leader, one backup). The UI on the leader
works fine. The UI on the other job manager does not load.

Re: Unhandled exception in flink 1.14.2

2022-01-21 Thread Chesnay Schepler
What you are seeing in 1.14.3 is a separate bug that behaves very 
similarly: https://issues.apache.org/jira/browse/FLINK-25732


On 20/01/2022 22:11, John Smith wrote:

As per another recent thread. This is still an issue.

On Wed, 19 Jan 2022 at 06:36, Chesnay Schepler  wrote:

This is a serialization bug in Flink, see
https://issues.apache.org/jira/browse/FLINK-24550.
It will be fixed in the upcoming 1.14.3 release.

On 19/01/2022 09:01, Caizhi Weng wrote:

Hi!

To print out gc logs of job manager you can add this
configuration to flink-conf.yaml

env.java.opts.jobmanager: -Xloggc:/tmp/jobmanager-gc.log
-XX:+PrintGCDetails -XX:+PrintGCDateStamps

This will print gc logs to /tmp/jobmanager-gc.log.

I'm not familiar with the garbage collection metrics page. If the
unit of time is ms then gc does not seem to be heavy. However I
would still recommend to print out gc logs for a double check.

John Smith wrote on Wed, Jan 19, 2022 at 06:43:

I think I may know what is causing the issue... So I have 3
job managers.

1- I navigated to a non-leader UI and submitted a new job...
2- The UI timed out with grey lines.
3- Some Internal Server Error messages appeared.
4- Going back to the leader UI and checking the running jobs, the
job seems to have been submitted and is running.
5- Going back to the job manager UI that failed, it now shows OK.

And the logs are as follows... And below are the GC metrics
from the UI.

2022-01-18 22:33:24,574 INFO

org.apache.flink.client.deployment.application.executors.EmbeddedExecutor
[] - Job 2297fdac52fa7191afee9ec4ff11c805 is submitted.
2022-01-18 22:33:24,574 INFO

org.apache.flink.client.deployment.application.executors.EmbeddedExecutor
[] - Submitting Job with JobId=2297fdac52fa7191afee9ec4ff11c805.
2022-01-18 22:34:00,618 ERROR
org.apache.flink.runtime.rest.handler.job.JobDetailsHandler
[] - Unhandled exception.
java.util.concurrent.CancellationException: null
at

java.util.concurrent.CompletableFuture.cancel(CompletableFuture.java:2276)
~[?:1.8.0_312]
at

org.apache.flink.runtime.rest.handler.legacy.DefaultExecutionGraphCache.getExecutionGraphInternal(DefaultExecutionGraphCache.java:98)
~[flink-dist_2.12-1.14.2.jar:1.14.2]
at

org.apache.flink.runtime.rest.handler.legacy.DefaultExecutionGraphCache.getExecutionGraphInfo(DefaultExecutionGraphCache.java:67)
~[flink-dist_2.12-1.14.2.jar:1.14.2]
at

org.apache.flink.runtime.rest.handler.job.AbstractExecutionGraphHandler.handleRequest(AbstractExecutionGraphHandler.java:81)
~[flink-dist_2.12-1.14.2.jar:1.14.2]
at

org.apache.flink.runtime.rest.handler.AbstractRestHandler.respondToRequest(AbstractRestHandler.java:83)
~[flink-dist_2.12-1.14.2.jar:1.14.2]
at

org.apache.flink.runtime.rest.handler.AbstractHandler.respondAsLeader(AbstractHandler.java:195)
~[flink-dist_2.12-1.14.2.jar:1.14.2]
at

org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.lambda$channelRead0$0(LeaderRetrievalHandler.java:83)
~[flink-dist_2.12-1.14.2.jar:1.14.2]
at java.util.Optional.ifPresent(Optional.java:159) [?:1.8.0_312]
at

org.apache.flink.util.OptionalConsumer.ifPresent(OptionalConsumer.java:45)
[flink-dist_2.12-1.14.2.jar:1.14.2]
at

org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:80)
[flink-dist_2.12-1.14.2.jar:1.14.2]
at

org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler.channelRead0(LeaderRetrievalHandler.java:49)
[flink-dist_2.12-1.14.2.jar:1.14.2]
at

org.apache.flink.shaded.netty4.io.netty.channel.SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler.java:99)
[flink-dist_2.12-1.14.2.jar:1.14.2]
at

org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:379)
[flink-dist_2.12-1.14.2.jar:1.14.2]
at

org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:365)
[flink-dist_2.12-1.14.2.jar:1.14.2]
at

org.apache.flink.shaded.netty4.io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:357)
[flink-dist_2.12-1.14.2.jar:1.14.2]
at

org.apache.flink.runtime.rest.handler.router.RouterHandler.routed(RouterHandler.java:115)
[flink-dist_2.12-1.14.2.jar:1.14.2]
at

org.apache.flink.runtime.rest.handler.router.RouterHandler.channelRead0(RouterHandler.java:94)
[flink-dist_2.12-1.14.2.jar:1.14.2]
at
  

Re: [DISCUSS] Future of Per-Job Mode

2022-01-21 Thread Konstantin Knauf
Thanks Thomas & Biao for your feedback.

Any additional opinions on how we should proceed with per job-mode? As you
might have guessed, I am leaning towards proposing to deprecate per-job
mode.

On Thu, Jan 13, 2022 at 5:11 PM Thomas Weise  wrote:

> Regarding session mode:
>
> ## Session Mode
> * main() method executed in client
>
> Session mode also supports execution of the main() method on the JobManager
> with submission through the REST API. That's how Flink k8s operators like
> [1] work. It's actually an important capability because it allows for
> allocation of the cluster resources prior to taking down the previous
> job during an upgrade, when the goal is optimizing for availability.
>
> Thanks,
> Thomas
>
> [1] https://github.com/lyft/flinkk8soperator
>
> On Thu, Jan 13, 2022 at 12:32 AM Konstantin Knauf 
> wrote:
> >
> > Hi everyone,
> >
> > I would like to discuss and understand if the benefits of having Per-Job
> > Mode in Apache Flink outweigh its drawbacks.
> >
> >
> > *# Background: Flink's Deployment Modes*
> > Flink currently has three deployment modes. They differ in the following
> > dimensions:
> > * main() method executed on Jobmanager or Client
> > * dependencies shipped by client or bundled with all nodes
> > * number of jobs per cluster & relationship between job and cluster
> > lifecycle* (supported resource providers)
> >
> > ## Application Mode
> > * main() method executed on Jobmanager
> > * dependencies already need to be available on all nodes
> > * dedicated cluster for all jobs executed from the same main() method
> > (Note: applications with more than one job currently still have
> > significant limitations, like missing high availability). Technically, a
> > session cluster dedicated to all jobs submitted from the same main() method.
> > * supported by standalone, native kubernetes, YARN
> >
> > ## Session Mode
> > * main() method executed in client
> > * dependencies are distributed from and by the client to all nodes
> > * cluster is shared by multiple jobs submitted from different clients,
> > independent lifecycle
> > * supported by standalone, Native Kubernetes, YARN
> >
> > ## Per-Job Mode
> > * main() method executed in client
> > * dependencies are distributed from and by the client to all nodes
> > * dedicated cluster for a single job
> > * supported by YARN only
> >
> >
> > *# Reasons to Keep*
> > * There are use cases where you might need the combination of a single
> > job per cluster, but main() method execution in the client. This
> > combination is only supported by per-job mode.
> > * It currently exists. Existing users will need to migrate to either
> > session or application mode.
> >
> >
> > *# Reasons to Drop*
> > * With Per-Job Mode and Application Mode we have two modes that for most
> > users probably do the same thing. Specifically, for those users that
> > don't care where the main() method is executed and want to submit a
> > single job per cluster. Having two ways to do the same thing is
> > confusing.
> > * Per-Job Mode is only supported by YARN anyway. If we keep it, we should
> > work towards support in Kubernetes and Standalone, too, to reduce special
> > casing.
> > * Dropping per-job mode would reduce complexity in the code and allow us
> to
> > dedicate more resources to the other two deployment modes.
> > * I believe with session mode and application mode we have two easily
> > distinguishable and understandable deployment modes that cover Flink's
> > use cases:
> >* session mode: olap-style, interactive jobs/queries, short lived
> batch
> > jobs, very small jobs, traditional cluster-centric deployment mode (fits
> > the "Hadoop world")
> >* application mode: long-running streaming jobs, large scale &
> > heterogenous jobs (resource isolation!), application-centric deployment
> > mode (fits the "Kubernetes world")
> >
> >
> > *# Call to Action*
> > * Do you use per-job mode? If so, why & would you be able to migrate to
> one
> > of the other methods?
> > * Am I missing any pros/cons?
> > * Are you in favor of dropping per-job mode midterm?
> >
> > Cheers and thank you,
> >
> > Konstantin
> >
> > --
> >
> > Konstantin Knauf
> >
> > https://twitter.com/snntrable
> >
> > https://github.com/knaufk
>


-- 

Konstantin Knauf

https://twitter.com/snntrable

https://github.com/knaufk


[DISCUSS] Deprecate/remove Twitter connector

2022-01-21 Thread Martijn Visser
Hi everyone,

I would like to discuss deprecating Flink's Twitter connector [1]. This was
one of the first connectors added to Flink; it can be used to access tweets
from Twitter. Given how both Flink and Twitter have evolved, I don't think
that:

* users are still using this connector at all, or
* the code for this connector should be in the main Flink codebase.

Given the circumstances, I would propose to deprecate and remove this
connector. I'm looking forward to your thoughts. If you agree, please also
let me know if you think we should first deprecate it in Flink 1.15 and
remove it in a version after that, or if you think we can remove it
directly.

Best regards,

Martijn Visser
https://twitter.com/MartijnVisser82

[1]
https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/twitter/


Flink 1.14 metrics : taskmanager host name missing

2022-01-21 Thread Mayur Gubrele
Hello,

We recently upgraded our Flink cluster to 1.14 and noticed that all the
taskmanager metrics we receive in our Prometheus data source now carry host
IPs instead of hostnames, unlike before we moved to 1.14.

I see on the Flink dashboard as well, under taskmanager details, that host IPs
are now being populated. This is a breaking change for us. Some of our APIs
use this host tag value, which used to be the hostname.

Can you tell us if there's a way to configure Flink to report hostnames
instead of IPs?

Thanks,
Mayur


Window function - flush on job stop

2022-01-21 Thread Lars Skjærven
We're doing a stream.keyBy().window().aggregate() to aggregate customer
feedback into sessions. Every now and then we have to update the job, e.g.
change the key, so that we can't easily continue from the previous state.

Cancelling the job (without restarting from the last savepoint) will result in
losing ongoing sessions. So we typically go back a few hours when we
restart to minimize the loss.

Is there any way of making the job flush its content (sessions) on job
cancellation? That would split ongoing sessions in two, which
is perfectly fine for our purpose.

Any thoughts ?

Lars


Re: Unhandled exception in flink 1.14.2

2022-01-21 Thread John Smith
Ok I see. So I guess in 1.14.3, one bug was fixed but something else broke?

Because in 1.14.2 it felt like it worked after a refresh, but with 1.14.3 it
looks like it always fails.


On Fri., Jan. 21, 2022, 3:29 a.m. Chesnay Schepler, 
wrote:

> What you are seeing in 1.14.3 is a separate bug that behaves very
> similarly. https://issues.apache.org/jira/browse/FLINK-25732
>
> On 20/01/2022 22:11, John Smith wrote:
>
> As per another recent thread. This is still an issue.
>
> On Wed, 19 Jan 2022 at 06:36, Chesnay Schepler  wrote:
>
>> This is a serialization bug in Flink, see
>> https://issues.apache.org/jira/browse/FLINK-24550.
>> It will be fixed in the upcoming 1.14.3 release.
>>
>> On 19/01/2022 09:01, Caizhi Weng wrote:
>>
>> Hi!
>>
>> To print out gc logs of job manager you can add this configuration to
>> flink-conf.yaml
>>
>> env.java.opts.jobmanager: -Xloggc:/tmp/jobmanager-gc.log
>> -XX:+PrintGCDetails -XX:+PrintGCDateStamps
>>
>> This will print gc logs to /tmp/jobmanager-gc.log.
>>
>> I'm not familiar with the garbage collection metrics page. If the unit of
>> time is ms then gc does not seem to be heavy. However I would still
>> recommend to print out gc logs for a double check.
>>
>> John Smith wrote on Wed, Jan 19, 2022 at 06:43:
>>
>>> I think I may know what is causing the issue... So I have 3 job managers.
>>>
>>> 1- I navigated to a non-leader UI and submitted a new job...
>>> 2- The UI timed out with grey lines.
>>> 3- Some Internal Server Error messages appeared.
>>> 4- Going back to the leader UI and checking the running jobs, the job seems
>>> to have been submitted and is running.
>>> 5- Going back to the job manager UI that failed, it now shows OK.
>>>
>>> And the logs are as follows... And below are the GC metrics from the UI.
>>>
>>> 2022-01-18 22:33:24,574 INFO
>>> org.apache.flink.client.deployment.application.executors.
>>> EmbeddedExecutor [] - Job 2297fdac52fa7191afee9ec4ff11c805 is submitted.
>>> 2022-01-18 22:33:24,574 INFO
>>> org.apache.flink.client.deployment.application.executors.
>>> EmbeddedExecutor [] - Submitting Job with JobId=2297f
>>> dac52fa7191afee9ec4ff11c805.
>>> 2022-01-18 22:34:00,618 ERROR org.apache.flink.runtime.rest.handler.job.
>>> JobDetailsHandler [] - Unhandled exception.
>>> java.util.concurrent.CancellationException: null
>>> at java.util.concurrent.CompletableFuture.cancel(CompletableFuture.java:
>>> 2276) ~[?:1.8.0_312]
>>> at org.apache.flink.runtime.rest.handler.legacy.
>>> DefaultExecutionGraphCache.getExecutionGraphInternal(
>>> DefaultExecutionGraphCache.java:98) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
>>> at org.apache.flink.runtime.rest.handler.legacy.
>>> DefaultExecutionGraphCache.getExecutionGraphInfo(
>>> DefaultExecutionGraphCache.java:67) ~[flink-dist_2.12-1.14.2.jar:1.14.2]
>>> at org.apache.flink.runtime.rest.handler.job.
>>> AbstractExecutionGraphHandler.handleRequest(
>>> AbstractExecutionGraphHandler.java:81) ~[flink-dist_2.12-1.14.2.jar:1.14
>>> .2]
>>> at org.apache.flink.runtime.rest.handler.AbstractRestHandler
>>> .respondToRequest(AbstractRestHandler.java:83) ~[flink-dist_2.12-1.14.2
>>> .jar:1.14.2]
>>> at org.apache.flink.runtime.rest.handler.AbstractHandler
>>> .respondAsLeader(AbstractHandler.java:195) ~[flink-dist_2.12-1.14.2.jar:
>>> 1.14.2]
>>> at org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler
>>> .lambda$channelRead0$0(LeaderRetrievalHandler.java:83) ~[flink-dist_2.12
>>> -1.14.2.jar:1.14.2]
>>> at java.util.Optional.ifPresent(Optional.java:159) [?:1.8.0_312]
>>> at org.apache.flink.util.OptionalConsumer.ifPresent(OptionalConsumer
>>> .java:45) [flink-dist_2.12-1.14.2.jar:1.14.2]
>>> at org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler
>>> .channelRead0(LeaderRetrievalHandler.java:80) [flink-dist_2.12-1.14.2
>>> .jar:1.14.2]
>>> at org.apache.flink.runtime.rest.handler.LeaderRetrievalHandler
>>> .channelRead0(LeaderRetrievalHandler.java:49) [flink-dist_2.12-1.14.2
>>> .jar:1.14.2]
>>> at org.apache.flink.shaded.netty4.io.netty.channel.
>>> SimpleChannelInboundHandler.channelRead(SimpleChannelInboundHandler
>>> .java:99) [flink-dist_2.12-1.14.2.jar:1.14.2]
>>> at org.apache.flink.shaded.netty4.io.netty.channel.
>>> AbstractChannelHandlerContext.invokeChannelRead(
>>> AbstractChannelHandlerContext.java:379) [flink-dist_2.12-1.14.2.jar:1.14
>>> .2]
>>> at org.apache.flink.shaded.netty4.io.netty.channel.
>>> AbstractChannelHandlerContext.invokeChannelRead(
>>> AbstractChannelHandlerContext.java:365) [flink-dist_2.12-1.14.2.jar:1.14
>>> .2]
>>> at org.apache.flink.shaded.netty4.io.netty.channel.
>>> AbstractChannelHandlerContext.fireChannelRead(
>>> AbstractChannelHandlerContext.java:357) [flink-dist_2.12-1.14.2.jar:1.14
>>> .2]
>>> at org.apache.flink.runtime.rest.handler.router.RouterHandler.routed(
>>> RouterHandler.java:115) [flink-dist_2.12-1.14.2.jar:1.14.2]
>>> at org.apache.flink.runtime.rest.handler.router.RouterHandler
>>> .channelRead0(RouterHandler.java:94) [flink-dist_2.12-1.14.2.jar:1.14.2]
>>> at org.apache.flink.r

Re: Examples / Documentation for Flink ML 2

2022-01-21 Thread Bonino Dario

Hi Dong,

We assembled a first, very small, Markdown document providing a 
jump-start description using a kMeans example. I could already share it 
with you to check whether we are pointing in the right direction. I had a 
look at the Flink contribution guidelines; however, the flink-ml project 
is somewhat "separate" from Flink, and I think the same holds for the 
documentation. How do you think we should proceed?


Best regards

Dario Bonino

On 1/19/22 09:36, Dong Lin wrote:

Hi Bonino,

Definitely, it will be great to build up the Flink ML docs together 
based on your experience.


Thanks!
Dong

On Wed, Jan 19, 2022 at 4:32 PM Bonino Dario  
wrote:


Hi Dong,

Thank you for the reply. Since we are actually experimenting with
the Flink ML libraries, if you think it's worthwhile, we may contribute
some documentation, e.g., a tutorial based on what we learn while
setting up our test project with Flink ML. Is it something that
might be of interest to you?

Best regards

Dario

On 1/18/22 04:51, Dong Lin wrote:

Hi Bonino,

Thanks for your interest!

Flink ML is currently ready for experienced algorithm developers
to try out because we have set up the basic APIs and
infrastructure to develop algorithms. Five algorithms (i.e.
kmeans, naive Bayes, knn, logistic regression and one-hot encoder)
have been implemented in the last release. Their unit tests can be
found here, here and here, which show how to use these algorithms
(including transform/fit/save/load). From these unit tests you can
find implementations of these algorithms, which can be used as
reference implementations to develop other algorithms of your
interest.
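As a concrete companion to the description above, here is a sketch of the fit/transform flow using the KMeans estimator from the Flink ML 2.0 release. Class names and setters follow the flink-ml repository, but treat the exact signatures (e.g. setK, setSeed, the default "features" column) as assumptions to verify against the release, not authoritative API documentation:

```java
import org.apache.flink.ml.clustering.kmeans.KMeans;
import org.apache.flink.ml.clustering.kmeans.KMeansModel;
import org.apache.flink.ml.linalg.Vectors;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

public class KMeansSketch {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);

        // Toy 2-D points; two obvious clusters near (0,0) and (9,9).
        Table input = tEnv.fromDataStream(
                env.fromElements(
                        Vectors.dense(0.0, 0.0),
                        Vectors.dense(0.3, 0.0),
                        Vectors.dense(9.0, 9.0),
                        Vectors.dense(9.3, 9.0)))
                .as("features");

        KMeans kmeans = new KMeans().setK(2).setSeed(1L);
        KMeansModel model = kmeans.fit(input);          // estimator -> fitted model
        Table predictions = model.transform(input)[0];  // appends a prediction column

        // A save/load round-trip is also supported; the path is illustrative:
        // model.save("/tmp/kmeans-model");

        tEnv.toDataStream(predictions).print();
        env.execute();
    }
}
```

This mirrors the transform/fit/save/load usage the unit tests mentioned above demonstrate.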

We plan to set up a website for Flink ML that provides links to
examples/tutorials, similar to the Flink Statefun website.
This website will likely be set up in March. We are currently
working on further infrastructure for benchmarking and
optimizing the machine learning algorithms in Flink ML.

Best Regards,
Dong



On Mon, Jan 17, 2022 at 8:57 PM Dawid Wysakowicz
 wrote:

I am adding a couple of people who worked on it. Hopefully,
they will be able to answer you.

On 17/01/2022 13:39, Bonino Dario wrote:


Dear List,

We are in the process of evaluating Flink ML version 2.0 in
the context of some ML tasks mainly concerned with
classification and clustering.

While algorithms for these two domains are already present,
although perhaps in a limited form, in the latest release
of Flink ML, we did not find any example / documentation
that could guide our experiments.

Is some adoption example available, like code, tutorial or
any information that might help us in bootstrapping a Flink
ML 2 project?

Thank you very much

Best regards

-- 
Ing. Dario Bonino, Ph.D


e-m@il:dario.bon...@gmail.com  
www:https://www.linkedin.com/in/dariobonino


Dario
Bonino
slide...@hotmail.com



-- 
Ing. Dario Bonino, Ph.D


e-m@il:dario.bon...@gmail.com  
www:https://www.linkedin.com/in/dariobonino





--
Ing. Dario Bonino, Ph.D

e-m@il:dario.bon...@gmail.com  
www:https://www.linkedin.com/in/dariobonino





Re: Flink 1.14.2 - Log4j2 -Dlog4j.configurationFile is ignored and falls back to default /opt/flink/conf/log4j-console.properties

2022-01-21 Thread Tamir Sagi
Yes,

Thank you!
I will handle that.

Best,
Tamir


From: Yang Wang 
Sent: Friday, January 21, 2022 5:11 AM
To: Tamir Sagi 
Cc: user@flink.apache.org 
Subject: Re: Flink 1.14.2 - Log4j2 -Dlog4j.configurationFile is ignored and 
falls back to default /opt/flink/conf/log4j-console.properties


EXTERNAL EMAIL


Changing the order of exec command makes sense to me. Would you please create a 
ticket for this?

The /opt/flink/conf is cleaned up because we are mounting the conf files from 
K8s ConfigMap.



Best,
Yang

Tamir Sagi <tamir.s...@niceactimize.com> wrote on Tue, Jan 18, 2022 at 17:48:
Hey Yang,

Thank you for confirming it.

IMO, a better approach is to change the order of "log_setting", "ARGS" and 
"FLINK_ENV_JAVA_OPTS" in the exec command.
That way we prioritize user-defined properties.

From:

exec "$JAVA_RUN" $JVM_ARGS ${FLINK_ENV_JAVA_OPTS} "${log_setting[@]}" 
-classpath "`manglePathList "$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" 
${CLASS_TO_RUN} "${ARGS[@]}"

To

exec "$JAVA_RUN" $JVM_ARGS "${log_setting[@]}" -classpath "`manglePathList 
"$FLINK_TM_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" ${CLASS_TO_RUN} 
"${ARGS[@]}" "${FLINK_ENV_JAVA_OPTS}"

Unless there are system configurations which are not supposed to be overridden 
by the user (and then having dedicated env variables is better). Does it make 
sense to you?


In addition, any idea why /opt/flink/conf gets cleaned (only flink-conf.yaml is 
there)?


Best,
Tamir




From: Yang Wang <danrtsey...@gmail.com>
Sent: Tuesday, January 18, 2022 6:02 AM
To: Tamir Sagi <tamir.s...@niceactimize.com>
Cc: user@flink.apache.org <user@flink.apache.org>
Subject: Re: Flink 1.14.2 - Log4j2 -Dlog4j.configurationFile is ignored and 
falls back to default /opt/flink/conf/log4j-console.properties




I think you are right. Before 1.13.0, if the log configuration file did not 
exist, the logging properties would not be added to the start command. That is 
why it could work in 1.12.2.

However, from 1.13.0 on, we are no longer using 
"kubernetes.container-start-command-template" to generate the JM/TM start 
command, but jobmanager.sh/taskmanager.sh. We do not 
have the same logic in "flink-console.sh".

Maybe we could introduce an environment variable for the log configuration 
file name in "flink-console.sh". The default value could be 
"log4j-console.properties" and it could be configured by users.
If this makes sense to you, could you please create a ticket?


Best,
Yang

Tamir Sagi <tamir.s...@niceactimize.com> wrote on Mon, Jan 17, 2022 at 22:53:
Hey Yang,

thanks for answering,

TL;DR

Assuming I have not missed anything , the way TM and JM are created is 
different between these 2 versions,
but it does look like flink-console.sh gets called eventually with the same 
exec command.

in 1.12.2 if org.apache.flink.kubernetes.kubeclient.parameters#hasLog4j returns 
false then logging args are not added to startCommand.


  1.  Why does the config dir get cleaned once the cluster starts? Even when I 
pushed log4j-console.properties to the expected location (/opt/flink/conf), 
the directory includes only flink-conf.yaml.
  2.  I think that by running the exec command "...${FLINK_ENV_JAVA_OPTS} 
"${log_setting[@]}" "${ARGS[@]}"" some properties might be ignored.
IMO, it should first look for properties in java.opts provided by the user in 
flink-conf and fall back to the default in case they're not present.

Talking about native Kubernetes mode:

I checked the bash script in the flink-dist module; flink-console.sh looks 
similar in both 1.14.2 and 1.12.2 (in 1.14.2 there are more cases for 
the input argument).

logging variable is the same
https://github.com/apache/flink/blob/release-1.14.2/flink-dist/src/main/flink-bin/bin/flink-console.sh#L101
https://github.com/apache/flink/blob/release-1.12.2/flink-dist/src/main/flink-bin/bin/flink-console.sh#L89

Exec command is the same
https://github.com/apache/flink/blob/release-1.14.2/flink-dist/src/main/flink-bin/bin/flink-console.sh#L114
https://github.com/apache/flink/blob/release-1.12.2/flink-dist/src/main/flink-bin/bin/flink-console.sh#L99

As for creating the TM/JM, 1.14.2 uses two bash scripts:

  *   kubernetes-jobmanager.sh
  *   kubernetes-taskmanager.sh

They get called while decorating the pod, referenced in startCommand.

for instance, JobManager.
https://github.com/apache/flink/blob/release-1.14.2/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/decorators/CmdJobManagerDecorator.java#L58-L59

kubernetes-jobmanager.sh gets called once the container starts, which calls 
flink-console.sh internally and passes the deployment name 
(kubernetes-application in our case) and args.

In 1.12.2 the decorator set /docker-entrypoint.sh
https://github.com/apache/flink/blob/release-1.12.2/flink-kubernetes/src/main/java/org/apache/flink/kubernetes/kubeclient/fac

Re: Examples / Documentation for Flink ML 2

2022-01-21 Thread Dong Lin
Hey Bonino,

Sounds great. Since we have not set up the website for Flink ML yet, how
about we create PRs for https://github.com/apache/flink-ml and put those
Markdown files under flink-ml/docs?

Best Regards,
Dong

On Sat, Jan 22, 2022 at 12:25 AM Bonino Dario 
wrote:

> Hi Dong,
>
> We assembled a first, very small, Markdown document providing a jump-start
> description using a kMeans example. I could already share it with you to
> check whether we are pointing in the right direction. I had a look at the
> Flink contribution guidelines; however, the flink-ml project is somewhat
> "separate" from Flink, and I think the same holds for the documentation. How
> do you think we should proceed?
>
> Best regards
>
> Dario Bonino
> On 1/19/22 09:36, Dong Lin wrote:
>
> Hi Bonino,
>
> Definitely, it will be great to build up the Flink ML docs together based
> on your experience.
>
> Thanks!
> Dong
>
> On Wed, Jan 19, 2022 at 4:32 PM Bonino Dario 
> wrote:
>
>> Hi Dong,
>>
>> Thank you for the reply. Since we are actually experimenting with the
>> Flink ML libraries, if you think it's worthwhile, we may contribute some
>> documentation, e.g., a tutorial based on what we learn while setting up our
>> test project with Flink ML. Is it something that might be of interest to
>> you?
>>
>> Best regards
>>
>> Dario
>> On 1/18/22 04:51, Dong Lin wrote:
>>
>> Hi Bonino,
>>
>> Thanks for your interest!
>>
>> Flink ML is currently ready for experienced algorithm developers to try
>> out because we have set up the basic APIs and infrastructure to develop
>> algorithms. Five algorithms (i.e. kmeans, naive Bayes, knn, logistic
>> regression and one-hot encoder) have been implemented in the last release.
>> Their unit tests can be found here, here and here,
>> which show how to use these algorithms (including transform/fit/save/load).
>> From these unit tests you can find implementations of these algorithms,
>> which can be used as reference implementations to develop other algorithms
>> of your interest.
>>
>> We plan to set up a website for Flink ML that provides links to
>> examples/tutorials, similar to the Flink Statefun website. This
>> website will likely be set up in March. We are currently working on
>> further infrastructure for benchmarking and optimizing the
>> machine learning algorithms in Flink ML.
>>
>> Best Regards,
>> Dong
>>
>>
>>
>> On Mon, Jan 17, 2022 at 8:57 PM Dawid Wysakowicz 
>> wrote:
>>
>>> I am adding a couple of people who worked on it. Hopefully, they will be
>>> able to answer you.
>>> On 17/01/2022 13:39, Bonino Dario wrote:
>>>
>>> Dear List,
>>>
>>> We are in the process of evaluating Flink ML version 2.0 in the context
>>> of some ML tasks mainly concerned with classification and clustering.
>>>
>>> While algorithms for these two domains are already present, although
>>> perhaps in a limited form, in the latest release of Flink ML, we did not
>>> find any example / documentation that could guide our experiments.
>>>
>>> Is some adoption example available, like code, tutorial or any
>>> information that might help us in bootstrapping a Flink ML 2 project?
>>>
>>> Thank you very much
>>>
>>> Best regards
>>>
>>> --
>>> Ing. Dario Bonino, Ph.D
>>>
>>> e-m@il: dario.bon...@gmail.com
>>> www: https://www.linkedin.com/in/dariobonino
>>> 
>>> 
>>>
>>> --
>> Ing. Dario Bonino, Ph.D
>>
>> e-m@il: dario.bon...@gmail.com
>> www: https://www.linkedin.com/in/dariobonino
>> 
>> 
>>
>> --
> Ing. Dario Bonino, Ph.D
>
> e-m@il: dario.bon...@gmail.com
> www: https://www.linkedin.com/in/dariobonino
> 
> 
>
>


Question about MapState size

2022-01-21 Thread Abdul Rahman
Hello,

I have a streaming application that has an operator based on
KeyedCoProcessFunction. The operator has a MapState object. I store
some data in this operator with a fixed TTL. I would like to monitor
the size/count of this state over time, since it's related to some
operational metrics we want to track. It seems like a simple thing to do,
but I haven't come up with a way to do so.

Given that iterating over the complete map is an expensive operation,
I only plan to do so periodically. The first issue is that the
stream is keyed, so any time I count the MapState entries, I don't get
the complete size of the state object, but only the count pertaining to
the specific key of the partition. Is there a way around this?

Secondly, is there a way to monitor RocksDB usage over time? I can
find managed memory metrics, but these do not include the disk space
RocksDB uses. Is there a way to get this from standard Flink metrics,
either task manager or job manager?
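A sketch of one possible workaround for the first question (assuming a plain KeyedProcessFunction for brevity; state and metric names are illustrative): maintain an operator metric next to the state. A Counter is scoped to the operator subtask, not to a single key, so incrementing it on genuinely new inserts approximates the entry count across all keys that subtask handles, without ever iterating the map. With TTL enabled the counter drifts upward over time, since entries expire without any callback.

```java
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class CountingMapStateFunction extends KeyedProcessFunction<String, String, String> {

    private transient MapState<String, Long> seen;
    // Metric scoped to the subtask: aggregates inserts across all of its keys.
    private transient Counter entryCount;

    @Override
    public void open(Configuration parameters) {
        seen = getRuntimeContext().getMapState(
                new MapStateDescriptor<>("seen", String.class, Long.class));
        entryCount = getRuntimeContext().getMetricGroup().counter("mapStateEntries");
    }

    @Override
    public void processElement(String value, Context ctx, Collector<String> out) throws Exception {
        if (!seen.contains(value)) {
            entryCount.inc(); // count only genuinely new entries; TTL expiry is not observed
        }
        seen.put(value, ctx.timerService().currentProcessingTime());
        out.collect(value);
    }
}
```

The counter then appears under the operator's metric group and can be scraped like any other Flink metric. For the second question, Flink can expose RocksDB's native metrics when enabled in flink-conf.yaml (e.g., if I recall the option name correctly, state.backend.rocksdb.metrics.total-sst-files-size for on-disk SST size).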