Hi all,

I've recently noticed some errors in the logs of my Flink job that writes
to an Elasticsearch index. I was hoping to leverage the metrics that Flink
exposes (or piggyback on them) to increment metric counters whenever I
encounter specific kinds of errors, along the lines of:

val builder = ElasticsearchSink.Builder(...)

builder.setFailureHandler { actionRequest, throwable, _, _ ->
    // Log the error here (and ideally update metrics via metricGroup.counter(...))
}

return builder.build()

Is there currently a way to handle this? My specific implementation has a
parent sink function that manages multiple sinks (so that I can create them
dynamically), but when these errors occur it doesn't look like I can access
the metric group from within the handler passed to setFailureHandler.
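For reference, the kind of handler I'd like to end up with is roughly the
following sketch. CountingFailureHandler is a hypothetical name. I believe
ElasticsearchSinkBase eagerly checks that the failure handler is
serializable, and Flink's Counter isn't Serializable, so the counter field is
marked @Transient. That should be fine for an instance that is never shipped
anywhere, such as a child sink built inside the running task.

```kotlin
import org.apache.flink.metrics.Counter
import org.apache.flink.streaming.connectors.elasticsearch.ActionRequestFailureHandler
import org.apache.flink.streaming.connectors.elasticsearch.RequestIndexer
import org.elasticsearch.action.ActionRequest
import org.slf4j.LoggerFactory

// Hypothetical handler: logs each failed request and increments a Flink counter.
// ActionRequestFailureHandler extends Serializable but Counter does not,
// so the counter is @Transient (it is skipped during serialization).
class CountingFailureHandler(
    @Transient private val failureCounter: Counter?
) : ActionRequestFailureHandler {

    override fun onFailure(
        action: ActionRequest,
        failure: Throwable,
        restStatusCode: Int,
        indexer: RequestIndexer
    ) {
        // Null only on a deserialized copy of this handler
        failureCounter?.inc()
        LOG.error("Elasticsearch request failed (status {}): {}", restStatusCode, action, failure)
        // The failure is swallowed, as in my original handler; rethrow it to fail the job instead
    }

    companion object {
        private val LOG = LoggerFactory.getLogger(CountingFailureHandler::class.java)
    }
}
```

Wiring it up would then be something like
builder.setFailureHandler(CountingFailureHandler(someMetricGroup.counter("elasticsearchFailures"))),
which still leaves the question of where that metric group comes from.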

My initial thought was that my parent function could pass its context down
to the child sinks, so that they would have access to it when handling
exceptions and updating metrics:

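import java.util.concurrent.ConcurrentHashMap
import org.apache.flink.configuration.Configuration
import org.apache.flink.runtime.state.FunctionInitializationContext
import org.apache.flink.runtime.state.FunctionSnapshotContext
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction
import org.apache.flink.streaming.api.functions.sink.SinkFunction
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase

class DynamicElasticsearchSink<ElementT, RouteT, SinkT : ElasticsearchSinkBase<ElementT, out AutoCloseable>>(
    /**
     * Defines a router that maps an element to its corresponding ElasticsearchSink instance
     * @param sinkRouter An [ElasticsearchSinkRouter] that takes an element of type [ElementT], a string-based route
     * defined as [RouteT] which is used for caching sinks, and finally the sink itself as [ElasticsearchSink]
     */
    private val sinkRouter: ElasticsearchSinkRouter<ElementT, RouteT, SinkT>
) : RichSinkFunction<ElementT>(), CheckpointedFunction {

    // Store a reference to all of the current routes
    private val sinkRoutes: MutableMap<RouteT, SinkT> = ConcurrentHashMap()
    private lateinit var configuration: Configuration

    override fun open(parameters: Configuration) {
        configuration = parameters
    }

    override fun invoke(value: ElementT, context: SinkFunction.Context) {
        val route = sinkRouter.getRoute(value)
        var sink = sinkRoutes[route]
        if (sink == null) {
            // Child sinks are created lazily, one per route; this is where each
            // child sink (and therefore its failure handler) is constructed
            sink = sinkRouter.createSink(route, value)
            sink.runtimeContext = runtimeContext
            sink.open(configuration)
            sinkRoutes[route] = sink
        }

        sink.invoke(value, context)
    }

    override fun initializeState(context: FunctionInitializationContext) {
        // Nothing to restore: child sinks are created lazily in invoke()
    }

    override fun snapshotState(context: FunctionSnapshotContext) {
        // Let each child sink flush its pending bulk requests for the checkpoint
        sinkRoutes.values.forEach { it.snapshotState(context) }
    }

    override fun close() {
        sinkRoutes.values.forEach { it.close() }
    }
}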

I imagine that within this function's open() call I could store the metric
group and pass it into my createSink() call, so the child sinks would have
a reference to it. Does that seem feasible, or is there another way to
handle this?
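A rough sketch of that plumbing, assuming a hypothetical
MetricAwareSinkRouter whose createSink() takes an extra MetricGroup and a
hypothetical helper createChildSink(). Since runtimeContext is already set
by the time invoke() runs, the per-route group could also be derived there
instead of being stored in open():

```kotlin
import java.io.Serializable
import org.apache.flink.api.common.functions.RuntimeContext
import org.apache.flink.configuration.Configuration
import org.apache.flink.metrics.MetricGroup
import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkBase

// Hypothetical variant of ElasticsearchSinkRouter: createSink also receives
// a metric group, so it can register counters for the sink's failure handler.
interface MetricAwareSinkRouter<ElementT, RouteT, SinkT : ElasticsearchSinkBase<ElementT, out AutoCloseable>> :
    Serializable {
    fun getRoute(element: ElementT): RouteT
    fun createSink(route: RouteT, element: ElementT, metricGroup: MetricGroup): SinkT
}

// Hypothetical helper for the parent's invoke(): builds, wires up and opens
// a child sink whose metrics live in a "route" subgroup of the parent's group.
fun <ElementT, RouteT, SinkT : ElasticsearchSinkBase<ElementT, out AutoCloseable>> createChildSink(
    router: MetricAwareSinkRouter<ElementT, RouteT, SinkT>,
    route: RouteT,
    element: ElementT,
    runtimeContext: RuntimeContext,
    parameters: Configuration
): SinkT {
    // One subgroup per route keeps counter names from colliding across child sinks
    val routeGroup = runtimeContext.metricGroup.addGroup("route", route.toString())
    val sink = router.createSink(route, element, routeGroup)
    sink.runtimeContext = runtimeContext
    sink.open(parameters)
    return sink
}
```

The null branch in invoke() would then shrink to
sink = createChildSink(sinkRouter, route, value, runtimeContext, configuration),
with createSink() registering its counter on the group it receives.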

Thanks all,

Rion