mjsax commented on code in PR #17973:
URL: https://github.com/apache/kafka/pull/17973#discussion_r1866979224
##########
streams/src/main/java/org/apache/kafka/streams/Topology.java:
##########
@@ -77,6 +77,166 @@ protected Topology(final InternalTopologyBuilder internalTopologyBuilder) {
public enum AutoOffsetReset {
EARLIEST, LATEST
}
+ /**
+ * Adds a new source that consumes the specified topics and forwards the records to child processor and/or sink nodes.
+ * The source will use the specified {@link org.apache.kafka.streams.AutoOffsetReset offset reset policy} if no committed offsets are found.
+ *
+ * @param offsetReset the auto offset reset policy to use for this source if no committed offsets are found; acceptable values: earliest or latest
+ * @param name the unique name of the source used to reference this node when {@link #addProcessor(String, ProcessorSupplier, String...) adding processor children}
+ * @param topics the name of one or more Kafka topics that this source is to consume
+ * @return itself
+ * @throws TopologyException if a processor is already added or if topics have already been registered by another source
+ */
+ public synchronized Topology addSource(final org.apache.kafka.streams.AutoOffsetReset offsetReset,
+ final String name,
+ final String... topics) {
+ internalTopologyBuilder.addSource(null, name, null, null, null, topics);
Review Comment:
We need to pass `offsetReset` to `addSource(...)` here; as written, the parameter is accepted but never used. The same applies below, and thus we also need to update `InternalTopologyBuilder` itself.
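
To illustrate the shape of the fix, here is a minimal, self-contained sketch. It is not the actual Kafka code: `ResetPolicy`, `BuilderStub`, and `TopologyStub` are hypothetical stand-ins for `org.apache.kafka.streams.AutoOffsetReset`, `InternalTopologyBuilder`, and `Topology`, and the stub's `addSource` omits the extra arguments that the real call in the diff passes as `null`. How `InternalTopologyBuilder` should actually accept the new type is left open.

```java
import java.util.ArrayList;
import java.util.List;

public class OffsetResetSketch {

    // Hypothetical stand-in for org.apache.kafka.streams.AutoOffsetReset.
    enum ResetPolicy { EARLIEST, LATEST }

    // Hypothetical stand-in for InternalTopologyBuilder; it only records
    // the reset policy it receives so the forwarding can be observed.
    static final class BuilderStub {
        final List<ResetPolicy> recorded = new ArrayList<>();

        void addSource(final ResetPolicy offsetReset, final String name, final String... topics) {
            recorded.add(offsetReset);
        }
    }

    // Hypothetical stand-in for Topology.
    static final class TopologyStub {
        final BuilderStub internalTopologyBuilder = new BuilderStub();

        synchronized TopologyStub addSource(final ResetPolicy offsetReset,
                                            final String name,
                                            final String... topics) {
            // Forward the caller's policy instead of passing null.
            internalTopologyBuilder.addSource(offsetReset, name, topics);
            return this;
        }
    }

    public static void main(final String[] args) {
        final TopologyStub topology =
            new TopologyStub().addSource(ResetPolicy.EARLIEST, "source", "input-topic");
        // The builder now sees the policy the caller asked for.
        System.out.println(topology.internalTopologyBuilder.recorded); // prints [EARLIEST]
    }
}
```

With `null` in place of `offsetReset`, the recorded value would be `null` regardless of what the caller passed, which is the problem with the current patch.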