Hi Colin,

Are you initializing your counters from within the open() method of your rich 
function?
In other words, are you calling 

counter = getRuntimeContext.getMetricGroup.counter("my counter") 

from within open()?

The Counter interface is not serializable. So if you instantiate the counters 
outside open(), they become part of the function object, and when Flink tries 
to serialize your function to ship it to the cluster, it cannot, so you get the 
exception.
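
As a rough sketch of what I mean (not your actual code: the class name 
ParseLengthMapper and the String => Int parsing are made up for illustration, 
and I am assuming the Flink 1.3 Scala/Java API on the classpath), the counters 
can be declared as transient fields and only assigned in open():

```scala
import org.apache.flink.api.common.functions.RichMapFunction
import org.apache.flink.configuration.Configuration
import org.apache.flink.metrics.Counter

// Hypothetical function: the String => Int parsing is only for illustration.
class ParseLengthMapper extends RichMapFunction[String, Int] {

  // @transient keeps the counters out of the serialized function object.
  // They are still null when the function is serialized on the client.
  @transient private var successCounter: Counter = _
  @transient private var failedCounter: Counter = _

  // open() is called on the task, after the function has been deserialized
  // and before any records are passed to map().
  override def open(parameters: Configuration): Unit = {
    val group = getRuntimeContext.getMetricGroup
    successCounter = group.counter("successfulParse")
    failedCounter = group.counter("failedParse")
  }

  override def map(value: String): Int =
    try {
      val parsed = value.trim.toInt
      successCounter.inc()
      parsed
    } catch {
      case _: NumberFormatException =>
        failedCounter.inc()
        -1 // placeholder result for unparseable input
    }
}
```

With this layout the function holds no Counter instance at the time it is 
serialized, so the closure cleaner should have nothing non-serializable to 
trip over, however many counters you register.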

You can have a look at the docs for an example:
https://ci.apache.org/projects/flink/flink-docs-release-1.3/monitoring/metrics.html

Thanks,
Kostas

> On Oct 7, 2017, at 11:34 PM, Colin Williams 
> <colin.williams.seat...@gmail.com> wrote:
> 
> I've created a RichMapFunction in scala with multiple counters like:
> 
>    lazy val successCounter = getRuntimeContext.getMetricGroup.counter("successfulParse")
>    lazy val failedCounter = getRuntimeContext.getMetricGroup.counter("failedParse")
>    lazy val errorCounter = getRuntimeContext.getMetricGroup.counter("errorParse")
> 
> which I increment in the map function. While testing I noticed that I have no 
> issues with using a single counter. However, with more than one counter I get 
> a serialization error.
> 
> Does anyone know how I can use multiple counters from my RichMapFunction, or 
> what I'm doing wrong?
> 
> [info]   org.apache.flink.api.common.InvalidProgramException: The 
> implementation of the RichMapFunction is not serializable. The object 
> probably contains or references non serializable fields.
> [info]   at 
> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:100)
> [info]   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
> [info]   at 
> org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:183)
> [info]   at 
> org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:527)
> [info]   at 
> org.apache.flink.streaming.api.scala.DataStream.map(DataStream.scala:581)
> [info]   at 
> ParsedResultUnwrapperTest$$anonfun$2.apply(ParsedResultUnwrapperTest.scala:27)
> [info]   at 
> ParsedResultUnwrapperTest$$anonfun$2.apply(ParsedResultUnwrapperTest.scala:23)
> [info]   at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
> [info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
> [info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
> [info]   ...
> [info]   Cause: java.io.NotSerializableException: 
> org.apache.flink.metrics.SimpleCounter
> [info]   at 
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
> [info]   at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> [info]   at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> [info]   at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> [info]   at 
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> [info]   at 
> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> [info]   at 
> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:315)
> [info]   at 
> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
> [info]   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
> [info]   at 
> org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:183)
> [info]   ...
> [info] - ParseResultUnwrapper.errorCounter.getCount should return 1L for a 
> Error -> ParseResult[LineProtocol] *** FAILED ***
> [info]   org.apache.flink.api.common.InvalidProgramException: The 
> implementation of the RichMapFunction is not serializable. The object 
> probably contains or references non serializable fields.
> [info]   at 
> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:100)
> [info]   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
> [info]   at 
> org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:183)
> [info]   at 
> org.apache.flink.streaming.api.datastream.DataStream.map(DataStream.java:527)
> [info]   at 
> org.apache.flink.streaming.api.scala.DataStream.map(DataStream.scala:581)
> [info]   at 
> ParsedResultUnwrapperTest$$anonfun$3.apply(ParsedResultUnwrapperTest.scala:37)
> [info]   at 
> ParsedResultUnwrapperTest$$anonfun$3.apply(ParsedResultUnwrapperTest.scala:32)
> [info]   at org.scalatest.OutcomeOf$class.outcomeOf(OutcomeOf.scala:85)
> [info]   at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
> [info]   at org.scalatest.Transformer.apply(Transformer.scala:22)
> [info]   ...
> [info]   Cause: java.io.NotSerializableException: 
> org.apache.flink.metrics.SimpleCounter
> [info]   at 
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
> [info]   at 
> java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
> [info]   at 
> java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
> [info]   at 
> java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
> [info]   at 
> java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
> [info]   at 
> java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
> [info]   at 
> org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:315)
> [info]   at 
> org.apache.flink.api.java.ClosureCleaner.clean(ClosureCleaner.java:81)
> [info]   at 
> org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.clean(StreamExecutionEnvironment.java:1548)
> [info]   at 
> org.apache.flink.streaming.api.datastream.DataStream.clean(DataStream.java:183)
> [info]   ...
