vvivekiyer commented on PR #9544:
URL: https://github.com/apache/pinot/pull/9544#issuecomment-1272076485
> If Linkedin is using `MessageBatch<IndexedRecord>`, does that mean Linkedin is also using its own implementation of kafka consumer and not the OSS plugin?

Yes. LinkedIn has a custom Kafka consumer implementation.
> I am curious why Linkedin does this? Isn't it expensive to deserialize every record until the deserialized payload is actually needed? The consumer's contract should not involve deserializing the payload. Can you please explain why this is useful?
LinkedIn's Kafka consumer directly fetches the deserialized payload. AFAIK, LinkedIn's Kafka deployment has a schema registry where the payload's schema is registered, so they provide (optimized) deserialization and do not allow clients to plug in their own. @sajjad to add more details, if any.
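For illustration only, here is a minimal, self-contained sketch of a consumer batch that carries already-deserialized records, so no `byte[]` handling is needed on the client side. All types here are stand-ins, not LinkedIn's or Pinot's actual classes; the real `MessageBatch<T>` also exposes per-message offsets and lengths.

```java
import java.util.List;

// Stand-in for Avro's IndexedRecord, used only to keep this sketch
// self-contained: a record whose fields are addressable by position.
interface IndexedRecordLike {
  Object get(int fieldIndex);
}

// Paraphrase of the shape of Pinot's MessageBatch<T>.
interface SimpleMessageBatch<T> {
  int getMessageCount();
  T getMessageAtIndex(int index);
}

// A consumer that already returns deserialized records can expose them
// directly, with no byte[] round trip on the client side.
class DeserializedRecordBatch implements SimpleMessageBatch<IndexedRecordLike> {
  private final List<IndexedRecordLike> _records;

  DeserializedRecordBatch(List<IndexedRecordLike> records) {
    _records = records;
  }

  @Override
  public int getMessageCount() {
    return _records.size();
  }

  @Override
  public IndexedRecordLike getMessageAtIndex(int index) {
    return _records.get(index);
  }
}
```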
> I also don't understand the point about "unnecessary type conversion" here. Can you please elaborate?
I meant additional serialization and deserialization. Edited the description.
> Using generics forces the segment manager implementation to deal with raw usage of parameterized classes (due to type erasure) and make the code hard to read and maintain.
As per my understanding of the code, `SegmentManager` only deals with `GenericRow` once deserialization is done. The `MessageBatch<T>` and `StreamMessageDecoder<T>` interfaces take care of abstracting the messages and the decoding, so each implementation can handle whatever format it wishes (a sketch of this boundary follows below). Could you please clarify what you mean by this code becoming harder to read and maintain?
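A minimal sketch of that decode boundary, again with stand-in types rather than Pinot's actual classes (the real `StreamMessageDecoder<T>` also has `init` and length-based overloads):

```java
import java.util.HashMap;
import java.util.Map;

// Stand-in for Pinot's GenericRow: the only type the ingestion code sees
// once decoding is done.
class Row {
  final Map<String, Object> fields = new HashMap<>();
}

// Paraphrase of the shape of Pinot's StreamMessageDecoder<T>: T is the
// consumer's native message format and never leaks past decode().
interface Decoder<T> {
  Row decode(T payload, Row destination);
}

// A decoder whose T is an already-deserialized, map-style record; a
// byte[]-based plugin would simply pick T = byte[] instead. Either way,
// everything downstream works on rows, not on T.
class MapDecoder implements Decoder<Map<String, Object>> {
  @Override
  public Row decode(Map<String, Object> payload, Row destination) {
    destination.fields.putAll(payload);
    return destination;
  }
}
```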
> Besides, StreamMessage is meant to abstract the entire incoming payload. Using only the type of the record's "value" in this generic class seems prohibitive.
I agree with this part. We can discuss it and arrive at the best way to do this. But IMO, forcing `StreamMessage` to have a key and value of type `byte[]` seems counter-intuitive given our code flow. I've tried to explain the issue below.
This is my understanding of our OSS code prior to #9224:
1. Get a batch of messages:
   `MessageBatch<T> messageBatch = PartitionLevelConsumer.fetchMessage();`
2. For each message in the batch, decode it:
   `GenericRow row = StreamMessageDecoder<T>.decode(message)`

Note that `MessageBatch` is a generic interface because users of Pinot are free to use their own custom Kafka (or other stream) client implementations, which could return messages in any format.
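Put together, that flow is roughly the following (a sketch with paraphrased, stand-in interfaces; not the actual Pinot signatures):

```java
import java.util.ArrayList;
import java.util.List;

// Paraphrased shapes of the two generic interfaces involved.
interface Batch<T> {
  int getMessageCount();
  T getMessageAtIndex(int index);
}

interface RowDecoder<T> {
  Object decode(T payload); // returns a GenericRow in real Pinot
}

class ConsumptionLoop {
  // The consumer and the decoder agree on T; this loop never needs to know
  // whether T is byte[], an Avro record, or anything else.
  static <T> List<Object> drain(Batch<T> batch, RowDecoder<T> decoder) {
    List<Object> rows = new ArrayList<>();
    for (int i = 0; i < batch.getMessageCount(); i++) {
      rows.add(decoder.decode(batch.getMessageAtIndex(i)));
    }
    return rows;
  }
}
```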
After #9224, the code looks as follows:
1. Get a batch of messages:
   `MessageBatch<T> messageBatch = PartitionLevelConsumer.fetchMessage();`
2. Wrap each message in a `StreamMessage`. Note that `StreamMessage` stores values only as `byte[]`:
   `byte[] StreamMessage.value = messageBatch.getMessageValue(index)`
3. Decode the `StreamMessage`:
   `GenericRow row = StreamMessageDecoder<T>.decode(StreamMessage.value())`

Looking at the above, we've introduced a new step (2) that forces messages of a generic type (from `MessageBatch`) to be serialized to `byte[]`, only for `StreamMessageDecoder` to go back to working on generic types again.
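To make the concern concrete, here is a toy round trip (hypothetical `serialize`/`deserialize` helpers over plain strings, purely illustrative): when the consumer's native type is already deserialized, step (2) forces a re-serialization whose only purpose is to be undone again in step (3).

```java
import java.nio.charset.StandardCharsets;

class RoundTripSketch {
  // Hypothetical helpers standing in for whatever (de)serialization a plugin
  // would have to perform; plain strings keep the sketch runnable.
  static byte[] serialize(String alreadyDeserialized) {
    // step (2): forced back into bytes so StreamMessage.value can hold it
    return alreadyDeserialized.getBytes(StandardCharsets.UTF_8);
  }

  static String deserialize(byte[] value) {
    // step (3): decoded right back into what the consumer already had
    return new String(value, StandardCharsets.UTF_8);
  }

  public static void main(String[] args) {
    String message = "already-deserialized payload"; // what the consumer returned
    byte[] wrapped = serialize(message);             // extra work introduced by step (2)
    String decoded = deserialize(wrapped);           // extra work to undo it again
    System.out.println(decoded.equals(message));     // true: the round trip added nothing
  }
}
```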
As you mentioned, the new code assumes that messages read from the stream consumer will always be in serialized form. But the existing `MessageBatch<T>` and `StreamMessageDecoder<T>` contracts don't honor that assumption.