[FLINK-4920] Introduce Scala Function Gauge This closes #3080.
Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/570dbc8d Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/570dbc8d Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/570dbc8d Branch: refs/heads/master Commit: 570dbc8d2597ee9688579f399b8743636e70f891 Parents: b36b43b Author: heytitle <pat.chor...@gmail.com> Authored: Tue Dec 27 23:21:19 2016 +0100 Committer: zentol <ches...@apache.org> Committed: Thu Jan 19 23:57:22 2017 +0100 ---------------------------------------------------------------------- docs/monitoring/metrics.md | 22 ++++++++++++ .../flink/api/scala/metrics/ScalaGauge.scala | 34 ++++++++++++++++++ .../api/scala/metrics/ScalaGaugeTest.scala | 36 ++++++++++++++++++++ 3 files changed, 92 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/570dbc8d/docs/monitoring/metrics.md ---------------------------------------------------------------------- diff --git a/docs/monitoring/metrics.md b/docs/monitoring/metrics.md index afbce90..578c926 100644 --- a/docs/monitoring/metrics.md +++ b/docs/monitoring/metrics.md @@ -83,6 +83,8 @@ A `Gauge` provides a value of any type on demand. In order to use a `Gauge` you There is no restriction for the type of the returned value. You can register a gauge by calling `gauge(String name, Gauge gauge)` on a `MetricGroup`. 
+<div class="codetabs" markdown="1"> +<div data-lang="java" markdown="1"> {% highlight java %} public class MyMapper extends RichMapFunction<String, Integer> { @@ -102,6 +104,26 @@ public class MyMapper extends RichMapFunction<String, Integer> { } {% endhighlight %} +</div> + +<div data-lang="scala" markdown="1"> +{% highlight scala %} + +class MyMapper extends RichMapFunction[String,Int] { + val valueToExpose = 5 + + override def open(parameters: Configuration): Unit = { + getRuntimeContext() + .getMetricGroup() + .gauge("MyGauge", ScalaGauge[Int]( () => valueToExpose ) ) + } + ... +} + +{% endhighlight %} +</div> + +</div> Note that reporters will turn the exposed object into a `String`, which means that a meaningful `toString()` implementation is required. http://git-wip-us.apache.org/repos/asf/flink/blob/570dbc8d/flink-scala/src/main/scala/org/apache/flink/api/scala/metrics/ScalaGauge.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/main/scala/org/apache/flink/api/scala/metrics/ScalaGauge.scala b/flink-scala/src/main/scala/org/apache/flink/api/scala/metrics/ScalaGauge.scala new file mode 100644 index 0000000..e2f9ebf --- /dev/null +++ b/flink-scala/src/main/scala/org/apache/flink/api/scala/metrics/ScalaGauge.scala @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala.metrics + +import org.apache.flink.metrics.Gauge + +/** + * This class allows the concise definition of a gauge from Scala using function references. + */ +class ScalaGauge[T](func: () => T) extends Gauge[T] { + override def getValue: T = { + func() + } +} + +object ScalaGauge { + def apply[T](func: () => T): ScalaGauge[T] = new ScalaGauge(func) +} http://git-wip-us.apache.org/repos/asf/flink/blob/570dbc8d/flink-scala/src/test/scala/org/apache/flink/api/scala/metrics/ScalaGaugeTest.scala ---------------------------------------------------------------------- diff --git a/flink-scala/src/test/scala/org/apache/flink/api/scala/metrics/ScalaGaugeTest.scala b/flink-scala/src/test/scala/org/apache/flink/api/scala/metrics/ScalaGaugeTest.scala new file mode 100644 index 0000000..9d53e4c --- /dev/null +++ b/flink-scala/src/test/scala/org/apache/flink/api/scala/metrics/ScalaGaugeTest.scala @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.api.scala.metrics + +import org.apache.flink.metrics.Gauge +import org.apache.flink.runtime.metrics.{MetricRegistry, MetricRegistryConfiguration} +import org.apache.flink.runtime.metrics.groups.GenericMetricGroup +import org.apache.flink.util.TestLogger +import org.junit.Test +import org.scalatest.junit.JUnitSuiteLike + +class ScalaGaugeTest extends TestLogger with JUnitSuiteLike { + + @Test + def testGaugeCorrectValue(): Unit = { + val myGauge = ScalaGauge[Long](() => 4) + assert(myGauge.getValue == 4) + } + +}