Re: [Flink::Test] access registered accumulators via harness

2020-10-30 Thread Sharipov, Rinat
Hi Dawid, thx for your reply and sorry for a question with a double
interpretation !

You understood me correctly, I would like to get counters value by their
names after completing all operations with the harness component.

I suppose that it should be useful because most of our functions are
implementations of Flink functions and create counters in open phase.

I'm expecting that the following API can be useful in
AbstractStreamOperatorTestHarness

:

public static class AbstractStreamOperatorTestHarness  implements
AutoCloseable
{

  // that will give us an access to whole registered counters, metric
sub-groups, gauges, etc
  public MetricGroup getMetricGroup() {
return environment.getMetricGroup();
  }
}

Here is an example of usage, based on your example

private static class MyProcessFunction extends ProcessFunction {

private Counter myCustomCounter1;
private Counter myCustomCounter2;

@Override
public void open(Configuration parameters) throws Exception {
this.myCustomCounter1 =
getRuntimeContext().getMetricGroup().counter("myCustomCounter1");
this.myCustomCounter2 =
getRuntimeContext().getMetricGroup().counter("myCustomCounter2");
}

@Override
public void processElement(IN value, Context ctx, Collector
out) throws Exception {
if (checkCase1(value)) {
   myCustomCounter1.inc();
   return;
}
if (checkCase2(value)) {
   myCustomCounter2.inc();
   return;
}

out.collect(logic.doMyBusinessLogic(value));
}
}

public static class TestMyProcessFunction {
   public void processElement_should_incCounter1() {
  ProcessFunctionTestHarness harness = ...;
  harness.processElement(element);

assertThat(harness.getMetricGroup().counter("myCustomCounter1").getCount(),
equalTo(1));

assertThat(harness.getMetricGroup().counter("myCustomCounter2").getCount(),
equalTo(0));
   }
}



What do you think about such a harness API proposal ?

Thx !

пт, 30 окт. 2020 г. в 12:54, Dawid Wysakowicz :

> Hi Rinat,
>
> First of all, sorry for some nitpicking in the beginning, but your message
> might be a bit misleading for some. If I understood your message correctly
> you are referring to Metrics, not accumulators, which are a different
> concept[1]. Or were you indeed referring to accumulators?
>
> Now on to the topic of accessing metrics. Personally I don't think it is a
> right way for testing by exposing metrics somehow in the
> ProcessFunctionTestHarness. The harness should primarily be used as a
> minimal execution environment for testing operators and such behaviours as
> e.g. checkpointing. I would not recommend using it for testing business
> logic and most definitely metrics. I'd either test that in an IT test using
> a MiniCluster and a metric reporter you can assert or I'd separate the
> business logic from the setup logic. Something like:
> private static class MyProcessFunction extends
> ProcessFunction {
>
> private MyLogic logic;
>
> @Override
> public void open(Configuration parameters) throws Exception {
> super.open(parameters);
> this.logic = new
> MyLogic<>(getRuntimeContext().getMetricGroup().counter("my-counter"));
> }
>
> @Override
> public void processElement(IN value, Context ctx, Collector
> out) throws Exception {
> out.collect(logic.doMyBusinessLogic(value));
> }
> }
>
> private static class MyLogic {
>
> private final Counter counter;
>
> public MyLogic(Counter counter) {
> this.counter = counter;
> }
>
> public OUT doMyBusinessLogic(IN value) {
> // do the processing
> }
>
> }
>
> That way you can easily test your MyLogic class including interactions
> with the counter, by passing a mock counter.
>
> Best,
>
> Dawid
>
>
> [1]
> https://ci.apache.org/projects/flink/flink-docs-master/dev/user_defined_functions.html#accumulators--counters
> On 27/10/2020 08:02, Sharipov, Rinat wrote:
>
> Hi mates !
>
> I guess that I'm doing something wrong, but I couldn't find a way to
> access registered accumulators and their values via
> *org.apache.flink.streaming.util.**ProcessFunctionTestHarness *function
> wrapper that I'm using to test my functions.
>
> During the code research I've found, that required data is stored in
> *org.apache.flink.runtime.metrics.groups.AbstractMetricGroup#*metrics field,
> that is private and is not accessible from tests. It's obvious that Flink
> somehow accesses this field and exposes counters into it's Web UI.
>
> So I guess that someone can help me to add a check into my Unit Tests for
> metrics counting 

Re: [Flink::Test] access registered accumulators via harness

2020-10-30 Thread Dawid Wysakowicz
Hi Rinat,

First of all, sorry for some nitpicking in the beginning, but your
message might be a bit misleading for some. If I understood your message
correctly you are referring to Metrics, not accumulators, which are a
different concept[1]. Or were you indeed referring to accumulators?

Now on to the topic of accessing metrics. Personally I don't think it is
a right way for testing by exposing metrics somehow in the
ProcessFunctionTestHarness. The harness should primarily be used as a
minimal execution environment for testing operators and such behaviours
as e.g. checkpointing. I would not recommend using it for testing
business logic and most definitely metrics. I'd either test that in an
IT test using a MiniCluster and a metric reporter you can assert or I'd
separate the business logic from the setup logic. Something like:

|    private static class MyProcessFunction extends
ProcessFunction {||
||
||        private MyLogic logic;||
||
||        @Override||
||        public void open(Configuration parameters) throws Exception {||
||            super.open(parameters);||
||            this.logic = new
MyLogic<>(getRuntimeContext().getMetricGroup().counter("my-counter"));||
||        }||
||
||        @Override||
||        public void processElement(IN value, Context ctx,
Collector out) throws Exception {||
||            out.collect(logic.doMyBusinessLogic(value));||
||        }||
||    }||
||
||    private static class MyLogic {||
||        ||
||        private final Counter counter;||
||        ||
||        public MyLogic(Counter counter) {||
||            this.counter = counter;||
||        }||
||
||        public OUT doMyBusinessLogic(IN value) {||
||            // do the processing||
||        }||
|

|    }|

That way you can easily test your MyLogic class including interactions
with the counter, by passing a mock counter.

Best,

Dawid


[1]
https://ci.apache.org/projects/flink/flink-docs-master/dev/user_defined_functions.html#accumulators--counters

On 27/10/2020 08:02, Sharipov, Rinat wrote:
> Hi mates !
>
> I guess that I'm doing something wrong, but I couldn't find a way to
> access registered accumulators and their values
> via /*org.apache.flink.streaming.util.*/*ProcessFunctionTestHarness
> *function wrapper that I'm using to test my functions.
>
> During the code research I've found, that required data is stored
> in */org.apache.flink.runtime.metrics.groups.AbstractMetricGroup#/*metrics
> field, that is private and is not accessible from tests. It's obvious
> that Flink somehow accesses this field and exposes counters into it's
> Web UI.
>
> So I guess that someone can help me to add a check into my Unit Tests
> for metrics counting or in case if there is no such ability I'm ready
> to help to implement it if the community considers this acceptable.
>
> Thx !
>
>
>
>
>
>
>


signature.asc
Description: OpenPGP digital signature


[Flink::Test] access registered accumulators via harness

2020-10-27 Thread Sharipov, Rinat
Hi mates !

I guess that I'm doing something wrong, but I couldn't find a way to access
registered accumulators and their values via
*org.apache.flink.streaming.util.**ProcessFunctionTestHarness *function
wrapper that I'm using to test my functions.

During the code research I've found, that required data is stored in
*org.apache.flink.runtime.metrics.groups.AbstractMetricGroup#*metrics field,
that is private and is not accessible from tests. It's obvious that Flink
somehow accesses this field and exposes counters into it's Web UI.

So I guess that someone can help me to add a check into my Unit Tests for
metrics counting or in case if there is no such ability I'm ready to help
to implement it if the community considers this acceptable.

Thx !