Yeah, to be clear, I'm talking about having only one constructor for a
direct stream, one that gives you a stream of ConsumerRecord.

Different needs for topic subscription, starting offsets, etc. could be
handled by calling the appropriate methods after construction but before
starting the stream.
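A rough sketch of the shape I have in mind (all class and method names
here are made up for illustration, not the actual Spark or Kafka API --
just one constructor, then configuration calls before start):

```java
import java.util.*;

// Hypothetical sketch only: "ConsumerRecordStream" and its methods are
// illustrative, not a real Spark class. The point is a single constructor
// plus configuration methods that must be called before the stream starts.
class ConsumerRecordStream {
    private final Map<String, Object> kafkaParams;
    private final Set<String> topics = new HashSet<>();
    private final Map<String, Long> startingOffsets = new HashMap<>();
    private boolean started = false;

    ConsumerRecordStream(Map<String, Object> kafkaParams) {
        this.kafkaParams = kafkaParams;
    }

    // Topic subscription, configured after construction.
    ConsumerRecordStream subscribe(Collection<String> newTopics) {
        ensureNotStarted();
        topics.addAll(newTopics);
        return this;
    }

    // Starting offsets, keyed here by "topic-partition" for simplicity.
    ConsumerRecordStream startFrom(Map<String, Long> offsets) {
        ensureNotStarted();
        startingOffsets.putAll(offsets);
        return this;
    }

    void start() { started = true; }

    private void ensureNotStarted() {
        if (started) throw new IllegalStateException("stream already started");
    }

    Set<String> topics() { return topics; }
}

public class Sketch {
    public static void main(String[] args) {
        ConsumerRecordStream stream = new ConsumerRecordStream(
            Collections.singletonMap("bootstrap.servers", "localhost:9092"));
        stream.subscribe(Arrays.asList("requests"))
              .startFrom(Collections.singletonMap("requests-0", 0L));
        stream.start();
        System.out.println(stream.topics());
    }
}
```

The upside is that the two cases in the Java example below (with and
without explicit partitions) would differ only in which configuration
methods you call, not in which overload you pick.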


On Wed, Mar 9, 2016 at 1:19 PM, Alan Braithwaite <a...@cloudflare.com> wrote:
> I'd probably prefer to keep it the way it is, unless it's becoming more like
> the function without the messageHandler argument.
>
> Right now I have code like this, but I wish it were more similar looking:
>
>     if (parsed.partitions.isEmpty()) {
>       JavaPairInputDStream<String, MessageWrapper> kvstream =
>           KafkaUtils.createDirectStream(jssc, String.class, MessageWrapper.class,
>               StringDecoder.class, MessageDecoder.class, kafkaArgs(parsed), topicSet);
>       requests = kvstream.map(
>           (Function<Tuple2<String, MessageWrapper>, MessageWrapper>) Tuple2::_2);
>     } else {
>       requests = KafkaUtils.createDirectStream(jssc, String.class,
>           MessageWrapper.class, StringDecoder.class, MessageDecoder.class,
>           MessageWrapper.class, kafkaArgs(parsed), parsed.partitions,
>           (Function<MessageAndMetadata<String, MessageWrapper>, MessageWrapper>)
>               MessageAndMetadata::message);
>     }
>
> Of course, this is in the Java API so it may not have relevance to what
> you're talking about.
>
> Perhaps if both functions (the one with partitions arg and the one without)
> returned just ConsumerRecord, I would like that more.
>
> - Alan
>
> On Tue, Mar 8, 2016 at 6:49 AM, Cody Koeninger <c...@koeninger.org> wrote:
>>
>> No, looks like you'd have to catch them in the deserializer and have the
>> deserializer return an Option or something. The new consumer builds a buffer
>> full of records, not one at a time.
>>
>> On Mar 8, 2016 4:43 AM, "Marius Soutier" <mps....@gmail.com> wrote:
>>>
>>>
>>> > On 04.03.2016, at 22:39, Cody Koeninger <c...@koeninger.org> wrote:
>>> >
>>> > The only other valid use of messageHandler that I can think of is
>>> > catching serialization problems on a per-message basis.  But with the
>>> > new Kafka consumer library, that doesn't seem feasible anyway, and
>>> > could be handled with a custom (de)serializer.
>>>
>>> What do you mean, that doesn't seem feasible? You mean when using a
>>> custom deserializer? Right now I'm catching serialization problems in the
>>> message handler, after your proposed change I'd catch them in `map()`.
>>>
>
