Applying Custom metrics

2020-10-05 Thread Piper Piper
Hi

I have questions regarding making my own custom metrics.

When exactly is the class RichMapFunction’s map(value) method
called/invoked, and what “value” will be passed/expected as an argument to
this map(value) method?

Does the RichMapFunction’s map() method have any relation to the
transformation map() method, or are they completely different?

Once defined, how do I put the custom metric onto a specific source,
specific operator, and onto a specific sink in my job’s DAG?

Thank you,

Piper
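
A minimal sketch that touches all three questions. RichMapFunction.map() is the per-record function behind the map() transformation: it is invoked once for every record flowing through that operator, and the incoming record is passed as "value". A metric registered in open() attaches to whichever operator instance runs the function. The class and metric names below are illustrative, not from any particular codebase:

```java
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.metrics.Counter;

// Illustrative example; attach it to the DAG with stream.map(new CountingMapper()).
public class CountingMapper extends RichMapFunction<String, String> {

    private transient Counter recordsSeen;

    @Override
    public void open(Configuration parameters) {
        // Registers the custom metric on the metric group of the operator
        // instance this function runs in.
        recordsSeen = getRuntimeContext()
                .getMetricGroup()
                .counter("recordsSeen");
    }

    @Override
    public String map(String value) {
        // Called once per input record; "value" is that record.
        if (recordsSeen != null) { // null only when invoked outside a Flink runtime
            recordsSeen.inc();
        }
        return value.toUpperCase();
    }
}
```

Sources and sinks have no map() to hook into, but the same getRuntimeContext().getMetricGroup() call works in any rich function, e.g. inside a RichSourceFunction or RichSinkFunction, which is how a custom metric lands on a specific source or sink in the DAG.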


Re: Maximum query and refresh rate for metrics from REST API

2020-09-18 Thread Piper Piper
Thank you, Chesnay!

On Thu, Sep 17, 2020, 3:59 AM Chesnay Schepler  wrote:

> By default metrics are only updated every 10 seconds; this can be
> controlled via
>
> https://ci.apache.org/projects/flink/flink-docs-release-1.11/ops/config.html#metrics-fetcher-update-interval
> .
>
> On 9/17/2020 12:22 AM, Piper Piper wrote:
> > Hello,
> >
> > What is the recommended way to get metrics (such as CPU, Memory and
> > user defined meters and gauges) at the highest frequency rate (i.e.
> > with the highest/fastest refresh rate) such as every 500 milliseconds
> > or less?
> >
> > Is there any rate limiting by default on querying the REST API for
> > metrics? I am querying the REST API every second but not seeing any
> > change in the CPU load for every second, so I was wondering if there
> > is any maximum frequency at which I can query it.
> >
> > Thanks,
> >
> > Piper
>
>
>
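
For reference, the setting linked above lives in flink-conf.yaml; the 1000 ms value below is illustrative (the default is 10000 ms):

```yaml
# flink-conf.yaml
# How often the metric fetcher behind the web UI / REST API refreshes its
# cached metric values. Polling the REST API faster than this interval
# only returns the same cached values.
metrics.fetcher.update-interval: 1000
```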


Maximum query and refresh rate for metrics from REST API

2020-09-16 Thread Piper Piper
Hello,

What is the recommended way to get metrics (such as CPU, Memory and user
defined meters and gauges) at the highest frequency rate (i.e. with the
highest/fastest refresh rate) such as every 500 milliseconds or less?

Is there any rate limiting by default on querying the REST API for metrics?
I am querying the REST API every second but not seeing any change in the
CPU load for every second, so I was wondering if there is any maximum
frequency at which I can query it.
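
Such a polling loop can be sketched with the JDK 11 HttpClient; the host, port, and TaskManager id below are placeholders, and the URL follows the taskmanager metrics endpoint discussed in this thread:

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class MetricsPoller {

    // Builds the TaskManager metrics URL; all arguments are placeholders.
    static URI metricsUri(String host, int port, String tmId, String metric) {
        return URI.create("http://" + host + ":" + port
                + "/taskmanagers/" + tmId + "/metrics?get=" + metric);
    }

    public static void main(String[] args) throws Exception {
        HttpClient client = HttpClient.newHttpClient();
        URI uri = metricsUri("localhost", 8081, "tm-1", "Status.JVM.CPU.Load");
        for (int i = 0; i < 10; i++) {
            HttpRequest request = HttpRequest.newBuilder(uri).GET().build();
            HttpResponse<String> response =
                    client.send(request, HttpResponse.BodyHandlers.ofString());
            System.out.println(response.body());
            // Sampling faster than metrics.fetcher.update-interval just
            // re-reads the same cached value.
            Thread.sleep(500);
        }
    }
}
```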

Thanks,

Piper


Measure CPU utilization

2020-09-10 Thread Piper Piper
Hello,

What is the best way to measure the CPU utilization of a TaskManager in
Flink, as opposed to using Linux's "top" command? Is querying the REST
endpoint 
http://<host>:<port>/taskmanagers/<tm-id>/metrics?get=Status.JVM.CPU.Load
the best option? Roman's reply (copied below) from the archives suggests
that it returns the CPU usage for the whole system including
other processes currently in the system, and would not give the CPU
utilization only of that Task Manager.

Based on Roman's reply that JVM.CPU.Time is a more clear indicator of CPU
usage, can you suggest how I would use it to calculate CPU utilization? Is
there any way I can get the CPU utilization for a Job that is distributed
over several nodes in the cluster?
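
One way to turn JVM.CPU.Time into a utilization number, as a sketch: sample the metric twice and divide the CPU-time delta by the wall-clock interval times the core count (both in the same time unit). Summing the per-TaskManager deltas for the TMs running a job would give a rough per-job figure; that aggregation is an assumption, not something Flink provides directly.

```java
public class CpuUtilization {

    /**
     * Fraction of total machine capacity used between two samples of a
     * cumulative CPU-time metric such as Status.JVM.CPU.Time.
     * The CPU-time values and the interval must use the same time unit.
     */
    public static double fromCpuTime(long cpuTimeStart, long cpuTimeEnd,
                                     long wallInterval, int cores) {
        return (cpuTimeEnd - cpuTimeStart) / (double) (wallInterval * cores);
    }

    public static void main(String[] args) {
        // 4000 ms of CPU time burned over a 10 s window on an 8-core machine:
        // 4000 / (10000 * 8) = 0.05, i.e. 5% of the machine.
        System.out.println(fromCpuTime(1_000, 5_000, 10_000, 8));
    }
}
```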

Also, what is the difference between the two REST API endpoints below:

1. http://<host>:<port>/taskmanagers/<tm-id>/metrics?get=Status.JVM.CPU.Load
2. http://<host>:<port>/taskmanagers/<tm-id>/metrics?get=System.CPU.Usage

Thanks,

Piper

Hi,

JVM.CPU.Load is just a wrapper (MetricUtils.instantiateCPUMetrics) on
top of OperatingSystemMXBean.getProcessCpuLoad() (see
https://docs.oracle.com/javase/7/docs/jre/api/management/extension/com/sun/management/OperatingSystemMXBean.html#getProcessCpuLoad)
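
The two endpoints differ in scope, which the underlying JDK bean makes visible. A JDK-only probe (no Flink required): getProcessCpuLoad() is what Status.JVM.CPU.Load wraps (this JVM only), while System.CPU.Usage reports machine-wide usage, approximated here with getSystemCpuLoad(); whether Flink uses exactly that call for System.CPU.Usage is an assumption:

```java
import java.lang.management.ManagementFactory;
import com.sun.management.OperatingSystemMXBean;

public class CpuLoadProbe {

    /** Recent CPU usage of this JVM process alone, in [0.0, 1.0] (-1.0 if unavailable). */
    static double processCpuLoad() {
        OperatingSystemMXBean os = (OperatingSystemMXBean)
                ManagementFactory.getOperatingSystemMXBean();
        return os.getProcessCpuLoad();
    }

    /** Recent CPU usage of the whole machine, all processes included. */
    static double systemCpuLoad() {
        OperatingSystemMXBean os = (OperatingSystemMXBean)
                ManagementFactory.getOperatingSystemMXBean();
        return os.getSystemCpuLoad();
    }

    public static void main(String[] args) {
        System.out.printf("process=%.3f system=%.3f%n",
                processCpuLoad(), systemCpuLoad());
    }
}
```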

Usually it looks weird if you have multiple CPU cores. For example, if
you have a job with a single slot 100% utilizing a single CPU core on
an 8-core machine, JVM.CPU.Load will be 1.0/8.0 = 0.125. It's also a
point-in-time snapshot of the current CPU usage, so if you're collecting
your metrics every minute and the job has a spiky workload within that
minute (say it's idle almost all the time and once a minute it consumes
100% CPU for one second), you have a chance to completely miss this in
the metrics.

As for me personally, JVM.CPU.Time is a clearer indicator of CPU
usage: it is an ever-increasing count of milliseconds the CPU spent
executing your code, and it will also catch CPU usage spikes.

Roman Grebennikov | g...@dfdx.me


Re: How to use Flink IDE

2020-08-30 Thread Piper Piper
Thank you, Narasimha and Arvid!

On Sun, Aug 30, 2020 at 3:09 PM Arvid Heise  wrote:

> Hi Piper,
>
> to step into Flink source code, you don't need to import Flink sources
> manually or build Flink at all. It's enough to tell IntelliJ to also
> download sources for Maven dependencies. [1]
>
> Flink automatically uploads the source code for each build. For example,
> see the 1.11.1 artifacts of flink-runtime. [2]
>
> [1]
> https://intellij-support.jetbrains.com/hc/en-us/community/posts/206834305-Automatically-download-sources-documentation-from-maven-working-great
> [2]
> https://repo1.maven.org/maven2/org/apache/flink/flink-runtime_2.11/1.11.1/
>
> On Sun, Aug 30, 2020 at 8:19 PM Ardhani Narasimha Swamy <
> ardhani.narasi...@razorpay.com> wrote:
>
>> Hi Piper,
>>
>> Welcome to Flink Community.
>>
>> Import the Flink project into the IDE like any other project; the only
>> difference while running is that you have to click "Include dependencies
>> with 'Provided' scope" in the main class run configuration. This bundles
>> the Flink dependencies in the artifact, making it a fat jar you can deploy.
>>
>>
>> Steps:
>>
>> 1. Open main class run/debug configurations
>> 2. Click on Include dependencies with Provided scope.
>> 3. Apply
>>
>>
>> Thanks,
>> Narasimha
>>
>>
>>
>> On Sun, Aug 30, 2020 at 11:40 PM Piper Piper 
>> wrote:
>>
>>> Hi,
>>>
>>> Till now, I have only been using Flink binaries. How do I setup Flink in
>>> my IntelliJ IDE so that while running/debugging my Flink application
>>> program I can also step into the Flink source code?
>>>
>>> Do I first need to import Flink's source repository into my IDE and
>>> build it?
>>>
>>> Thanks,
>>>
>>> Piper
>>>
>>
>>
>>
>
>
> --
>
> Arvid Heise | Senior Java Developer
>
> <https://www.ververica.com/>
>
> Follow us @VervericaData
>
> --
>
> Join Flink Forward <https://flink-forward.org/> - The Apache Flink
> Conference
>
> Stream Processing | Event Driven | Real Time
>
> --
>
> Ververica GmbH | Invalidenstrasse 115, 10115 Berlin, Germany
>
> --
> Ververica GmbH
> Registered at Amtsgericht Charlottenburg: HRB 158244 B
> Managing Directors: Timothy Alexander Steinert, Yip Park Tung Jason, Ji
> (Toni) Cheng
>


How to use Flink IDE

2020-08-30 Thread Piper Piper
Hi,

Till now, I have only been using Flink binaries. How do I setup Flink in my
IntelliJ IDE so that while running/debugging my Flink application program I
can also step into the Flink source code?

Do I first need to import Flink's source repository into my IDE and build
it?

Thanks,

Piper


Re: Ververica Flink training resources

2020-08-23 Thread Piper Piper
Hi David

1. Thank you for fixing the links!

2. I downloaded the repo and data files in the middle of the rewriting, so
the schema mentioned in the repo did not match the files. The new exercises
are running well but I could not adjust the servingspeedfactor to speed up
the serving of data events. I'm guessing this feature was removed in the
new repo.

Best,
Piper

On Sun, Aug 23, 2020 at 10:15 AM David Anderson 
wrote:

> Piper,
>
> 1. Thanks for reporting the problem with the broken links. I've just fixed
> this.
>
> 2. The exercises were recently rewritten so that they no longer use the
> old file-based datasets. Now they use data generators that are included in
> the project. As part of this update, the schema was modified slightly (so
> that the TaxiRide and TaxiFare types can be serialized with Flink's POJO
> serializer). Is this causing a problem?
>
> Best,
> David
>
> On Sun, Aug 23, 2020 at 12:20 AM Piper Piper  wrote:
>
>> Hi Flink community,
>>
>> I have two questions regarding the Ververica Flink Training resources.
>>
>> 1. In the official Flink documentation, the hyperlinks to the github
>> sites for the exercises in the "Learn Flink" section are not working. If
>> possible, please provide me with the correct links for the exercises.
>>
>> 2. The schema of the Taxi Fares dataset matches with the old dataset
>> (nycTaxiFares.gz). However, the schema of the Taxi Ride dataset given in
>> the Ververica github site does not seem to match the dataset in the old
>> file (nycTaxiRides.gz). Please advise.
>>
>> Given Schema: rideId, taxiId, driverId, isStart, startTime, endTime,
>> startLon, startLat, endLon, endLat, passengerCnt
>>
>> nycTaxiRides.gz sample line (after extracting to file
>> nycTaxiRides4): 6,START,2013-01-01 00:00:00,1970-01-01
>> 00:00:00,-73.866135,40.771091,-73.961334,40.764912,6,201306,201306
>>
>> Thank you!
>>
>> Piper
>>
>


Ververica Flink training resources

2020-08-22 Thread Piper Piper
Hi Flink community,

I have two questions regarding the Ververica Flink Training resources.

1. In the official Flink documentation, the hyperlinks to the github sites
for the exercises in the "Learn Flink" section are not working. If
possible, please provide me with the correct links for the exercises.

2. The schema of the Taxi Fares dataset matches with the old dataset
(nycTaxiFares.gz). However, the schema of the Taxi Ride dataset given in
the Ververica github site does not seem to match the dataset in the old
file (nycTaxiRides.gz). Please advise.

Given Schema: rideId, taxiId, driverId, isStart, startTime, endTime,
startLon, startLat, endLon, endLat, passengerCnt
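
As a side note, the given schema can be sketched as a Flink-style POJO (a public no-argument constructor plus public fields is what makes Flink's POJO serializer applicable). The field types below are inferred from the sample line and are not taken from the training repo:

```java
// Hypothetical reconstruction of the TaxiRide schema listed above;
// types are guessed from the sample data, not copied from the exercises.
public class TaxiRide {
    public long rideId;
    public long taxiId;
    public long driverId;
    public boolean isStart;
    public String startTime;   // the training code likely uses a timestamp type
    public String endTime;
    public float startLon;
    public float startLat;
    public float endLon;
    public float endLat;
    public short passengerCnt;

    // A public no-arg constructor is required for POJO serialization.
    public TaxiRide() {}
}
```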

nycTaxiRides.gz sample line (after extracting to file
nycTaxiRides4): 6,START,2013-01-01 00:00:00,1970-01-01
00:00:00,-73.866135,40.771091,-73.961334,40.764912,6,201306,201306

Thank you!

Piper


Re: Java implementations of Streaming applications for Flink

2020-02-25 Thread Piper Piper
Very informative. Thank you, Robert!

On Tue, Feb 25, 2020 at 5:02 AM Robert Metzger  wrote:

> Hey Piper,
>
> Here's an example for a more advanced Flink application:
> https://flink.apache.org/news/2020/01/15/demo-fraud-detection.html
> The Flink developers internally maintain a number of testing jobs. They
> are rather artificial, but you still might find some useful things here and
> there: https://github.com/apache/flink/tree/master/flink-end-to-end-tests
>
> Best,
> Robert
>
>
> On Mon, Feb 24, 2020 at 9:38 AM Piper Piper  wrote:
>
>> Hi all,
>>
>> The examples in the Flink github repo do not seem to include many
>> standard streaming applications compared to the batch examples.
>>
>> Where can I get standard (recommended) Java implementations of
>> “Streaming” applications for Flink, that are clearly: (1) CPU-intensive,
>> like streaming PageRank, streaming K-Means, etc., and (2) I/O-intensive,
>> and if possible also,  (3) Network intensive?
>>
>> By "standard", I mean Java implementations that are accepted by the
>> community in terms of efficiency.
>>
>> Thank you,
>>
>> Piper
>>
>


Java implementations of Streaming applications for Flink

2020-02-24 Thread Piper Piper
Hi all,

The examples in the Flink github repo do not seem to include many standard
streaming applications compared to the batch examples.

Where can I get standard (recommended) Java implementations of “Streaming”
applications for Flink, that are clearly: (1) CPU-intensive, like streaming
PageRank, streaming K-Means, etc., and (2) I/O-intensive, and if possible
also,  (3) Network intensive?

By "standard", I mean Java implementations that are accepted by the
community in terms of efficiency.

Thank you,

Piper


Re: Flink+YARN HDFS replication factor

2020-01-30 Thread Piper Piper
Please disregard my previous email. I found the answer online.

I thought writing data to local disk automatically meant the data would be
persisted to HDFS. However, Spark writes data (in between shuffles) to
local disk only.

Thanks

On Thu, Jan 30, 2020, 2:00 PM Piper Piper  wrote:

> Hi Till,
>
> Thank you for the information!
>
> In case of wide transformations, Spark stores input data onto disk between
> shuffles. So, I was wondering if Flink does that as well (even for windows
> of streaming data), and whether that "storing to disk" is persisted to the
> HDFS and honors the replication factor.
>
> Best,
>
> Pankaj
>
> On Wed, Jan 29, 2020 at 9:56 AM Till Rohrmann 
> wrote:
>
>> Hi Piper,
>>
>> in general, Flink does not store transient data such as event data on
>> HDFS. Event data (data which is sent between the TaskManagers to process
>> it) is only kept in memory and, if it becomes too big, spilled by some
>> operators to local disk.
>>
>> What Flink stores on HDFS (given it is configured this way) is the state
>> data which is part of the job's checkpoints. Moreover, Flink stores job
>> information such as the JobGraph and the corresponding blobs (jars and job
>> artifacts) on HDFS if so configured.
>>
>> Cheers,
>> Till
>>
>> On Wed, Jan 29, 2020 at 7:07 AM Piper Piper  wrote:
>>
>>> Hello,
>>>
>>> When using Flink+YARN (with HDFS) and having a long running Flink
>>> session (mode) cluster with a Flink client submitting jobs, the HDFS could
>>> have a replication factor greater than 1 (example 3).
>>>
>>> So, I would like to know when and how any of the data (like event-data
>>> or batch-data) or code (like JAR) in a Flink job is saved to the HDFS and
>>> is replicated in the entire YARN cluster of nodes?
>>>
>>> For example, in streaming applications, would all the event-data only be
>>> in memory (RAM) until it reaches the DAG's sink and then must be saved into
>>> HDFS?
>>>
>>> Thank you,
>>>
>>> Piper
>>>
>>


Re: Flink+YARN HDFS replication factor

2020-01-30 Thread Piper Piper
Hi Till,

Thank you for the information!

In case of wide transformations, Spark stores input data onto disk between
shuffles. So, I was wondering if Flink does that as well (even for windows
of streaming data), and whether that "storing to disk" is persisted to the
HDFS and honors the replication factor.

Best,

Pankaj

On Wed, Jan 29, 2020 at 9:56 AM Till Rohrmann  wrote:

> Hi Piper,
>
> in general, Flink does not store transient data such as event data on
> HDFS. Event data (data which is sent between the TaskManagers to process
> it) is only kept in memory and, if it becomes too big, spilled by some
> operators to local disk.
>
> What Flink stores on HDFS (given it is configured this way) is the state
> data which is part of the job's checkpoints. Moreover, Flink stores job
> information such as the JobGraph and the corresponding blobs (jars and job
> artifacts) on HDFS if so configured.
>
> Cheers,
> Till
>
> On Wed, Jan 29, 2020 at 7:07 AM Piper Piper  wrote:
>
>> Hello,
>>
>> When using Flink+YARN (with HDFS) and having a long running Flink session
>> (mode) cluster with a Flink client submitting jobs, the HDFS could have a
>> replication factor greater than 1 (example 3).
>>
>> So, I would like to know when and how any of the data (like event-data or
>> batch-data) or code (like JAR) in a Flink job is saved to the HDFS and is
>> replicated in the entire YARN cluster of nodes?
>>
>> For example, in streaming applications, would all the event-data only be
>> in memory (RAM) until it reaches the DAG's sink and then must be saved into
>> HDFS?
>>
>> Thank you,
>>
>> Piper
>>
>


Flink+YARN HDFS replication factor

2020-01-28 Thread Piper Piper
Hello,

When using Flink+YARN (with HDFS) and having a long running Flink session
(mode) cluster with a Flink client submitting jobs, the HDFS could have a
replication factor greater than 1 (example 3).

So, I would like to know when and how any of the data (like event-data or
batch-data) or code (like JAR) in a Flink job is saved to the HDFS and is
replicated in the entire YARN cluster of nodes?

For example, in streaming applications, would all the event-data only be in
memory (RAM) until it reaches the DAG's sink and then must be saved into
HDFS?

Thank you,

Piper


Re: Flink on YARN: Where to install Flink binaries?

2019-12-04 Thread Piper Piper
Thank you, Till!

On Wed, Dec 4, 2019, 5:51 AM Till Rohrmann  wrote:

> Hi Piper,
>
> Answer 1: You should pick the Scala version you are using in your user
> program. If you don't use Scala at all, then pick 2.11.
> Answer 2: Flink does not need to be installed on the Yarn nodes. The
> client is the machine from which you start the Flink cluster. The client
> machine needs to have access to the Hadoop/Yarn cluster. Hence you should
> configure the HADOOP_CONF_DIR to the Hadoop configuration.
>
> Cheers,
> Till
>
> On Wed, Dec 4, 2019 at 11:04 AM Piper Piper  wrote:
>
>> Hello,
>>
>> I have a YARN/Hadoop 2.7.6 cluster, on which I plan to run Flink in Job
>> mode using:
>> Flink 1.9.1 (with Flink application programs written in Java)
>> Prebundled Hadoop 2.7.5
>>
>> Question 1: Which scala version must I choose for the Flink 1.9.1 binary
>> (2.11 or 2.12)?
>>
>> Secondly, I had read a document or mailing list question (which I have
>> now lost access to), that the Flink binaries do not need to be installed on
>> any of the YARN cluster nodes. Instead, the Flink binaries must only be
>> installed on the client which submits the Flink job to the YARN cluster.
>>
>> Question 2: Can someone please confirm and clarify the above point for
>> me? What is this client?
>>
>> 1. Can the client be one of the YARN cluster nodes (NameNode,
>> ResourceManager Node or Worker nodes)?
>>
>> 2. Can the client be a remote desktop (not a part of the YARN cluster)?
>>
>> Question 3: How do I get the value used to set the YARN_CONF_DIR or
>> HADOOP_CONF_DIR environment variable on a remote desktop client?
>>
>> Thanks,
>>
>> Piper
>>
>


Flink on YARN: Where to install Flink binaries?

2019-12-04 Thread Piper Piper
Hello,

I have a YARN/Hadoop 2.7.6 cluster, on which I plan to run Flink in Job
mode using:
Flink 1.9.1 (with Flink application programs written in Java)
Prebundled Hadoop 2.7.5

Question 1: Which scala version must I choose for the Flink 1.9.1 binary
(2.11 or 2.12)?

Secondly, I had read a document or mailing list question (which I have now
lost access to), that the Flink binaries do not need to be installed on any
of the YARN cluster nodes. Instead, the Flink binaries must only be
installed on the client which submits the Flink job to the YARN cluster.

Question 2: Can someone please confirm and clarify the above point for me?
What is this client?

1. Can the client be one of the YARN cluster nodes (NameNode,
ResourceManager Node or Worker nodes)?

2. Can the client be a remote desktop (not a part of the YARN cluster)?

Question 3: How do I get the value used to set the YARN_CONF_DIR or
HADOOP_CONF_DIR environment variable on a remote desktop client?

Thanks,

Piper


Re: Dynamically creating new Task Managers in YARN

2019-11-25 Thread Piper Piper
Hi Yang,

Session mode is working exactly as you described. No exceptions.

Thank you!

Piper


On Sun, Nov 24, 2019 at 11:24 PM Yang Wang  wrote:

> Hi Piper,
>
> In session mode, Flink will always use the free slots in the existing
> TaskManagers first.
> When it cannot fulfill the slot request, new TaskManagers will be
> started.
> Did you find some exceptions?
>
> Best,
> Yang
>
> Piper Piper wrote on Sat, Nov 23, 2019 at 8:52 AM:
>
>> Hello Yang,
>>
>> Thank you for the explanation!
>>
>> I want to control the number of TaskManagers in order to have finer
>> control over allowing/rejecting certain jobs in the cluster.
>>
>> In Session mode with multiple jobs, is there any way to control whether
>> Flink will fit a new job into empty slots in existing Task Managers versus
>> starting new TaskManagers for every new job?
>>
>> Thank you,
>>
>> Piper
>>
>> On Thu, Nov 21, 2019 at 10:53 PM Yang Wang  wrote:
>>
>>> Hi Piper,
>>>
>>> Jingsong is right. For both per-job and session clusters, the
>>> YarnResourceManager will allocate taskmanager containers dynamically on
>>> demand.
>>>
>>> For a per-job cluster, it will allocate taskmanagers based on the job's
>>> slot demand. The excess containers are returned to yarn immediately. When
>>> the job finishes, the jobmanager and all taskmanagers are released.
>>> For a session cluster, the YarnResourceManager will not have any
>>> taskmanagers on startup. Once a job is submitted, it will allocate the
>>> taskmanagers. When the job finishes, the taskmanagers become idle and are
>>> released after the timeout. The jobmanager will be long-running unless
>>> the session is manually stopped.
>>>
>>> I'm just curious why you want to control the number of taskmanagers,
>>> because they are always allocated on demand.
>>>
>>>
>>> Best,
>>> Yang
>>>
>>> Piper Piper wrote on Fri, Nov 22, 2019 at 11:02 AM:
>>>
>>>> Thank you, I will check it out.
>>>>
>>>> On Thu, Nov 21, 2019, 9:21 PM Jingsong Li 
>>>> wrote:
>>>>
>>>>> Hi Piper,
>>>>>
>>>>> AFAIK, there are no such flexible operations. You can get some
>>>>> information from metrics, but you cannot control them.
>>>>> Maybe you should modify some source code in flink-yarn.
>>>>>
>>>>> Best,
>>>>> Jingsong Lee
>>>>>
>>>>>
>>>>> On Thu, Nov 21, 2019 at 8:17 PM Piper Piper 
>>>>> wrote:
>>>>>
>>>>>> Hi Jingsong,
>>>>>>
>>>>>> Thank you for your reply!
>>>>>>
>>>>>> >Is this what you want? Piper.
>>>>>>
>>>>>> Yes. This is exactly what I want.
>>>>>>
>>>>>> Is there any way for me to specify to the Flink RM how many resources
>>>>>> to ask YARN's RM for, and can we make Flink's RM ask for resources
>>>>>> proactively before it runs out?
>>>>>> Similarly, is there any way I can force the JM to release a TM back to
>>>>>> YARN before the timeout?
>>>>>>
>>>>>> Or will I need to modify the source code of Flink for this?
>>>>>>
>>>>>> Thank you,
>>>>>>
>>>>>> Piper
>>>>>>
>>>>>> On Thu, Nov 21, 2019 at 2:17 AM vino yang 
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Jingsong,
>>>>>>>
>>>>>>> Thanks for the explanation about the mechanism of the new Flink
>>>>>>> session cluster mode.
>>>>>>>
>>>>>>> Because I mostly use job cluster mode, so did not have a good
>>>>>>> knowledge of the new Flink session cluster mode.
>>>>>>>
>>>>>>> Best,
>>>>>>> Vino
>>>>>>>
>>>>>>> Jingsong Li wrote on Thu, Nov 21, 2019 at 2:46 PM:
>>>>>>>
>>>>>>>> Hi Piper and Vino:
>>>>>>>>
>>>>>>>> Current Flink version, the resources of Flink Session cluster
>>>>>>>> are unrestricted, which means if the requested resources exceed the
>>>>>>>> resources owned by the current session, it will apply to the RM of
>>>>>>>> yarn for new resources.

Re: Dynamically creating new Task Managers in YARN

2019-11-22 Thread Piper Piper
Hello Yang,

Thank you for the explanation!

I want to control the number of TaskManagers in order to have finer control
over allowing/rejecting certain jobs in the cluster.

In Session mode with multiple jobs, is there any way to control whether
Flink will fit a new job into empty slots in existing Task Managers versus
starting new TaskManagers for every new job?

Thank you,

Piper

On Thu, Nov 21, 2019 at 10:53 PM Yang Wang  wrote:

> Hi Piper,
>
> Jingsong is right. For both per-job and session clusters, the
> YarnResourceManager will allocate taskmanager containers dynamically on
> demand.
>
> For a per-job cluster, it will allocate taskmanagers based on the job's
> slot demand. The excess containers are returned to yarn immediately. When
> the job finishes, the jobmanager and all taskmanagers are released.
> For a session cluster, the YarnResourceManager will not have any
> taskmanagers on startup. Once a job is submitted, it will allocate the
> taskmanagers. When the job finishes, the taskmanagers become idle and are
> released after the timeout. The jobmanager will be long-running unless
> the session is manually stopped.
>
> I'm just curious why you want to control the number of taskmanagers,
> because they are always allocated on demand.
>
>
> Best,
> Yang
>
> Piper Piper wrote on Fri, Nov 22, 2019 at 11:02 AM:
>
>> Thank you, I will check it out.
>>
>> On Thu, Nov 21, 2019, 9:21 PM Jingsong Li  wrote:
>>
>>> Hi Piper,
>>>
>>> AFAIK, there are no such flexible operations. You can get some
>>> information from metrics, but you cannot control them.
>>> Maybe you should modify some source code in flink-yarn.
>>>
>>> Best,
>>> Jingsong Lee
>>>
>>>
>>> On Thu, Nov 21, 2019 at 8:17 PM Piper Piper 
>>> wrote:
>>>
>>>> Hi Jingsong,
>>>>
>>>> Thank you for your reply!
>>>>
>>>> >Is this what you want? Piper.
>>>>
>>>> Yes. This is exactly what I want.
>>>>
>>>> Is there any way for me to specify to the Flink RM how many resources to
>>>> ask YARN's RM for, and can we make Flink's RM ask for resources
>>>> proactively before it runs out?
>>>> Similarly, is there any way I can force the JM to release a TM back to
>>>> YARN before the timeout?
>>>>
>>>> Or will I need to modify the source code of Flink for this?
>>>>
>>>> Thank you,
>>>>
>>>> Piper
>>>>
>>>> On Thu, Nov 21, 2019 at 2:17 AM vino yang 
>>>> wrote:
>>>>
>>>>> Hi Jingsong,
>>>>>
>>>>> Thanks for the explanation about the mechanism of the new Flink
>>>>> session cluster mode.
>>>>>
>>>>> Because I mostly use job cluster mode, so did not have a good
>>>>> knowledge of the new Flink session cluster mode.
>>>>>
>>>>> Best,
>>>>> Vino
>>>>>
>>>>> Jingsong Li wrote on Thu, Nov 21, 2019 at 2:46 PM:
>>>>>
>>>>>> Hi Piper and Vino:
>>>>>>
>>>> In the current Flink version, the resources of a Flink session cluster
>>>> are unrestricted, which means that if the requested resources exceed the
>>>> resources owned by the current session, it will apply to the RM of yarn
>>>> for new resources.
>>>> And if a TaskManager is idle for too long, the JM will release it to
>>>> yarn. This behavior is controlled by resourcemanager.taskmanager-timeout.
>>>> You can set a suitable value for it to enjoy the benefits of both process
>>>> reuse and dynamic resources.
>>>>>> and
>>>>>> dynamic resources.
>>>>>>
>>>>>> From this point of view, I think session mode is a good choice.
>>>>>> Is this what you want? Piper.
>>>>>>
>>>>>> Best,
>>>>>> Jingsong Lee
>>>>>>
>>>>>>
>>>>>>
>>>>>> On Thu, Nov 21, 2019 at 2:25 PM vino yang 
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Piper,
>>>>>>>
>>>>>>> Your understanding of the two deploy modes for Flink on YARN is right.
>>>>>>>
>>>>>>> AFAIK, the single-job (job cluster) mode is more popular than
>>>>>>> session mode.

Re: Metrics for Task States

2019-11-22 Thread Piper Piper
I am trying to understand why this problem occurred (i.e., why Flink did
not reject the job when it required more slots than were available).

Flink in production on EMR (YARN): Does this mean Flink was being run in
Job mode or Session mode?

Thank you,

Piper

On Thu, Nov 21, 2019 at 4:56 PM Piper Piper  wrote:

> Thank you, Kelly!
>
> On Thu, Nov 21, 2019 at 4:06 PM Kelly Smith 
> wrote:
>
>> Hi Piper,
>>
>>
>>
>> The repro is pretty simple:
>>
>>- Submit a job with parallelism set higher than YARN has resources to
>>support
>>
>>
>>
>> What this ends up looking like in the Flink UI is this:
>>
>>
>>
>> The Job is in a “RUNNING” state, but all of the tasks are in the
>> “SCHEDULED” state. The `jobmanager.numRunningJobs` metric that Flink emits
>> by default will increase by 1, but none of the tasks actually get scheduled
>> on any TM.
>>
>>
>>
>>
>>
>> What I’m looking for is a way to detect when I am in this state using
>> Flink metrics (ideally the count of tasks in each state for better
>> observability).
>>
>>
>>
>> Does that make sense?
>>
>>
>>
>> Thanks,
>>
>> Kelly
>>
>>
>>
>> *From: *Piper Piper 
>> *Date: *Thursday, November 21, 2019 at 12:59 PM
>> *To: *Kelly Smith 
>> *Cc: *"user@flink.apache.org" 
>> *Subject: *Re: Metrics for Task States
>>
>>
>>
>> Hello Kelly,
>>
>>
>>
>> I thought that Flink scheduler only starts a job if all requested
>> containers/TMs are available and allotted to that job.
>>
>>
>>
>> How can I reproduce your issue on Flink with YARN?
>>
>>
>>
>> Thank you,
>>
>>
>>
>> Piper
>>
>>
>>
>>
>>
>> On Thu, Nov 21, 2019, 1:48 PM Kelly Smith 
>> wrote:
>>
>> I’ve been running Flink in production on EMR (YARN) for some time and
>> have found the metrics system to be quite useful, but there is one specific
>> case where I’m missing a signal for this scenario:
>>
>>
>>
>>- When a job has been submitted, but YARN does not have enough
>>resources to provide
>>
>>
>>
>> Observed:
>>
>>- Job is in RUNNING state
>>- All of the tasks for the job are in the (I believe) DEPLOYING state
>>
>>
>>
>> Is there a way to access these as metrics for monitoring the number of
>> tasks in each state for a given job (image below)? The metric I’m currently
>> using is the number of running jobs, but it misses this “unhealthy”
>> scenario. I realize that I could use application-level metrics (record
>> counts, etc) as a proxy for this, but I’m working on providing a streaming
>> platform and need all of my monitoring to be application agnostic.
>>
>> [image: Flink UI screenshot]
>>
>>
>>
>> I can’t find anything on it in the documentation.
>>
>>
>>
>> Thanks,
>>
>> Kelly
>>
>>


Re: Dynamically creating new Task Managers in YARN

2019-11-21 Thread Piper Piper
Thank you, I will check it out.

On Thu, Nov 21, 2019, 9:21 PM Jingsong Li  wrote:

> Hi Piper,
>
> AFAIK, there are no such flexible operations. You can get some
> information from metrics, but you cannot control them.
> Maybe you should modify some source code in flink-yarn.
>
> Best,
> Jingsong Lee
>
>
> On Thu, Nov 21, 2019 at 8:17 PM Piper Piper  wrote:
>
>> Hi Jingsong,
>>
>> Thank you for your reply!
>>
>> >Is this what you want? Piper.
>>
>> Yes. This is exactly what I want.
>>
>> Is there any way for me to specify to the Flink RM how many resources to
>> ask YARN's RM for, and can we make Flink's RM ask for resources
>> proactively before it runs out?
>> Similarly, is there any way I can force the JM to release a TM back to
>> YARN before the timeout?
>>
>> Or will I need to modify the source code of Flink for this?
>>
>> Thank you,
>>
>> Piper
>>
>> On Thu, Nov 21, 2019 at 2:17 AM vino yang  wrote:
>>
>>> Hi Jingsong,
>>>
>>> Thanks for the explanation about the mechanism of the new Flink session
>>> cluster mode.
>>>
>>> Because I mostly use job cluster mode, so did not have a good knowledge
>>> of the new Flink session cluster mode.
>>>
>>> Best,
>>> Vino
>>>
>>> Jingsong Li wrote on Thu, Nov 21, 2019 at 2:46 PM:
>>>
>>>> Hi Piper and Vino:
>>>>
>>>> In the current Flink version, the resources of a Flink session cluster
>>>> are unrestricted, which means that if the requested resources exceed the
>>>> resources owned by the current session, it will apply to the RM of yarn
>>>> for new resources.
>>>> And if a TaskManager is idle for too long, the JM will release it to
>>>> yarn. This behavior is controlled by resourcemanager.taskmanager-timeout.
>>>> You can set a suitable value for it to enjoy the benefits of both process
>>>> reuse and dynamic resources.
>>>>
>>>> From this point of view, I think session mode is a good choice.
>>>> Is this what you want? Piper.
>>>>
>>>> Best,
>>>> Jingsong Lee
>>>>
>>>>
>>>>
>>>> On Thu, Nov 21, 2019 at 2:25 PM vino yang 
>>>> wrote:
>>>>
>>>>> Hi Piper,
>>>>>
>>>>> Your understanding of the two deploy modes for Flink on YARN is right.
>>>>>
>>>>> AFAIK, the single-job (job cluster) mode is more popular than session
>>>>> mode.
>>>>>
>>>>> In job cluster mode, Flink lets YARN manage resources as far as
>>>>> possible, and this mode keeps the job isolated from other jobs.
>>>>>
>>>>> IMO, we do not need to combine their advantages. Let YARN do the
>>>>> things it is good at. What do you think?
>>>>>
>>>>> Best,
>>>>> Vino
>>>>>
>>>>>
>>>>> Piper Piper wrote on Thu, Nov 21, 2019 at 11:55 AM:
>>>>>
>>>>>> Hi Vino,
>>>>>>
>>>>>> I want to implement Resource Elasticity. In doing so, I have read
>>>>>> that Flink with YARN has two modes: Job and Session.
>>>>>>
>>>>>> In Job mode, Flink’s Resource Manager requests YARN for containers
>>>>>> with TMs, and then gives the containers back to YARN upon job completion.
>>>>>>
>>>>>> In Session mode, Flink already has the TMs that are persistent.
>>>>>>
>>>>>> I want to combine the advantages of Job and Session mode, i.e. Flink
>>>>>> will have persistent TMs/containers and request YARN for more
>>>>>> TMs/containers when needed (or release TMs/containers back to YARN).
>>>>>>
>>>>>> Thank you,
>>>>>>
>>>>>> Piper
>>>>>>
>>>>>> On Wed, Nov 20, 2019 at 9:39 PM vino yang 
>>>>>> wrote:
>>>>>>
>>>>>>> Hi Piper,
>>>>>>>
>>>>>>> Can you share more reason and details of your requirements.
>>>>>>>
>>>>>>> Best,
>>>>>>> Vino
>>>>>>>
>>>>>>> Piper Piper  wrote on Thu, Nov 21, 2019 at 5:48 AM:
>>>>>>>
>>>>>>>> Hi,
>>>>>>>>
>>>>>>>> How can I make Flink's Resource Manager request YARN to spin up new
>>>>>>>> (or destroy/reclaim existing) TaskManagers in YARN containers?
>>>>>>>>
>>>>>>>> Preferably at runtime (i.e. dynamically).
>>>>>>>>
>>>>>>>> Thank you
>>>>>>>>
>>>>>>>> Piper
>>>>>>>>
>>>>>>>
>>>>
>>>> --
>>>> Best, Jingsong Lee
>>>>
>>>
>
> --
> Best, Jingsong Lee
>


Re: Metrics for Task States

2019-11-21 Thread Piper Piper
Thank you, Kelly!

On Thu, Nov 21, 2019 at 4:06 PM Kelly Smith  wrote:

> Hi Piper,
>
>
>
> The repro is pretty simple:
>
>- Submit a job with parallelism set higher than YARN has resources to
>support
>
>
>
> What this ends up looking like in the Flink UI is this:
>
>
>
> The Job is in a “RUNNING” state, but all of the tasks are in the
> “SCHEDULED” state. The `jobmanager.numRunningJobs` metric that Flink emits
> by default will increase by 1, but none of the tasks actually get scheduled
> on any TM.
>
>
>
>
>
> What I’m looking for is a way to detect when I am in this state using
> Flink metrics (ideally the count of tasks in each state for better
> observability).
>
>
>
> Does that make sense?
>
>
>
> Thanks,
>
> Kelly
>
>
>
> *From: *Piper Piper 
> *Date: *Thursday, November 21, 2019 at 12:59 PM
> *To: *Kelly Smith 
> *Cc: *"user@flink.apache.org" 
> *Subject: *Re: Metrics for Task States
>
>
>
> Hello Kelly,
>
>
>
> I thought that the Flink scheduler only starts a job if all requested
> containers/TMs are available and allotted to that job.
>
>
>
> How can I reproduce your issue on Flink with YARN?
>
>
>
> Thank you,
>
>
>
> Piper
>
>
>
>
>
> On Thu, Nov 21, 2019, 1:48 PM Kelly Smith  wrote:
>
> I’ve been running Flink in production on EMR (YARN) for some time and have
> found the metrics system to be quite useful, but there is one specific case
> where I’m missing a signal for this scenario:
>
>
>
>- When a job has been submitted, but YARN does not have enough
>resources to provide
>
>
>
> Observed:
>
>- Job is in RUNNING state
>- All of the tasks for the job are in the (I believe) DEPLOYING state
>
>
>
> Is there a way to access these as metrics for monitoring the number of
> tasks in each state for a given job (image below)? The metric I’m currently
> using is the number of running jobs, but it misses this “unhealthy”
> scenario. I realize that I could use application-level metrics (record
> counts, etc) as a proxy for this, but I’m working on providing a streaming
> platform and need all of my monitoring to be application agnostic.
>
> [image: Flink UI screenshot]
>
>
>
> I can’t find anything on it in the documentation.
>
>
>
> Thanks,
>
> Kelly
>
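For later readers of this thread: one way to get at these counts externally is the REST API, whose `GET /jobs/<jobid>` response lists per-vertex task-state counts. A minimal sketch, assuming a reachable JobManager at the default REST port; the response shape should be checked against the REST docs for your Flink version:

```python
import json
import urllib.request

REST_BASE = "http://localhost:8081"  # assumed JobManager REST address


def task_state_counts(job_id):
    """Aggregate per-state task counts for one job from GET /jobs/<jobid>.

    Each entry under "vertices" carries a "tasks" map of {state: count};
    summing those maps gives job-wide counts per task state.
    """
    with urllib.request.urlopen(f"{REST_BASE}/jobs/{job_id}") as resp:
        job = json.load(resp)
    counts = {}
    for vertex in job.get("vertices", []):
        for state, n in vertex.get("tasks", {}).items():
            counts[state] = counts.get(state, 0) + n
    return counts


def looks_stuck(counts):
    """The "unhealthy" case above: tasks exist, but none has reached RUNNING."""
    total = sum(counts.values())
    return total > 0 and counts.get("RUNNING", 0) == 0
```

Polling `looks_stuck(task_state_counts(job_id))` alongside `jobmanager.numRunningJobs` would distinguish a genuinely running job from one whose tasks are all still SCHEDULED.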
>


Re: Metrics for Task States

2019-11-21 Thread Piper Piper
Hello Kelly,

I thought that the Flink scheduler only starts a job if all requested
containers/TMs are available and allotted to that job.

How can I reproduce your issue on Flink with YARN?

Thank you,

Piper


On Thu, Nov 21, 2019, 1:48 PM Kelly Smith  wrote:

> I’ve been running Flink in production on EMR (YARN) for some time and have
> found the metrics system to be quite useful, but there is one specific case
> where I’m missing a signal for this scenario:
>
>
>
>- When a job has been submitted, but YARN does not have enough
>resources to provide
>
>
>
> Observed:
>
>- Job is in RUNNING state
>- All of the tasks for the job are in the (I believe) DEPLOYING state
>
>
>
> Is there a way to access these as metrics for monitoring the number of
> tasks in each state for a given job (image below)? The metric I’m currently
> using is the number of running jobs, but it misses this “unhealthy”
> scenario. I realize that I could use application-level metrics (record
> counts, etc) as a proxy for this, but I’m working on providing a streaming
> platform and need all of my monitoring to be application agnostic.
>
>
>
> I can’t find anything on it in the documentation.
>
>
>
> Thanks,
>
> Kelly
>


Re: Dynamically creating new Task Managers in YARN

2019-11-21 Thread Piper Piper
Hi Jingsong,

Thank you for your reply!

>Is this what you want? Piper.

Yes. This is exactly what I want.

Is there any way for me to specify to the Flink RM how many resources to ask
YARN's RM for, and whether Flink's RM should ask for resources proactively
before it runs out?
Similarly, is there any way I can force the JM to release a TM back to YARN
before the timeout?

Or will I need to modify the source code of Flink for this?
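As a concrete sketch of the timeout knob Jingsong describes below (the key name is taken from his reply; the value here is illustrative, not a recommendation):

```yaml
# flink-conf.yaml (sketch; value is illustrative)
# Release an idle TaskManager's container back to YARN after 30 seconds.
# Smaller values return resources sooner, at the cost of more container churn.
resourcemanager.taskmanager-timeout: 30000
```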

Thank you,

Piper

On Thu, Nov 21, 2019 at 2:17 AM vino yang  wrote:

> Hi Jingsong,
>
> Thanks for the explanation about the mechanism of the new Flink session
> cluster mode.
>
> Because I mostly use job cluster mode, I did not have good knowledge of
> the new Flink session cluster mode.
>
> Best,
> Vino
>
> Jingsong Li  wrote on Thu, Nov 21, 2019 at 2:46 PM:
>
>> Hi Piper and Vino:
>>
>> In the current Flink version, the resources of a Flink session cluster
>> are unrestricted: if the requested resources exceed the resources the
>> session currently owns, it will ask YARN's RM for new resources.
>> And if a TaskManager is idle for too long, the JM will release it to YARN.
>> This behavior is controlled by resourcemanager.taskmanager-timeout. You can
>> set a suitable value for it to enjoy the benefits of process reuse and
>> dynamic resources.
>>
>> From this point of view, I think session mode is a good choice.
>> Is this what you want? Piper.
>>
>> Best,
>> Jingsong Lee
>>
>>
>>
>> On Thu, Nov 21, 2019 at 2:25 PM vino yang  wrote:
>>
>>> Hi Piper,
>>>
>>> Your understanding of the two deploy modes for Flink on YARN is right.
>>>
>>> AFAIK, the single-job (job cluster) mode is more popular than session
>>> mode.
>>>
>>> Because in job cluster mode, Flink lets YARN manage resources as far as
>>> possible. This mode also keeps the job isolated from other jobs.
>>>
>>> IMO, we do not need to combine their advantages. Let YARN do the things
>>> that it is good at. What do you think?
>>>
>>> Best,
>>> Vino
>>>
>>>
>>> Piper Piper  wrote on Thu, Nov 21, 2019 at 11:55 AM:
>>>
>>>> Hi Vino,
>>>>
>>>> I want to implement Resource Elasticity. In doing so, I have read that
>>>> Flink with YARN has two modes: Job and Session.
>>>>
>>>> In Job mode, Flink’s Resource Manager requests YARN for containers with
>>>> TMs, and then gives the containers back to YARN upon job completion.
>>>>
>>>> In Session mode, Flink already has the TMs that are persistent.
>>>>
>>>> I want to combine the advantages of Job and Session mode, i.e. Flink
>>>> will have persistent TMs/containers and request YARN for more
>>>> TMs/containers when needed (or release TMs/containers back to YARN).
>>>>
>>>> Thank you,
>>>>
>>>> Piper
>>>>
>>>> On Wed, Nov 20, 2019 at 9:39 PM vino yang 
>>>> wrote:
>>>>
>>>>> Hi Piper,
>>>>>
>>>>> Can you share more reason and details of your requirements.
>>>>>
>>>>> Best,
>>>>> Vino
>>>>>
>>>>> Piper Piper  wrote on Thu, Nov 21, 2019 at 5:48 AM:
>>>>>
>>>>>> Hi,
>>>>>>
>>>>>> How can I make Flink's Resource Manager request YARN to spin up new
>>>>>> (or destroy/reclaim existing) TaskManagers in YARN containers?
>>>>>>
>>>>>> Preferably at runtime (i.e. dynamically).
>>>>>>
>>>>>> Thank you
>>>>>>
>>>>>> Piper
>>>>>>
>>>>>
>>
>> --
>> Best, Jingsong Lee
>>
>


Re: Dynamically creating new Task Managers in YARN

2019-11-20 Thread Piper Piper
Hi Vino,

I want to implement Resource Elasticity. In doing so, I have read that
Flink with YARN has two modes: Job and Session.

In Job mode, Flink’s Resource Manager requests YARN for containers with
TMs, and then gives the containers back to YARN upon job completion.

In Session mode, Flink already has the TMs that are persistent.

I want to combine the advantages of Job and Session mode, i.e. Flink will
have persistent TMs/containers and request YARN for more TMs/containers
when needed (or release TMs/containers back to YARN).
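A small observation harness for this behavior (a sketch only: the `/taskmanagers` endpoint and its `taskmanagers` field are per the Flink REST API docs, while host, port, and poll interval are assumptions) that prints whenever the number of registered TMs changes:

```python
import json
import time
import urllib.request

REST_BASE = "http://localhost:8081"  # assumed JobManager REST address


def tm_count(body):
    """Count TaskManagers in a decoded /taskmanagers response body."""
    return len(body.get("taskmanagers", []))


def registered_taskmanagers():
    """Fetch and count the TaskManagers currently registered with the JM."""
    with urllib.request.urlopen(f"{REST_BASE}/taskmanagers") as resp:
        return tm_count(json.load(resp))


def watch(poll_seconds=10):
    """Print a line whenever the TM count changes (containers added or released)."""
    last = None
    while True:
        count = registered_taskmanagers()
        if count != last:
            print(f"TaskManagers registered: {count}")
            last = count
        time.sleep(poll_seconds)
```

Running `watch()` against a YARN session cluster would show containers appearing as jobs are submitted and disappearing once TMs sit idle past the release timeout.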

Thank you,

Piper

On Wed, Nov 20, 2019 at 9:39 PM vino yang  wrote:

> Hi Piper,
>
> Can you share more reason and details of your requirements.
>
> Best,
> Vino
>
> Piper Piper  wrote on Thu, Nov 21, 2019 at 5:48 AM:
>
>> Hi,
>>
>> How can I make Flink's Resource Manager request YARN to spin up new (or
>> destroy/reclaim existing) TaskManagers in YARN containers?
>>
>> Preferably at runtime (i.e. dynamically).
>>
>> Thank you
>>
>> Piper
>>
>


Dynamically creating new Task Managers in YARN

2019-11-20 Thread Piper Piper
Hi,

How can I make Flink's Resource Manager request YARN to spin up new (or
destroy/reclaim existing) TaskManagers in YARN containers?

Preferably at runtime (i.e. dynamically).

Thank you

Piper