rionmonster opened a new pull request, #27072:
URL: https://github.com/apache/flink/pull/27072

   ## What is the purpose of the change
   
   As discussed in 
[FLINK-24493](https://issues.apache.org/jira/browse/FLINK-24493), Flink does 
not currently support dynamically routing incoming elements to specific sinks 
at runtime based on some characteristic of each element. This pull request 
addresses that gap by implementing a new `DemultiplexingSink` that supports 
such routing.
   
   The sink implementation consists of two components: the `DemultiplexingSink` 
itself and a corresponding `SinkRouter` interface, whose implementation 
governs _how_ a given element is mapped to a specific sink at runtime. 
Active routes (i.e. those whose sinks have already been initialized) are 
stored in the sink's internal state, which acts as a cache and avoids 
re-initializing sinks for routes that already exist.
   
   ### Example Usage
   
   An example demonstrating this behavior might look like the following:
   
   ```java
   // Define SinkRouter (for routing element to specific sink)
   SinkRouter<YourElement, String> router = new SinkRouter<YourElement, String>() {
       @Override
       public String getRoute(YourElement element) {
           // Sink-key resolution (in this case, the name of the topic)
           return element.getTopicName();
       }
   
       @Override
       public Sink<YourElement> createSink(String topicName, YourElement element) {
           return KafkaSink.<YourElement>builder()
               .setBootstrapServers(...)
               // The target topic is configured on the record serializer,
               // not on the KafkaSink builder itself
               .setRecordSerializer(
                   KafkaRecordSerializationSchema.<YourElement>builder()
                       .setTopic(topicName)
                       .setValueSerializationSchema(...)
                       .build())
               .build();
       }
   };
   
   // Define the sink
   DemultiplexingSink<YourElement, String> demuxSink =
       new DemultiplexingSink<>(router);
   
   // Example applying the sink (inputStream is assumed to be an existing DataStream)
   inputStream
       .process(new YourBusinessLogic())
       .sinkTo(demuxSink);
   ```
   
   This example consumes a stream of `YourElement` elements and routes each one 
to a Kafka topic chosen dynamically from an attribute of the element itself, 
through the following chain of events:
   - Defining a `SinkRouter` instance containing the logic that routes each 
element to its destination topic (in this case, a predefined 
`element.getTopicName()` method).
   - Defining a `DemultiplexingSink<YourElement, String>` instance that uses 
that `SinkRouter` instance.
   - When an element reaches the sink, `SinkRouter.getRoute()` is executed to 
determine which sink the element should be routed to.
     - If the route key does not exist yet, the sink is created via 
`SinkRouter.createSink()`, initialized, and stored in the internal cache.
     - If the key already exists, the sink is read directly from the cache.
   - The element is then written to the resolved sink.
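
   The dispatch steps above can be sketched in plain Java. This is a simplified, hypothetical model rather than the PR's `DemultiplexingSinkWriter`: `Router`, `RouteWriter`, and `DemuxWriter` are illustrative names, the per-route "sinks" are plain callbacks instead of Flink `Sink` instances, and checkpointing, flushing, and closing of route writers are omitted.

   ```java
   import java.util.ArrayList;
   import java.util.HashMap;
   import java.util.List;
   import java.util.Map;
   
   public class DemuxDispatchSketch {
   
       /** Hypothetical stand-in for a per-route sink writer (not a Flink type). */
       interface RouteWriter<IN> {
           void write(IN element);
       }
   
       /** Hypothetical router mirroring getRoute()/createSink() from the example above. */
       interface Router<IN, K> {
           K getRoute(IN element);
   
           RouteWriter<IN> createWriter(K route, IN element);
       }
   
       /** Simplified demultiplexer: creates one writer per route key and caches it. */
       static final class DemuxWriter<IN, K> {
           private final Router<IN, K> router;
           private final Map<K, RouteWriter<IN>> activeRoutes = new HashMap<>();
   
           DemuxWriter(Router<IN, K> router) {
               this.router = router;
           }
   
           void write(IN element) {
               // Resolve the route key for this element.
               K route = router.getRoute(element);
               RouteWriter<IN> writer = activeRoutes.get(route);
               if (writer == null) {
                   // Unknown key: create the writer once and cache it.
                   writer = router.createWriter(route, element);
                   activeRoutes.put(route, writer);
               }
               // Known (or newly cached) key: forward the element.
               writer.write(element);
           }
   
           int activeRouteCount() {
               return activeRoutes.size();
           }
       }
   
       public static void main(String[] args) {
           List<String> created = new ArrayList<>();
           Map<String, List<String>> written = new HashMap<>();
   
           // Routes "topic:payload" strings by the text before the colon.
           Router<String, String> router = new Router<>() {
               @Override
               public String getRoute(String element) {
                   return element.substring(0, element.indexOf(':'));
               }
   
               @Override
               public RouteWriter<String> createWriter(String route, String element) {
                   created.add(route);
                   List<String> contents = written.computeIfAbsent(route, k -> new ArrayList<>());
                   return contents::add;
               }
           };
   
           DemuxWriter<String, String> demux = new DemuxWriter<>(router);
           for (String e : List.of("orders:1", "payments:2", "orders:3")) {
               demux.write(e);
           }
   
           System.out.println("created = " + created); // created = [orders, payments]
           System.out.println("orders  = " + written.get("orders")); // orders  = [orders:1, orders:3]
       }
   }
   ```

   Note that `createWriter` runs only twice for three elements: the second `orders` element hits the cached writer, which is the "read from the cache" branch described above.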
   
   **NOTE: The `SinkRouter` implementation is not limited to String-based keys, 
so the above example could easily be extended to route dynamically across 
different Kafka brokers, topics, etc. Implementations using other popular sinks 
such as JDBC or Elasticsearch will likely require additional fields in the key, 
depending on how much dynamic behavior is required (e.g. credentials).**
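
   As an illustration of a non-String key, the sketch below uses a hypothetical Java record (`KafkaRoute`, requiring Java 16+) as a composite route key. Because route keys act as cache keys, the key type needs consistent `equals()`/`hashCode()` and should be immutable; records provide both. Presumably the key type must also be serializable by the sink's state serializer, which this sketch does not cover. `Event` and `getRoute` are illustrative names, not part of this PR.

   ```java
   import java.util.HashMap;
   import java.util.Map;
   
   public class CompositeRouteKeySketch {
   
       /**
        * Hypothetical composite route key: broker cluster plus topic.
        * Records get value-based equals()/hashCode(), so elements resolving to the
        * same (bootstrapServers, topic) pair share a single cache entry.
        */
       record KafkaRoute(String bootstrapServers, String topic) {}
   
       /** Hypothetical element type carrying its own routing attributes. */
       record Event(String cluster, String topic, String payload) {}
   
       static KafkaRoute getRoute(Event e) {
           return new KafkaRoute(e.cluster(), e.topic());
       }
   
       public static void main(String[] args) {
           Map<KafkaRoute, Integer> elementsPerRoute = new HashMap<>();
           Event[] events = {
               new Event("broker-a:9092", "orders", "o1"),
               new Event("broker-b:9092", "orders", "o2"),
               new Event("broker-a:9092", "orders", "o3"),
           };
           for (Event e : events) {
               elementsPerRoute.merge(getRoute(e), 1, Integer::sum);
           }
           // The same topic on two different clusters yields two distinct routes.
           System.out.println(elementsPerRoute.size()); // 2
           System.out.println(elementsPerRoute.get(new KafkaRoute("broker-a:9092", "orders"))); // 2
       }
   }
   ```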
   
   ## Brief change log
   
   - *Added `DemultiplexingSink` and the related `SinkRouter` interface to support 
dynamic sink creation and routing at runtime.*
   - *Added supporting classes `DemultiplexingSinkState`, 
`DemultiplexingSinkStateSerializer`, and `DemultiplexingSinkWriter` for 
stateful sink operations, resiliency, and recovery.*
   - *Added `DemultiplexingSinkTest` to verify sink creation, 
`DemultiplexingSinkStateSerializerTest` to verify state 
serialization/deserialization, and `DemultiplexingSinkWriterTest` to 
verify writing to single and multiple routes.*
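
   One possible shape for the route-state serialization mentioned above, sketched in plain Java. It loosely mirrors the `getVersion()` / `serialize(obj)` / `deserialize(version, bytes)` pattern of Flink's `SimpleVersionedSerializer`, but it is not the PR's `DemultiplexingSinkStateSerializer`; the assumption that the state is just a set of `String` route keys, and the names `RouteStateSerializerSketch` and `VERSION`, are hypothetical.

   ```java
   import java.io.ByteArrayInputStream;
   import java.io.ByteArrayOutputStream;
   import java.io.DataInputStream;
   import java.io.DataOutputStream;
   import java.io.IOException;
   import java.util.LinkedHashSet;
   import java.util.List;
   import java.util.Set;
   
   public class RouteStateSerializerSketch {
   
       /** Hypothetical format version; bump it whenever the byte layout changes. */
       static final int VERSION = 1;
   
       /** Layout: int count, then each route key as modified UTF-8 (max 65535 bytes each). */
       static byte[] serialize(Set<String> routes) throws IOException {
           ByteArrayOutputStream bytes = new ByteArrayOutputStream();
           try (DataOutputStream out = new DataOutputStream(bytes)) {
               out.writeInt(routes.size());
               for (String route : routes) {
                   out.writeUTF(route);
               }
           }
           return bytes.toByteArray();
       }
   
       /** Rejects unknown versions instead of guessing at the byte layout. */
       static Set<String> deserialize(int version, byte[] serialized) throws IOException {
           if (version != VERSION) {
               throw new IOException("Unsupported state version: " + version);
           }
           try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(serialized))) {
               int count = in.readInt();
               Set<String> routes = new LinkedHashSet<>();
               for (int i = 0; i < count; i++) {
                   routes.add(in.readUTF());
               }
               return routes;
           }
       }
   
       public static void main(String[] args) throws IOException {
           Set<String> routes = new LinkedHashSet<>(List.of("orders", "payments"));
           Set<String> restored = deserialize(VERSION, serialize(routes));
           System.out.println(restored); // [orders, payments]
       }
   }
   ```

   Writing an explicit version alongside the bytes is what allows a later release to change the layout while still restoring state taken by an older job.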
   
   ## Verifying this change
   
   TODO
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): **No**
     - The public API, i.e., is any changed class annotated with 
`@Public(Evolving)`: **No**
     - The serializers: **No**
     - The runtime per-record code paths (performance sensitive): **No**
     - Anything that affects deployment or recovery: JobManager (and its 
components), Checkpointing, Kubernetes/Yarn, ZooKeeper: **No**
     - The S3 file system connector: **No**
     
   ## Documentation
   
     - Does this pull request introduce a new feature? **Yes**
     - If yes, how is the feature documented? **Javadocs**
   


-- 
This is an automated message from the Apache Git Service.