Hi Mats and Jakob,

In the general case, I don't think that elements from upstream Flink tasks
always arrive at the same subtask of the sink. One problem is that user
computations can be non-deterministic. Moreover, a rebalance operation can
distribute the events of a task A among several downstream tasks B_1, B_2.
In this case, records are distributed in a round-robin fashion. Therefore,
the assignment depends on the arrival order at task A:

event_2, event_1 => A => event_1 => B_1
                      => event_2 => B_2

event_1, event_2 => A => event_2 => B_1
                      => event_1 => B_2

Since the order in which events from multiple producers arrive at the
consumer is not deterministic (e.g. due to network delays), you might see
different distributions.
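
To make this concrete, here is a minimal, self-contained sketch (plain Java,
not Flink's actual rebalance partitioner; names are illustrative only) of why
round-robin assignment ties the downstream channel to the output order at A:

import java.util.List;

// Simplified stand-in for round-robin partitioning: each record emitted by
// task A goes to the next downstream channel in turn, so which of B_1/B_2 an
// event ends up on is purely a function of its position in A's output order.
public class RoundRobinSketch {

    private int nextChannel = 0;

    int selectChannel(int numChannels) {
        int channel = nextChannel;
        nextChannel = (nextChannel + 1) % numChannels;
        return channel;
    }

    static void distribute(List<String> outputOrder) {
        RoundRobinSketch partitioner = new RoundRobinSketch();
        for (String event : outputOrder) {
            System.out.println(event + " => B_" + (partitioner.selectChannel(2) + 1));
        }
    }

    public static void main(String[] args) {
        // The two orders from the example above yield different channel
        // assignments (the concrete mapping also depends on where the
        // round-robin counter happens to start).
        distribute(List.of("event_2", "event_1"));
        System.out.println("---");
        distribute(List.of("event_1", "event_2"));
    }
}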

However, I am not sure whether this is really a problem as long as every
Pinot sink makes sure that all segments written after the last checkpoint
you recovered from are deleted. You might just end up with a different job
result, which is still a member of the space of valid results.

One problem I see with eagerly writing segments to Pinot is that downstream
systems might already start consuming the results even though they might
still change because of a recovery. The way Flink solves this problem is to
only publish results once a checkpoint has been completed.
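
As a rough illustration (plain Java, not the actual FLIP-143 interfaces, all
names hypothetical), the write/commit split looks roughly like this: segments
are only staged while writing and become visible to downstream consumers once
the committer runs after a successful checkpoint:

import java.util.ArrayList;
import java.util.List;

// Minimal sketch of the two-phase pattern: stage during writing, publish on
// commit after the covering checkpoint has completed.
public class TwoPhaseSketch {

    private final List<String> stagedSegments = new ArrayList<>();

    // Writer side: a finished segment is only recorded, not yet published.
    void stage(String segmentName) {
        stagedSegments.add(segmentName);
    }

    // Committer side: runs once the checkpoint covering the staged segments
    // has completed; only now would the segments be made visible in Pinot.
    List<String> commit() {
        List<String> published = new ArrayList<>(stagedSegments);
        stagedSegments.clear();
        return published;
    }
}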

Cheers,
Till

On Mon, Jan 25, 2021 at 4:30 PM Poerschke, Mats <
mats.poersc...@student.hpi.uni-potsdam.de> wrote:

> Hi all,
>
> We want to give you a short update on the Pinot Sink since we started
> implementing a PoC.
> As described earlier, we aim to use batch-uploading of segments to Pinot
> in combination with caching elements in the Flink sink.
>
> Our current implementation works like this:
>
> Besides the Pinot controller URI and the target table’s name, the sink
> allows users to define the maximum number of rows per segment.
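>
> Roughly, the user-facing configuration boils down to the following
> (simplified sketch, names may differ from the actual code):
>
> public class PinotSinkConfig {
>     final String controllerUri;   // e.g. "http://pinot-controller:9000"
>     final String tableName;       // target Pinot table
>     final int maxRowsPerSegment;  // flush threshold for segment creation
>
>     public PinotSinkConfig(String controllerUri, String tableName,
>                            int maxRowsPerSegment) {
>         this.controllerUri = controllerUri;
>         this.tableName = tableName;
>         this.maxRowsPerSegment = maxRowsPerSegment;
>     }
> }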
>
> The PinotWriter is responsible for collecting elements, building segments
> and uploading them to Pinot. To do so, it retrieves the Schema and
> TableConfig via the Pinot Controller API using the provided tableName.
> Whenever the specified maximum number of rows is reached, it starts the
> segment creation on disk. This process is handled by the Pinot admin tool.
> A segmentID is structured as follows:
> <table-name>-<subtask-id>-<incremental-counter>
> Finally, the PinotWriter pushes the created segment to the Pinot Controller,
> which then distributes it onto the Pinot Servers.
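>
> In simplified form (illustrative sketch, not the actual PoC code; the
> SegmentBuilder/SegmentPusher interfaces are stand-ins for the Pinot admin
> tool and the controller upload), the writer logic looks roughly like this:
>
> import java.io.File;
> import java.util.ArrayList;
> import java.util.List;
>
> public class PinotWriterSketch<IN> {
>
>     interface SegmentBuilder<T> { File build(String segmentName, List<T> rows); }
>     interface SegmentPusher { void push(File segment); }
>
>     private final String tableName;
>     private final int subtaskId;
>     private final int maxRowsPerSegment;
>     private final SegmentBuilder<IN> builder;
>     private final SegmentPusher pusher;
>
>     private final List<IN> rows = new ArrayList<>();
>     private long segmentCounter = 0;
>
>     PinotWriterSketch(String tableName, int subtaskId, int maxRowsPerSegment,
>                       SegmentBuilder<IN> builder, SegmentPusher pusher) {
>         this.tableName = tableName;
>         this.subtaskId = subtaskId;
>         this.maxRowsPerSegment = maxRowsPerSegment;
>         this.builder = builder;
>         this.pusher = pusher;
>     }
>
>     void write(IN element) {
>         rows.add(element);
>         if (rows.size() >= maxRowsPerSegment) {
>             flush();
>         }
>     }
>
>     // Segment IDs follow <table-name>-<subtask-id>-<incremental-counter>.
>     private void flush() {
>         String segmentName = tableName + "-" + subtaskId + "-" + segmentCounter++;
>         pusher.push(builder.build(segmentName, new ArrayList<>(rows)));
>         rows.clear();
>     }
> }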
>
> The PinotCommitter only checkpoints the segment ID of the segment that was
> last written. It is possible that multiple segments were uploaded to Pinot
> between two checkpoints.
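>
> The committable is therefore tiny (simplified sketch, names may differ from
> the actual code):
>
> public class PinotSinkCommittable {
>     // Only the ID of the segment written last is checkpointed, e.g.
>     // "myTable-3-17", even if several segments were uploaded since the
>     // previous checkpoint.
>     final String lastWrittenSegmentId;
>
>     public PinotSinkCommittable(String lastWrittenSegmentId) {
>         this.lastWrittenSegmentId = lastWrittenSegmentId;
>     }
> }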
>
> As for future plans, we want to prevent high memory pressure when
> collecting elements in the PinotWriter by directly writing elements to
> disk. The main question at this point is whether we can assume to have
> access to a disk temp directory.
>
> For checkpointing and failure recovery, we have also thought of an approach
> that we have not tried yet. Upon recovery from a checkpoint, the latest
> segment ID that is stored in the checkpoint can be accessed by the
> PinotSink. The PinotWriter then compares the incremental counter value of
> the checkpointed segment ID with segments that already exist in Pinot for
> the same table and subtask ID. If segments with a higher counter value in
> their IDs are discovered, they are deleted to avoid duplicates. After that,
> processing can continue as described above. We think that this mode of
> recovery assumes that elements from upstream Flink tasks always arrive at
> the same subtask of the sink. Is this fair?
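>
> The cleanup we have in mind looks roughly like this (illustrative sketch;
> SegmentAdmin is a hypothetical stand-in for the Pinot controller API, and
> the counter parsing assumes the segment ID format described above):
>
> import java.util.List;
>
> public class RecoveryCleanupSketch {
>
>     interface SegmentAdmin {
>         List<String> listSegmentNames(String tableName);
>         void deleteSegment(String tableName, String segmentName);
>     }
>
>     static void cleanUp(SegmentAdmin admin, String tableName, int subtaskId,
>                         long checkpointedCounter) {
>         String prefix = tableName + "-" + subtaskId + "-";
>         for (String segmentName : admin.listSegmentNames(tableName)) {
>             if (!segmentName.startsWith(prefix)) {
>                 continue; // segment was written by a different subtask
>             }
>             long counter = Long.parseLong(segmentName.substring(prefix.length()));
>             if (counter > checkpointedCounter) {
>                 // Written after the restored checkpoint => potential duplicate.
>                 admin.deleteSegment(tableName, segmentName);
>             }
>         }
>     }
> }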
>
> Best regards,
> Jakob and Mats
>
>
> On 6. Jan 2021, at 18:22, Yupeng Fu <yup...@uber.com.INVALID> wrote:
>
> Hi Mats,
>
> Glad to see this interest! We at Uber are also working on a Pinot sink (for
> BATCH execution), with some help from the Pinot community on abstracting
> Pinot interfaces for segment writes and catalog retrieval. Perhaps we can
> collaborate on this proposal/POC.
>
> Cheers,
>
> Yupeng
>
>
>
> On Wed, Jan 6, 2021 at 6:12 AM Aljoscha Krettek <aljos...@apache.org> wrote:
>
> That's good to hear. I wasn't sure because the explanation focused a lot
> on checkpoints and their details, while with the new Sink interface
> implementers don't need to be concerned with those. And in fact, when
> the Sink is used in BATCH execution mode there will be no checkpoints.
>
> Other than that, the implementation sketch makes sense to me. I think to
> make further assessments you will probably have to work on a
> proof-of-concept.
>
> Best,
> Aljoscha
>
> On 2021/01/06 11:18, Poerschke, Mats wrote:
> Yes, we will use the latest sink interface.
>
> Best,
> Mats
>
> On 6. Jan 2021, at 11:05, Aljoscha Krettek <aljos...@apache.org> wrote:
>
> It's great to see interest in this. Were you planning to use the new
> Sink interface that we recently introduced? [1]
>
> Best,
> Aljoscha
>
> [1] https://s.apache.org/FLIP-143
>
> On 2021/01/05 12:21, Poerschke, Mats wrote:
> Hi all,
>
> we want to contribute a sink connector for Apache Pinot. The following
> briefly describes the planned control flow. Please feel free to comment on
> any of its aspects.
>
> Background
> Apache Pinot is a large-scale real-time analytics datastore that works
> on data segments internally. The controller exposes an external API which
> allows posting new segments via a REST call. Each posted segment must
> contain an ID (called the segment name).
>
> Control Flow
> The Flink sink will collect data tuples locally. When creating a
> checkpoint, all those tuples are grouped into one segment which is then
> pushed to the Pinot controller. We will assign each pushed segment a unique
> incrementing identifier.
> After receiving a success response from the Pinot controller, the
> latest segment name is persisted within the Flink checkpoint.
> In case we have to recover from a failure, the latest successfully
> pushed segment name can be reconstructed from the Flink checkpoint. At this
> point the system might be in an inconsistent state: the Pinot controller
> might have already stored a newer segment (whose name was, due to the
> failure, not persisted in the Flink checkpoint).
> This inconsistency is resolved with the next successful checkpoint
> creation. The segment pushed at that point is assigned the same segment
> name as the inconsistent segment. Thus, Pinot replaces the old segment with
> the new one, which prevents introducing duplicate entries.
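>
> A simplified sketch of the naming idea (illustrative only, not the actual
> implementation):
>
> public class SegmentNaming {
>
>     // Because the next segment name is derived deterministically from the
>     // last name stored in the checkpoint, a segment re-created after
>     // recovery gets the same name as the "inconsistent" one and Pinot
>     // simply replaces it instead of adding a duplicate.
>     static String nextSegmentName(String tableName, long lastCommittedSequence) {
>         return tableName + "-" + (lastCommittedSequence + 1);
>     }
>
>     public static void main(String[] args) {
>         // Last checkpoint committed "myTable-41"; any segment pushed after
>         // that checkpoint would also have been named "myTable-42", so the
>         // re-push after recovery overwrites it.
>         System.out.println(nextSegmentName("myTable", 41)); // prints myTable-42
>     }
> }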
>
>
> Best regards
> Mats Pörschke
>
>
>
>
>
