Hi all,

I've recently noticed some errors in the logs of my Flink job that writes to an Elasticsearch index. I was hoping to leverage (or piggyback on) the metrics that Flink exposes so that I can update metric counters when I encounter specific kinds of errors.

    val builder = ElasticsearchSink.Builder(...)

    builder.setFailureHandler { actionRequest, throwable, _, _ ->
        // Log error here (and update metrics via metricGroup.counter(...))
    }

    return builder.build()

Is there a way to handle this currently? My implementation has a parent sink function that manages multiple child sinks (so I can create these dynamically), but when these errors occur, it doesn't look like I can access the metric group from within the setFailureHandler callback at present.

My initial thought was that the parent function could pass its context in to the child sinks so that I'd have context for the exceptions/metrics:

    class DynamicElasticsearchSink<ElementT, RouteT, SinkT : ElasticsearchSinkBase<ElementT, out AutoCloseable>>(
        /**
         * Defines a router that maps an element to its corresponding ElasticsearchSink instance
         * @param sinkRouter A [ElasticsearchSinkRouter] that takes an element of type [ElementT], a string-based route
         * defined as [RouteT] which is used for caching sinks, and finally the sink itself as [ElasticsearchSink]
         */
        private val sinkRouter: ElasticsearchSinkRouter<ElementT, RouteT, SinkT>
    ) : RichSinkFunction<ElementT>(), CheckpointedFunction {

        // Store a reference to all of the current routes
        private val sinkRoutes: MutableMap<RouteT, SinkT> = ConcurrentHashMap()
        private lateinit var configuration: Configuration

        override fun open(parameters: Configuration) {
            configuration = parameters
        }

        override fun invoke(value: ElementT, context: SinkFunction.Context) {
            val route = sinkRouter.getRoute(value)
            var sink = sinkRoutes[route]
            if (sink == null) {
                // Here's where the sink is constructed, the first time a route is seen
                sink = sinkRouter.createSink(route, value)
                sink.runtimeContext = runtimeContext
                sink.open(configuration)
                sinkRoutes[route] = sink
            }

            sink.invoke(value, context)
        }
    }

I'd imagine that within the open() call for this function, I could store the metric group and pass it into my createSink() call so the child sinks would have a reference to it.
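
To make the idea concrete, here is a rough sketch of the kind of failure handler I have in mind. It is untested, and a few things in it are my own assumptions rather than anything taken from the job above: the class name CountingFailureHandler, the register() helper, and the counter name "elasticsearchFailures" are all hypothetical, and the onFailure signature assumes an Elasticsearch 6/7 connector where ActionRequest is not generic. I also believe (but have not confirmed) that the sink checks at construction time that the failure handler is serializable, which is why the counter is held in a transient field and attached afterwards rather than captured in the lambda.

```kotlin
import org.apache.flink.metrics.Counter
import org.apache.flink.metrics.MetricGroup
import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer
import org.elasticsearch.action.ActionRequest
import org.slf4j.LoggerFactory

// Hypothetical handler: logs each failed request and bumps a Flink counter.
class CountingFailureHandler : ActionRequestFailureHandler {

    // Counters are not meant to be serialized, so keep the reference transient
    // and attach it at runtime, once a metric group is available.
    @Transient
    private var failures: Counter? = null

    // Hypothetical helper: call once per child sink, e.g. from the parent's
    // invoke() right after the child sink has been created.
    fun register(parentGroup: MetricGroup, route: String) {
        // One sub-group per route so the counter names do not collide.
        failures = parentGroup.addGroup(route).counter("elasticsearchFailures")
    }

    override fun onFailure(
        action: ActionRequest,
        failure: Throwable,
        restStatusCode: Int,
        indexer: RequestIndexer
    ) {
        LOG.error("Failed to index request (REST status $restStatusCode)", failure)
        failures?.inc()
        // Not rethrowing means the failed request is dropped; rethrow `failure`
        // here instead if the job should fail on these errors.
    }

    companion object {
        private val LOG = LoggerFactory.getLogger(CountingFailureHandler::class.java)
    }
}
```

With something like this, createSink() would pass the handler instance to builder.setFailureHandler(...), and the parent function would call register(runtimeContext.metricGroup, route.toString()) on that same instance once the child sink exists, since the runtime context is only available on the parent.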

Does that seem feasible, or is there another way to handle this?

Thanks all,

Rion