Hi Mats and Jakob,

In the general case, I don't think that elements from upstream Flink tasks
always arrive at the same subtask of the sink. One problem is that user
computations can be non-deterministic. Moreover, a rebalance operation can
distribute the events of a task A among several downstream tasks B_1, B_2.
In this case, records are distributed in a round-robin fashion, so the
assignments depend on the arrival order at task A:

event_2, event_1 => A => event_1 => B_1
                      => event_2 => B_2

event_1, event_2 => A => event_2 => B_1
                      => event_1 => B_2
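In topology terms, this is the situation where a rebalance() sits between
task A and the sink. A minimal sketch (EventSource, TaskA and pinotSink
are made-up names; the sink would be the Pinot sink under discussion):

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    StreamExecutionEnvironment env =
            StreamExecutionEnvironment.getExecutionEnvironment();

    env.addSource(new EventSource()) // upstream producers
            .map(new TaskA())        // "task A"
            .rebalance()             // round-robin over B_1, B_2, ...
            .sinkTo(pinotSink);      // subtask choice depends on arrival order

    env.execute();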
Since the order in which events from multiple producers arrive at a
consumer is not deterministic (e.g. due to network delays), you might see
different distributions. However, I am not sure whether this is really a
problem if every Pinot sink makes sure that all segments written after the
last checkpoint it recovered from are deleted. You might just end up with
a different job result which is still a member of the space of valid
results.

One problem I see with eagerly writing segments to Pinot is that
downstream systems might already start consuming the results even though
they might still change because of a recovery. The way Flink solves this
problem is to only publish results once a checkpoint has been completed.
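With the new unified Sink interface (FLIP-143) this roughly means moving
the segment upload out of the writer and into the committer, which Flink
invokes only after the checkpoint that produced the committables has
completed. A minimal sketch, assuming a hypothetical SegmentCommittable
and Pinot controller client:

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;
    import org.apache.flink.api.connector.sink.Committer;

    class PinotSegmentCommitter implements Committer<SegmentCommittable> {

        private final PinotControllerClient controller; // hypothetical client

        PinotSegmentCommitter(PinotControllerClient controller) {
            this.controller = controller;
        }

        @Override
        public List<SegmentCommittable> commit(List<SegmentCommittable> committables)
                throws IOException, InterruptedException {
            List<SegmentCommittable> retry = new ArrayList<>();
            for (SegmentCommittable segment : committables) {
                try {
                    // The segment becomes visible to Pinot consumers only
                    // now, i.e. after the checkpoint has completed.
                    controller.uploadSegment(segment);
                } catch (IOException e) {
                    retry.add(segment); // returned committables are retried
                }
            }
            return retry;
        }

        @Override
        public void close() {}
    }

Segments the writer has staged but not yet committed can then be dropped
on recovery without ever having been visible to downstream consumers.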
Cheers,
Till

On Mon, Jan 25, 2021 at 4:30 PM Poerschke, Mats <
mats.poersc...@student.hpi.uni-potsdam.de> wrote:

> Hi all,
>
> We want to give you a short update on the Pinot sink since we started
> implementing a PoC. As described earlier, we aim to use batch uploading
> of segments to Pinot in combination with caching elements in the Flink
> sink.
>
> Our current implementation works like this:
>
> Besides the Pinot controller URI and the target table's name, the sink
> allows users to define the maximum number of rows per segment.
>
> The PinotWriter is responsible for collecting elements, building
> segments and uploading them to Pinot. It therefore retrieves the Schema
> and TableConfig via the Pinot controller API using the provided
> tableName. Whenever the specified maximum number of rows is reached, it
> starts the segment creation on disk. This process is handled by the
> Pinot admin tool. A segment ID is structured as follows:
> <table-name>-<subtask-id>-<incremental-counter>
> Finally, the PinotWriter pushes the created segment to the Pinot
> controller, which then distributes it onto the Pinot servers.
>
> The PinotCommitter only checkpoints the segment ID of the segment that
> was last written. It is possible that multiple segments were uploaded to
> Pinot between two checkpoints.
>
> As for future plans, we want to prevent high memory pressure when
> collecting elements in the PinotWriter by writing elements directly to
> disk. The main question at this point is whether we can assume to have
> access to a temp directory on disk.
>
> For checkpointing and failure recovery we also thought of an approach
> without having tried it yet. Upon recovery from a checkpoint, the latest
> segment ID stored in the checkpoint can be accessed by the PinotSink.
> The PinotWriter then compares the incremental counter value of the
> checkpointed segment ID with the segments that already exist in Pinot
> for the same table and subtask ID. If segments with a higher counter
> value in their IDs are discovered, they are deleted to avoid duplicates.
> After that, processing can continue as described above. We think that
> this mode of recovery assumes that elements from upstream Flink tasks
> always arrive at the same subtask of the sink. Is this fair?
>
> Best regards,
> Jakob and Mats
>
>
> On 6. Jan 2021, at 18:22, Yupeng Fu <yup...@uber.com.INVALID> wrote:
>
> Hi Mats,
>
> Glad to see this interest! We at Uber are also working on a Pinot sink
> (for BATCH execution), with some help from the Pinot community on
> abstracting Pinot interfaces for segment writes and catalog retrieval.
> Perhaps we can collaborate on this proposal/PoC.
>
> Cheers,
>
> Yupeng
>
>
> On Wed, Jan 6, 2021 at 6:12 AM Aljoscha Krettek <aljos...@apache.org>
> wrote:
>
> That's good to hear. I wasn't sure because the explanation focused a lot
> on checkpoints and their details, while with the new Sink interface
> implementers don't need to be concerned with those. And in fact, when
> the Sink is used in BATCH execution mode there will be no checkpoints.
>
> Other than that, the implementation sketch makes sense to me. I think to
> make further assessments you will probably have to work on a
> proof-of-concept.
>
> Best,
> Aljoscha
>
> On 2021/01/06 11:18, Poerschke, Mats wrote:
> Yes, we will use the latest sink interface.
>
> Best,
> Mats
>
> On 6. Jan 2021, at 11:05, Aljoscha Krettek <aljos...@apache.org> wrote:
>
> It's great to see interest in this. Were you planning to use the new
> Sink interface that we recently introduced? [1]
>
> Best,
> Aljoscha
>
> [1] https://s.apache.org/FLIP-143
>
> On 2021/01/05 12:21, Poerschke, Mats wrote:
> Hi all,
>
> We want to contribute a sink connector for Apache Pinot. The following
> briefly describes the planned control flow. Please feel free to comment
> on any of its aspects.
>
> Background
> Apache Pinot is a large-scale real-time data ingestion engine that
> internally works on data segments. The controller exposes an external
> API which allows posting new segments via a REST call. A segment posted
> this way must contain an ID (called the segment name).
>
> Control Flow
> The Flink sink will collect data tuples locally. When creating a
> checkpoint, all those tuples are grouped into one segment which is then
> pushed to the Pinot controller. We will assign each pushed segment a
> unique, incrementing identifier.
> After receiving a success response from the Pinot controller, the
> latest segment name is persisted within the Flink checkpoint.
> In case we have to recover from a failure, the latest successfully
> pushed segment name can be reconstructed from the Flink checkpoint. At
> this point the system might be in an inconsistent state: the Pinot
> controller might have already stored a newer segment (whose name was,
> due to the failure, not persisted in the Flink checkpoint).
> This inconsistency is resolved with the next successful checkpoint
> creation. The segment pushed at that point is assigned the same segment
> name as the inconsistent segment. Thus, Pinot replaces the old segment
> with the new one, which prevents introducing duplicate entries.
>
>
> Best regards
> Mats Pörschke
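>
> P.S. To make the naming scheme concrete, a minimal sketch (names are
> hypothetical). Because the segment name is a pure function of table,
> subtask and counter, a segment re-pushed after recovery gets the same
> name and therefore replaces the stale segment instead of duplicating it:
>
>     static String segmentName(String tableName, int subtaskId, long counter) {
>         // Deterministic: identical inputs always produce identical names.
>         return tableName + "-" + subtaskId + "-" + counter;
>     }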