rionmonster opened a new pull request, #27072: URL: https://github.com/apache/flink/pull/27072
## What is the purpose of the change

Per the discussions in [FLINK-24493](https://issues.apache.org/jira/browse/FLINK-24493), Flink does not currently support dynamically routing incoming elements to specific sinks at runtime based on some characteristic of each element. This pull request addresses that by implementing a new `DemultiplexingSink` construct that supports this behavior.

The implementation consists of two components: the `DemultiplexingSink` itself and a corresponding `SinkRouter` interface, whose implementation governs _how_ a given element is mapped to a specific sink at runtime. Active routes (i.e. those that have already been initialized) are stored in the sink's internal state. This state acts as a cache, so sinks that already exist are not initialized again.

### Example Usage

An example demonstrating this behavior might look like the following:

```
// Define the SinkRouter (routes each element to a specific sink)
SinkRouter<YourElement, String> router = new SinkRouter<YourElement, String>() {
    @Override
    public String getRoute(YourElement element) {
        // Sink-key resolution (in this case, the name of the destination topic)
        return element.getTopicName();
    }

    @Override
    public Sink<YourElement> createSink(String topicName, YourElement element) {
        // The destination topic is configured on the record serialization schema
        return KafkaSink.<YourElement>builder()
                .setBootstrapServers(...)
                .setRecordSerializer(
                        KafkaRecordSerializationSchema.<YourElement>builder()
                                .setTopic(topicName)
                                .setValueSerializationSchema(...)
                                .build())
                .build();
    }
};

// Define the sink
DemultiplexingSink<YourElement, String> demuxSink = new DemultiplexingSink<>(router);

// Apply the sink (`stream` is the upstream DataStream; source and transformations elided)
stream
        .process(new YourBusinessLogic())
        .sinkTo(demuxSink);
```

This example consumes a stream of `YourElement` elements and routes each one to a Kafka topic determined by an attribute of the element itself, through the following chain of events:

- A `SinkRouter` instance defines the logic that routes each element to its destination topic (in this case, a predefined `element.getTopicName()` accessor).
- A `DemultiplexingSink<YourElement, String>` instance is created using that `SinkRouter`.
- When an element reaches the sink, `SinkRouter.getRoute()` is executed to determine which sink the element should be routed to.
  - If the route key does not exist yet, the sink is initialized via `SinkRouter.createSink()` and stored in the internal cache.
  - If the key already exists, the sink is read directly from the cache.
- The element is sent to the resolved sink.

**NOTE: `SinkRouter` implementations are not limited to String-based keys, so the example above could easily be extended to route dynamically across different Kafka brokers, topics, etc. Implementations for other popular sinks such as JDBC or Elasticsearch will likely need additional fields in the route key to resolve a sink, depending on how dynamic the routing needs to be (e.g.
credentials).**

## Brief change log

- *Added `DemultiplexingSink` and the related `SinkRouter` interface to support dynamic sink creation and routing at runtime.*
- *Added supporting classes `DemultiplexingSinkState`, `DemultiplexingSinkStateSerializer`, and `DemultiplexingSinkWriter` for stateful sink operation, resiliency, and recovery.*
- *Added `DemultiplexingSinkTest` to verify sink creation, `DemultiplexingSinkStateSerializerTest` to verify state serialization/deserialization, and `DemultiplexingSinkWriterTest` to verify writing to single and multiple routes.*

## Verifying this change

TODO

## Does this pull request potentially affect one of the following parts:

- Dependencies (does it add or upgrade a dependency): **No**
- The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **No**
- The serializers: **No**
- The runtime per-record code paths (performance sensitive): **No**
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: **No**
- The S3 file system connector: **No**

## Documentation

- Does this pull request introduce a new feature? **Yes**
- If yes, how is the feature documented? **Javadocs**
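The route-then-cache steps listed under *Example Usage* can be illustrated without any Flink dependencies. The sketch below is a hypothetical simplification, not the PR's `DemultiplexingSinkWriter`: `SimpleRouter` and `DemuxWriterSketch` are illustrative names, and each per-route writer is modelled as a plain `java.util.function.Consumer` rather than a Flink `SinkWriter`. It shows a writer being created only the first time a route key is seen, and the active route keys being snapshotted as they might be for checkpointed state.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Consumer;

// Hypothetical stand-in for SinkRouter: resolves a route key for an element and
// creates the per-route writer (a Consumer here, not a real Flink SinkWriter).
interface SimpleRouter<IN, K> {
    K getRoute(IN element);

    Consumer<IN> createWriter(K route, IN element);
}

// Hypothetical demultiplexing writer: keeps one cached writer per route key,
// so each route is initialised at most once.
class DemuxWriterSketch<IN, K> {
    private final SimpleRouter<IN, K> router;
    // LinkedHashMap preserves the order in which routes were first seen.
    private final Map<K, Consumer<IN>> activeRoutes = new LinkedHashMap<>();
    private int writersCreated = 0;

    DemuxWriterSketch(SimpleRouter<IN, K> router) {
        this.router = router;
    }

    void write(IN element) {
        K route = router.getRoute(element);
        // Cache miss: create and store a writer. Cache hit: reuse the stored one.
        Consumer<IN> writer = activeRoutes.computeIfAbsent(route, key -> {
            writersCreated++;
            return router.createWriter(key, element);
        });
        writer.accept(element);
    }

    // Keys of all initialised routes, e.g. for inclusion in checkpointed state.
    List<K> snapshotActiveRoutes() {
        return new ArrayList<>(activeRoutes.keySet());
    }

    int writersCreated() {
        return writersCreated;
    }
}
```

`computeIfAbsent` keeps the lookup and the lazy creation in a single map operation. A production writer would additionally need to flush and close every cached writer on checkpoint and shutdown, which this sketch leaves out.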
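The change log mentions `DemultiplexingSinkStateSerializer` for persisting the sink's state. Assuming, purely for illustration, that the state reduces to a list of `String` route keys, a versioned serializer could look like the following sketch. It mirrors the `getVersion`/`serialize`/`deserialize` shape of Flink's `SimpleVersionedSerializer` but is standalone; `RouteKeysSerializerSketch` is a hypothetical name, not a class from this pull request.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical versioned serializer for a list of String route keys.
// Layout: [int count][UTF key 1][UTF key 2]...
class RouteKeysSerializerSketch {
    static final int VERSION = 1;

    int getVersion() {
        return VERSION;
    }

    byte[] serialize(List<String> routeKeys) throws IOException {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (DataOutputStream out = new DataOutputStream(bytes)) {
            out.writeInt(routeKeys.size()); // number of active routes
            for (String key : routeKeys) {
                out.writeUTF(key); // 2-byte length prefix + modified UTF-8
            }
        }
        return bytes.toByteArray();
    }

    List<String> deserialize(int version, byte[] serialized) throws IOException {
        // Reject state written by an unknown serializer version.
        if (version != VERSION) {
            throw new IOException("Unsupported state version: " + version);
        }
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized))) {
            int count = in.readInt();
            List<String> keys = new ArrayList<>(count);
            for (int i = 0; i < count; i++) {
                keys.add(in.readUTF());
            }
            return keys;
        }
    }
}
```

Note that `writeUTF` caps each encoded key at 65,535 bytes. Non-`String` route keys, which the NOTE above explicitly allows, would need their own key serializer.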
