Hi all,

We want to give you a short update on the Pinot sink since we started implementing a PoC. As described earlier, we aim to batch-upload segments to Pinot while caching elements in the Flink sink.
Our current implementation works as follows: besides the Pinot Controller URI and the target table's name, the sink lets users define the maximum number of rows per segment.

The PinotWriter is responsible for collecting elements, building segments, and uploading them to Pinot. To do so, it retrieves the Schema and TableConfig via the Pinot Controller API using the provided table name. Whenever the specified maximum number of rows is reached, it starts segment creation on disk; this step is handled by the Pinot admin tool. A segment ID is structured as follows: <table-name>-<subtask-id>-<incremental-counter>. Finally, the PinotWriter pushes the created segment to the Pinot Controller, which then distributes it onto the Pinot Servers.

The PinotCommitter only checkpoints the ID of the segment that was written last. Note that multiple segments may have been uploaded to Pinot between two checkpoints.

As for future plans, we want to reduce memory pressure in the PinotWriter by writing collected elements directly to disk instead of buffering them in memory. The main question at this point is whether we can assume access to a temporary directory on disk.

For checkpointing and failure recovery we also have an approach in mind, although we have not tried it yet: upon recovery from a checkpoint, the PinotSink reads the latest segment ID stored in the checkpoint. The PinotWriter then compares the incremental counter of that segment ID with the segments that already exist in Pinot for the same table and subtask ID. If segments with a higher counter value in their IDs are discovered, they are deleted to avoid duplicates. After that, processing continues as described above.

We think this mode of recovery assumes that elements from upstream Flink tasks always arrive at the same subtask of the sink. Is this a fair assumption?

Best regards,
Jakob and Mats

On 6.
Jan 2021, at 18:22, Yupeng Fu <yup...@uber.com.INVALID> wrote:

Hi Mats,

Glad to see this interest! We at Uber are also working on a Pinot sink (for BATCH execution), with some help from the Pinot community on abstracting Pinot interfaces for segment writes and catalog retrieval. Perhaps we can collaborate on this proposal/PoC.

Cheers,
Yupeng

On Wed, Jan 6, 2021 at 6:12 AM Aljoscha Krettek <aljos...@apache.org> wrote:

That's good to hear. I wasn't sure because the explanation focused a lot on checkpoints and their details, while with the new Sink interface implementers don't need to be concerned with those. In fact, when the sink is used in BATCH execution mode, there will be no checkpoints at all.

Other than that, the implementation sketch makes sense to me. I think that to make further assessments you will probably have to work on a proof of concept.

Best,
Aljoscha

On 2021/01/06 11:18, Poerschke, Mats wrote:

Yes, we will use the latest sink interface.

Best,
Mats

On 6. Jan 2021, at 11:05, Aljoscha Krettek <aljos...@apache.org> wrote:

It's great to see interest in this. Were you planning to use the new Sink interface that we recently introduced? [1]

Best,
Aljoscha

[1] https://s.apache.org/FLIP-143

On 2021/01/05 12:21, Poerschke, Mats wrote:

Hi all,

we want to contribute a sink connector for Apache Pinot. The following briefly describes the planned control flow. Please feel free to comment on any of its aspects.

Background

Apache Pinot is a large-scale real-time data ingestion engine that internally works on data segments. The controller exposes an external API that allows posting new segments via REST call. A segment posted this way must contain an ID (called the segment name).

Control Flow

The Flink sink will collect data tuples locally. When creating a checkpoint, all those tuples are grouped into one segment, which is then pushed to the Pinot controller.
We will assign each pushed segment a unique, incrementing identifier. After receiving a success response from the Pinot controller, the latest segment name is persisted within the Flink checkpoint.

In case we have to recover from a failure, the latest successfully pushed segment name can be reconstructed from the Flink checkpoint. At this point the system might be in an inconsistent state: the Pinot controller might already have stored a newer segment whose name was, due to the failure, not persisted in the Flink checkpoint. This inconsistency is resolved with the next successful checkpoint creation. The segment pushed at that point is assigned the same segment name as the inconsistent segment, so Pinot replaces the old segment with the new one, which prevents duplicate entries.

Best regards
Mats Pörschke
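To make the segment naming scheme and the recovery cleanup discussed in this thread concrete, here is a minimal Java sketch. All class and method names below are illustrative, not the connector's actual API; the real PoC builds on Flink's unified Sink interface (FLIP-143) and the Pinot Controller REST API, which are omitted here.

```java
import java.util.ArrayList;
import java.util.List;

/**
 * Illustrative sketch (not the actual connector code) of the segment
 * naming scheme <table-name>-<subtask-id>-<incremental-counter> and of
 * the cleanup rule applied when recovering from a checkpoint.
 */
class SegmentNaming {

    /** Builds a segment name: <table-name>-<subtask-id>-<incremental-counter>. */
    static String segmentName(String table, int subtaskId, long counter) {
        return table + "-" + subtaskId + "-" + counter;
    }

    /** Extracts the incremental counter (the part after the last dash). */
    static long counterOf(String segmentName) {
        int idx = segmentName.lastIndexOf('-');
        return Long.parseLong(segmentName.substring(idx + 1));
    }

    /**
     * On recovery, returns the segments that must be deleted from Pinot:
     * those (already filtered to the same table and subtask) whose counter
     * is higher than that of the last checkpointed segment, i.e. segments
     * pushed after the checkpoint and therefore not covered by it.
     */
    static List<String> segmentsToDelete(String checkpointedSegment,
                                         List<String> existingSegments) {
        long checkpointedCounter = counterOf(checkpointedSegment);
        List<String> toDelete = new ArrayList<>();
        for (String segment : existingSegments) {
            if (counterOf(segment) > checkpointedCounter) {
                toDelete.add(segment);
            }
        }
        return toDelete;
    }
}
```

Because segment names are deterministic per table, subtask, and counter, re-pushing a segment under an already-used name lets Pinot replace the existing segment instead of creating a duplicate, which is the property the replace-on-recovery idea above relies on.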