Hi,

This KIP is about adding classes to kafka-streams-test-utils.
My initial plan was to create a new project for these helper classes, before
I decided to contribute them directly to Apache Kafka with this KIP.
I have one streams project where I have used these classes, which is why I
planned to release them as a separate "early access" package
until the Kafka release that includes them is publicly available. This
package would also serve as "a historically-compatible version of the
library".

I will try to summarize the open topics after your last email:

1. Add JavaDoc for KIP

Is there a good example of a KIP where JavaDoc is included, so I can follow
it? I created this KIP using this one as an example:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-247%3A+Add+public+test+utils+for+Kafka+Streams


I have now added some comments to the KIP page to clarify timestamp
handling, but I did not want to add the full JavaDoc of each method.
Is that enough?

2. TTD usability changes and swapping ConsumerRecord and ProducerRecord in
APIs

From my point of view only:
- changing readRecord to return ConsumerRecord would mean we cannot use
OutputVerifier
- changing pipeInput to take ProducerRecord would leave no easy way to
construct those, like ConsumerRecordFactory provides for ConsumerRecord
- if these classes convert between ConsumerRecord and ProducerRecord with a
field-by-field constructor, there is a risk that fields added later to
ProducerRecord or ConsumerRecord are not copied in these classes

I would propose a separate KIP for these and probably other enhancements:
- a superclass or common interface for ConsumerRecord and ProducerRecord
- constructors for ConsumerRecord and ProducerRecord that initialize from
this superclass
- modify OutputVerifier to work with both ConsumerRecord and ProducerRecord
- create a new RecordFactory to replace ConsumerRecordFactory
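
To illustrate the common-interface idea, here is a minimal sketch. All of
the types below (KafkaRecord, BaseRecord, ProducerRecordSketch,
ConsumerRecordSketch) are hypothetical stand-ins written for this email;
they are not existing Kafka classes and carry only a few of the real fields.

```java
import java.util.Objects;

// Hypothetical sketch only: none of these types exist in Apache Kafka.
public class RecordSketch {

    /** Fields that both record types have in common (simplified). */
    public interface KafkaRecord<K, V> {
        String topic();
        K key();
        V value();
        Long timestamp();
    }

    /** Shared implementation, so a new common field is added in one place. */
    public static class BaseRecord<K, V> implements KafkaRecord<K, V> {
        private final String topic;
        private final K key;
        private final V value;
        private final Long timestamp;

        public BaseRecord(String topic, K key, V value, Long timestamp) {
            this.topic = topic;
            this.key = key;
            this.value = value;
            this.timestamp = timestamp;
        }

        /** Copy constructor: initialize from any other KafkaRecord. */
        public BaseRecord(KafkaRecord<K, V> other) {
            this(other.topic(), other.key(), other.value(), other.timestamp());
        }

        @Override public String topic() { return topic; }
        @Override public K key() { return key; }
        @Override public V value() { return value; }
        @Override public Long timestamp() { return timestamp; }
    }

    /** Stand-in for a producer-side record. */
    public static final class ProducerRecordSketch<K, V> extends BaseRecord<K, V> {
        public ProducerRecordSketch(String topic, K key, V value, Long timestamp) {
            super(topic, key, value, timestamp);
        }
        public ProducerRecordSketch(KafkaRecord<K, V> other) { super(other); }
    }

    /** Stand-in for a consumer-side record. */
    public static final class ConsumerRecordSketch<K, V> extends BaseRecord<K, V> {
        public ConsumerRecordSketch(String topic, K key, V value, Long timestamp) {
            super(topic, key, value, timestamp);
        }
        public ConsumerRecordSketch(KafkaRecord<K, V> other) { super(other); }
    }

    /** A verifier written against the interface accepts either record type. */
    public static boolean sameKeyValue(KafkaRecord<?, ?> a, KafkaRecord<?, ?> b) {
        return Objects.equals(a.key(), b.key())
            && Objects.equals(a.value(), b.value());
    }
}
```

With something like this, the conversion between the two record types goes
through the shared copy constructor instead of a field-by-field copy in the
test classes.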


3. Return null vs NoSuchElementException when the queue is empty

Should this also be included in the above TTD usability changes?
If the single-record read methods are changed to throw an exception, it
would require adding hasRecords to be able to verify the empty queue
scenarios. I do not know how to implement that currently without modifying
TTD to provide some way to get the queue size or peek at items.
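
A minimal sketch of the semantics discussed in this point, using a plain
in-memory queue. OutputQueueSketch is a hypothetical stand-in for
illustration; it is not TopologyTestDriver or the proposed TestOutputTopic,
and it does not show how TTD would expose its internal queue.

```java
import java.util.LinkedList;
import java.util.NoSuchElementException;
import java.util.Queue;

// Hypothetical stand-in for an output topic; not part of Kafka's API.
public class OutputQueueSketch<V> {
    // LinkedList is used because it accepts null elements (ArrayDeque does not).
    private final Queue<V> records = new LinkedList<>();

    /** Simulates the topology writing a value, which may be null. */
    public void add(V value) {
        records.add(value);
    }

    /** Lets a test check for leftover output without reading it. */
    public boolean isEmpty() {
        return records.isEmpty();
    }

    /**
     * Returns the next value, which may legitimately be null.
     * Throws instead of returning null when nothing is left, so the two
     * cases can no longer be confused.
     */
    public V readValue() {
        if (records.isEmpty()) {
            throw new NoSuchElementException("no more records in the output queue");
        }
        return records.remove();
    }
}
```

Here a null return always means "a record with a null value", and an empty
queue is reported either by isEmpty() or by the exception.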

4. IllegalArgumentException("topic doesn't exist")
Is this worth a separate ticket?

5. org.apache.kafka.streams.test vs org.apache.kafka.streams

I was thinking that org.apache.kafka.streams.test, where OutputVerifier and
ConsumerRecordFactory also live, would be the more logical place, but
I do not know whether there is some technical reason why TTD is in
org.apache.kafka.streams and not in org.apache.kafka.streams.test where the
other classes are.

Did I skip something?

Jukka


On Tue, May 7, 2019 at 22:02, John Roesler (j...@confluent.io) wrote:

> Thanks for the responses, Jukka!
>
> Thanks for the reference to the javadocs in your library, but I
> actually meant they should be part of the KIP document.
>
> As a general comment, I did get that your intent is to smooth over
> some rough spots with the current testing library, and that many of
> your API/behavior decisions are just reflecting the underlying TTD.
> What I'm wondering is whether we can take the opportunity to make a
> bigger change for an even smoother usability experience.
>
> Regarding the publishing artifact, did you mean that you're proposing
> to add a separate build artifact to the Apache Kafka project, or just
> that you plan to host a historically-compatible version of the library
> on your own? My initial reaction is that we should just include this
> stuff in the test-utils package starting at whatever version. This
> gives us greater latitude to modify the underlying layers in service
> of this KIP. Plus, it's nicer as a user to just have one test-support
> artifact to manage.
>
> Specific responses below:
> > >InputTopic:
> > >
> > >1. Have you considered adding a ProducerRecord input method to the input
> > >topic? This might be unnecessary, given the overloads you provide. I'm
> > >wondering if it decreases the domain mismatch between TopologyTestDriver
> > >and KafkaStreams, though, since in production code, you send input to
> the
> > >app as a ProducerRecord, via a topic. Also, this might let you drop
> some of
> > >the less mainstream overloads, like the ones for headers.
> >
> > Ok, so not same methods as in TopologyTestDriver:
> > pipeInput(ConsumerRecord<byte[], byte[]> consumerRecord);
> > pipeInput(List<ConsumerRecord<byte[], byte[]>> records);
> > but instead:
> > pipeInput(ProducerRecord<K, V> record);
> > pipeInput(List<ProducerRecord<K, V>> records);
> > Which would convert ProducerRecords to ConsumerRecords before piping to
> TTD
> >
> > and Drop these methods
> >     void pipeInput(K key, V value, Headers headers);
> >     void pipeInput(K key, V value, Headers headers, long timestampMs);
>
> Yes, this is essentially what I was saying.
>
> >
> > In this case you would not able to create those with
> ConsumerRecordFactory,
> > but needing to create  ProducerRecord   objects directly.one by one or
> with
> > some helper methods.
> >
> > Generally I was hoping that ProducerRecord and ConsumerRecord would have
> > inherited to same base class or implemented some kind KafkaRecord
> > interface, but
> > it is not the case now. So in the most of  case it would have enough to
> > pass KafkaRecord instead of ProducerRecord like in OutputVerifier now.
> >
> > What do you think?
> >
> I agree it would be nice in some cases to have a symmetric superclass
> to capture the common fields in Producer and Consumer records, but I
> don't know if I see how that's an impediment here.
>
> Part of what I was thinking below is that the interface you're
> proposing may actually render the ConsumerRecordFactory unnecessary as
> a public interface. Since we're taking a typed ProducerRecord<K,V>
>
> > >2. On the "pipeList" methods, it seems like the "start,advance"
> timestamp
> > >approach forces me to do a little mental math, if I actually want to
> > >achieve some specific timestamp per record, or to be able to verify the
> > >result, given specific timestamps as input. Did you consider a
> > >KeyValueTimestamp value type instead? Alternatively, if you like the
> > >ProducerRecord approach, above, you could lean on that instead.
> >
> > This start + advance concept is coming directly from
> ConsumerRecordFactory.
> > I don't see value of adding KeyValueTimestamp class.
> >
> > My own approach has been piping in the seeds with list and pipe special
> > times with:
> > void pipeInput(K key, V value, long timestampMs);
> >
> > This start + n*interval approach will fit the normal happy case scenarios
> > and
> > for timing relating test special cases I have used pipe single record at
> a
> > time.
> >
> > And that ProducerRecord approach is one alternative, but you end up
> > generating that list some special methods anyway,
> > which could also pipe those in one by one.
> >
>
> Ok, my intuition may not be the best here, because I do a *lot* of
> timestamp-sensitive testing. I think your experience is more typical.
> Plus, it sounds like you're not opposed to adding ProducerRecord<K,V>
> input, which renders KeyValueTimestamp totally unnecessary.
>
> I do understand that this "start,advance" API comes from
> ConsumerRecordFactory, I'm sorry if it seemed I was being critical of
> your proposal. I think that if I actually want to verify the right
> thing happens with timestamps, then the ProducerRecord input would let
> me do that, and for other cases, where I just need "some" timestamp,
> then start+advance works fine.
>
> > >
> > >3. I wasn't clear on the semantics of the constructors that take a start
> > >timestamp, but no advance time. I also wasn't clear on the semantics
> when
> > >the constructor specifies start/advance, but then we also call the input
> > >methods that specify timestamps, or start/advance timestamps. Also
> related,
> > >what's the "default" timestamp, if no start is specified, "zero" or
> "now"
> > >both seem reasonable. Similar with the advance, "1ms" and "0ms" both
> seem
> > >reasonable defaults.
> >
> > This is also based on the implementation of ConsumerRecordFactory, which
> > seems to
> > set the base time with System.currentTimeMillis() if not set,advanceMs
> is 0
> > and
> > you can overwrite these timestamp providing it with the method call.
> >
>
> I acknowledge that this is coming through from ConsumerRecordFactory,
> but I'm still wondering if it's the API we should be offering. This
> could be a good idea to correct an ambiguity that we didn't notice
> before.
>
> Also, note that even if the semantics are governed by the underlying
> library, they still need to be clear to someone looking just at this
> interface. Perhaps adding Javadocs to this KIP would help clear this
> up.
>
> > >
> > >OutputTopic:
> > >
> > >4. Tentatively, ProducerRecord seems like a strange output type, since
> as a
> > >user, I'm "consuming" the results. How about using ConsumerRecord
> instead?
> >
> > This is also directly from TTD. Also for this that KafkaRecord concept
> > would be better, but would
> > require alignment with existing classes, too.
>
> Despite the underlying library, do you think it would work to offer
> ConsumerRecord as an output type?
>
> > >
> > >5. We have methods above for producing with specific timestamps, but
> none
> > >for observing the timestamps. How can we strengthen the symmetry?
> >
> > My own experience with testing with TTD.
> > Those timestamp are important when feeding in the data, so can test for
> > example
> > timewindows and joins with left side or right side arriving first.
> >
> > For the result verification, I have not so far checked the Kafka
> timestamps.
> > Timewindow related information is available in the window key and the
> joins
> > the output need to be verified, not actual timestamp of it.
> > So for those case where you need it, those could be checked with
> > fetching ProducerRecord
> > one by one.
> >
> > What do you think?
>
> Ack. As I mentioned before, my experience is a little different than
> normal, so I think I trust your intuition more than my own.
>
> > >
> > >6. (just a comment) I like the readToMap method. Thanks!
> > >
> > Good.
> >
> > >7. I know it clashes somewhat with the Kafka semantics, but I'm a little
> > >concerned about null semantics in the output topic. In particular,
> > >"readValue()" returning null ambiguously indicates both "there is no
> value"
> > >and "there is a null value present". Can we consider throwing a
> > >NoSuchElementException when you attempt to read, but there is nothing
> > there?
> >
> > Yes, that readValue with null, is little confusing.
> > If you need to check value equal null, you need to fetch KeyValue with
> > readKeyValue instead.
> > Does it solve the same problem?
>
> I'm not sure. It feels like we're setting a trap for people. My spidey
> sense is still saying that a NoSuchElementException on all the
> read-single-value APIs is the best solution.
>
> >
> > Also what I noticed from the current implementation of TTD, that if you
> > pipe in to non-existing topic,
> > it generated Exception, but reading from non-existing topic return only
> > null.
> >
> > So getting null from readValue can mean you have mistyped the topic name,
> > there are no records to consume, or the value of it is null.
>
> That is a bummer. I guess we could even add an
> IllegalArgumentException("topic doesn't exist") or something for this
> case.
>
> >
> > >
> > >7.5. This would necessitate adding a boolean method to query the
> presence
> > >of output records, which can be a handy way to cap off a test:
> > >`assertFalse(outputTopic.hasRecords(),
> > >outputTopic.readKeyValuesToList().toString())` would fail and print out
> the
> > >remaining records if there are any.
> >
> > Yes, I have thinking about the same, but so far I have used something
> like
> >
> > assertThat(outputTopic.readRecord(), nullValue());
> >
> > which will output the next record if it is failing.
>
> Yes, that does about the same thing, if we preserve the null-return
> behavior.
>
> >
> > >
> > >General:
> > >
> > >8. Can the TTD, input, and output topics implement AutoCloseable, to
> > >facilitate try-with-resources in tests?
> > >
> > >Example:
> > >try (driver = new TTD(), input = new TestInputTopic(), output = new
> > >TestOutputTopic() ) {
> > > ...
> > >} // ensures everything is closed
> > >
> > In the current implementation Topics does not have any state and do not
> > need the closing.
>
> That's fair.
>
> >
> > >7. Should we add some assertion that all the output is consumed when you
> > >call testDriver.close() ?
> >
> > Not by default at least. There are scenarios where you validate only
> > OutputTopic even the stream is generating others.
> > Maybe some kind of optionally called boolean method to check all topics
> in
> > TTD might be good.
>
> Yeah, you're right. In retrospect, this idea doesn't seem that useful.
>
> > >
> > >8. Should we consider deprecating (some or all) of the overlapping
> > >mechanisms in TopologyTestDriver? It seems like this might improve
> > >usability by reducing ambiguity.
> > The most of the methods do not overlap, only those
> Consumer/ProducerRecord
> > based ones, but
> > the problem with the current implementation is you cannot deprecate those
> > directly, because they are used by these classes.
> > So there would be need add some other non public methods to be used by
> > Topic classes in this case.
> > Also relating to it, these are now in org.apache.kafka.streams.test
> package
> > not in same package as TTD in org.apache.kafka.streams.
>
> Sorry, my wording was ambiguous, I meant deprecate
> ConsumerRecordFactory, along with any mechanism to produce or consume
> data directly in/out of TopologyTestDriver. This would direct everyone
> to re-write their tests using this new topic-in/topic-out interface.
> Even if this is more work in the short term, people will generally
> benefit in the long run from not having to make a decision about which
> i/o API to use for every single test.
>
> The package is a good point. Why shouldn't we just put the new stuff
> in the same package as TTD?
>
> >
> > >
> > >9. Can you give us some idea of the javadoc for each of the new methods
> > >you're proposing? The documentation is also part of the public API, and
> it
> > >also helps us understand the semantics of the operations you're
> > proposing.
> > >
> >
> > My current version of JavaDoc can be found, here:
> > https://jukkakarvanen.github.io/kafka-streams-test-topics/
> >
> > Also I made proposal for Developer Guide Testing changes to
> > docs/streams/developer-guide/testing.html
> > <
> https://github.com/apache/kafka/compare/trunk...jukkakarvanen:KAFKA-8233_InputOutputTopics#diff-3e6e61eaf18f0d35cdb94ed705581b55
> >
> > :
> >
> https://github.com/apache/kafka/compare/trunk...jukkakarvanen:KAFKA-8233_InputOutputTopics
> >
> > (Not sure is there better way to check those, than in Github diff.)
> >
> > So, thanks for the feedback and I am happy to change those things you see
> > viable after my comments.
>
> Thanks! There's no need to include documentation changes in the KIP,
> but please include the proposed javadoc in the KIP design doc itself.
>
