Hi all,

We want to give you a short update on the Pinot Sink since we started 
implementing a PoC.
As described earlier, we aim to use batch-uploading of segments to Pinot in 
combination with caching elements in the Flink sink.

Our current implementation works like this:

Besides the Pinot controller URI and the target table’s name, the sink allows 
users to define the maximum number of rows per segment.

The PinotWriter is responsible for collecting elements, building segments and 
uploading them to Pinot. To do so, it retrieves the Schema and TableConfig via 
the Pinot Controller API using the provided tableName. Whenever the specified 
maximum number of rows is reached, it starts the segment creation on disk. This 
process is handled by the Pinot admin-tool. A segment ID is structured as 
follows: <table-name>-<subtask-id>-<incremental-counter>
Finally, the PinotWriter pushes the created segment to the Pinot Controller, 
which then distributes it to the Pinot Servers.
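
As a minimal sketch of the segment ID scheme described above, the following 
helper formats and parses IDs of the form 
<table-name>-<subtask-id>-<incremental-counter>. The class name 
SegmentNameSketch and its methods are hypothetical and are neither part of the 
PoC nor of the Pinot API. Parsing from the right is an assumption made here so 
that table names containing '-' are still handled.

```java
import java.util.Optional;

/** Hypothetical helper for illustration; not part of the PoC or of Pinot. */
final class SegmentNameSketch {
    final String tableName;
    final int subtaskId;
    final long counter;

    SegmentNameSketch(String tableName, int subtaskId, long counter) {
        this.tableName = tableName;
        this.subtaskId = subtaskId;
        this.counter = counter;
    }

    /** Formats the ID as <table-name>-<subtask-id>-<incremental-counter>. */
    String format() {
        return tableName + "-" + subtaskId + "-" + counter;
    }

    /** Parses from the right so that table names containing '-' still work. */
    static Optional<SegmentNameSketch> parse(String name) {
        int lastDash = name.lastIndexOf('-');
        if (lastDash <= 0) {
            return Optional.empty();
        }
        int prevDash = name.lastIndexOf('-', lastDash - 1);
        if (prevDash <= 0) {
            return Optional.empty();
        }
        try {
            int subtask = Integer.parseInt(name.substring(prevDash + 1, lastDash));
            long counter = Long.parseLong(name.substring(lastDash + 1));
            return Optional.of(
                new SegmentNameSketch(name.substring(0, prevDash), subtask, counter));
        } catch (NumberFormatException e) {
            // The trailing parts are not numeric, so this is not one of our IDs.
            return Optional.empty();
        }
    }
}
```

With this sketch, parsing "my-table-3-17" yields the table name "my-table", 
subtask ID 3 and counter 17.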

The PinotCommitter only checkpoints the segment ID of the segment that was last 
written. It is possible that multiple segments were uploaded to Pinot between 
two checkpoints.

As for future plans, we want to prevent high memory pressure when collecting 
elements in the PinotWriter by writing elements directly to disk. The main 
question at this point is whether we can assume that the sink has access to a 
temporary directory on local disk.
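
To make the idea of writing elements directly to disk concrete, here is a 
rough sketch using only the JDK. It assumes that a temporary directory is 
available (which is exactly the open question), that each element has already 
been serialized to a single line of text, and that the class name 
DiskBufferSketch and its methods are hypothetical. How Flink would expose a 
temp directory to the sink is not covered.

```java
import java.io.BufferedWriter;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.List;

/** Hypothetical disk-backed buffer; assumes one serialized row per line. */
final class DiskBufferSketch implements AutoCloseable {
    private final Path dir;
    private final Path file;
    private final BufferedWriter writer;
    private long rowCount;

    DiskBufferSketch() {
        try {
            // Uses the JVM default temp location (java.io.tmpdir).
            dir = Files.createTempDirectory("pinot-sink-");
            file = dir.resolve("rows.txt");
            writer = Files.newBufferedWriter(file, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Appends one row to the file instead of keeping it in memory. */
    void add(String serializedRow) {
        try {
            writer.write(serializedRow);
            writer.newLine();
            rowCount++;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** True once the configured maximum number of rows per segment is reached. */
    boolean isFull(long maxRowsPerSegment) {
        return rowCount >= maxRowsPerSegment;
    }

    /** Flushes and reads the rows back, e.g. as input for segment creation. */
    List<String> readBack() {
        try {
            writer.flush();
            return Files.readAllLines(file, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    @Override
    public void close() {
        try {
            writer.close();
            Files.deleteIfExists(file);
            Files.deleteIfExists(dir);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```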

For checkpointing and failure recovery, we also have an approach in mind that 
we have not tried yet. Upon recovery from a checkpoint, the PinotSink can 
access the latest segment ID stored in the checkpoint. The PinotWriter then 
compares the incremental counter value of the checkpointed segment ID with 
those of the segments that already exist in Pinot for the same table and 
subtask ID. If segments with a higher counter value in their IDs are 
discovered, they are deleted to avoid duplicates. After that, processing can 
continue as described above. We think that this mode of recovery assumes that 
elements from upstream Flink tasks always arrive at the same subtask of the 
sink. Is this a fair assumption?
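
The comparison step of this recovery approach could be sketched as below. The 
class RecoverySketch and the method segmentsToDelete are hypothetical. 
Listing the existing segments and deleting the stale ones would go through 
Pinot and is deliberately left out, so the method only takes a plain list of 
segment names as input.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the recovery comparison; no Pinot calls are made. */
final class RecoverySketch {

    /**
     * Returns the segments of the given table and subtask whose counter is
     * higher than the counter stored in the checkpoint. These were uploaded
     * after the last checkpoint and would cause duplicates after a replay.
     */
    static List<String> segmentsToDelete(
            List<String> existingSegments,
            String tableName,
            int subtaskId,
            long checkpointedCounter) {
        String prefix = tableName + "-" + subtaskId + "-";
        List<String> stale = new ArrayList<>();
        for (String name : existingSegments) {
            if (!name.startsWith(prefix)) {
                continue; // belongs to another table or subtask
            }
            try {
                long counter = Long.parseLong(name.substring(prefix.length()));
                if (counter > checkpointedCounter) {
                    stale.add(name);
                }
            } catch (NumberFormatException ignored) {
                // Suffix is not a plain counter, so the segment is not ours.
            }
        }
        return stale;
    }
}
```

For example, with existing segments orders-0-3, orders-0-4, orders-0-5 and 
orders-1-9, and a checkpointed counter of 3 for subtask 0, only orders-0-4 and 
orders-0-5 are selected for deletion.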

Best regards,
Jakob and Mats


On 6. Jan 2021, at 18:22, Yupeng Fu 
<yup...@uber.com.INVALID> wrote:

Hi Mats,

Glad to see this interest! We at Uber are also working on a Pinot sink (for
BATCH execution), with some help from the Pinot community on abstracting
Pinot interfaces for segment writes and catalog retrieval. Perhaps we can
collaborate on this proposal/POC.

Cheers,

Yupeng



On Wed, Jan 6, 2021 at 6:12 AM Aljoscha Krettek 
<aljos...@apache.org> wrote:

That's good to hear. I wasn't sure because the explanation focused a lot
on checkpoints and their details, while with the new Sink interface
implementers don't need to be concerned with those. In fact, when
the Sink is used in BATCH execution mode there will be no checkpoints.

Other than that, the implementation sketch makes sense to me. I think to
make further assessments you will probably have to work on a
proof-of-concept.

Best,
Aljoscha

On 2021/01/06 11:18, Poerschke, Mats wrote:
Yes, we will use the latest sink interface.

Best,
Mats

On 6. Jan 2021, at 11:05, Aljoscha Krettek 
<aljos...@apache.org> wrote:

It's great to see interest in this. Were you planning to use the new
Sink interface that we recently introduced? [1]

Best,
Aljoscha

[1] https://s.apache.org/FLIP-143

On 2021/01/05 12:21, Poerschke, Mats wrote:
Hi all,

we want to contribute a sink connector for Apache Pinot. The following
briefly describes the planned control flow. Please feel free to comment on
any of its aspects.

Background
Apache Pinot is a large-scale real-time data ingestion engine working
on data segments internally. The controller exposes an external API which
allows posting new segments via a REST call. A segment posted this way must
contain an ID (called the segment name).

Control Flow
The Flink sink will collect data tuples locally. When creating a
checkpoint, all those tuples are grouped into one segment which is then
pushed to the Pinot controller. We will assign each pushed segment a unique
incrementing identifier.
After receiving a success response from the Pinot controller, the
latest segment name is persisted within the Flink checkpoint.
In case we have to recover from a failure, the latest successfully
pushed segment name can be reconstructed from the Flink checkpoint. At this
point the system might be in an inconsistent state. The Pinot controller
might have already stored a newer segment (whose name was, due to the
failure, not persisted in the Flink checkpoint).
This inconsistency is resolved with the next successful checkpoint
creation. The segment pushed at that point is assigned the same segment name
as the inconsistent segment. Thus, Pinot replaces the old segment with the
new one, which prevents duplicate entries.
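
The replace-on-recovery behaviour described above can be illustrated with a 
small simulation. FakeController is a hypothetical in-memory stand-in for the 
Pinot controller that only models one property taken from the description: 
pushing a segment under an existing name replaces the stored segment. The 
segment names and rows are made up for the example.

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical simulation; FakeController is not a Pinot class. */
final class ReplaceOnRecoverySketch {

    /** In-memory stand-in: a push under an existing name replaces the segment. */
    static final class FakeController {
        final Map<String, List<String>> segments = new HashMap<>();

        void push(String segmentName, List<String> rows) {
            segments.put(segmentName, List.copyOf(rows));
        }

        int totalRows() {
            return segments.values().stream().mapToInt(List::size).sum();
        }
    }

    static FakeController simulate() {
        FakeController pinot = new FakeController();

        // Checkpoint 1: segment pushed and its counter persisted in the checkpoint.
        pinot.push("seg-1", List.of("a", "b"));
        long checkpointedCounter = 1;

        // Checkpoint 2: segment pushed, but the job fails before the counter
        // is persisted. Pinot now holds a segment the checkpoint does not know.
        pinot.push("seg-2", List.of("c"));

        // Recovery: the counter is restored to 1 and row "c" is replayed.
        // The next checkpoint reuses counter 2 and replaces the orphaned segment.
        long nextCounter = checkpointedCounter + 1;
        pinot.push("seg-" + nextCounter, List.of("c", "d"));

        return pinot;
    }
}
```

In this simulation, row "c" ends up in Pinot only once, because the second 
push of seg-2 overwrites the segment left behind by the failed checkpoint.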


Best regards
Mats Pörschke



