Hey there, Kafka Users,

I'm trying to join two topics with Kafka Streams. The first topic is a
changelog of one object, and the second is a changelog of a related
object. To join these tables, I'm grouping the second table by a field
in it that identifies the related record in the first table. But I'm
getting an unexpected error related to the repartition topic for the
aggregated table:

org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: External source topic not found: TableNumber2Aggregated-repartition
    at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.ensureCopartitioning(StreamPartitionAssignor.java:452)
    at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.ensureCopartitioning(StreamPartitionAssignor.java:440)


(Full exception:
https://gist.github.com/mfenniak/11ca081191932fbb33a0c3cc32ad1686)

It appears that the "TableNumber2Aggregated-repartition" topic *is* created
in Kafka by the streams application, but its name carries my application
id (timesheet-status) as a prefix. Perhaps something is adding that prefix
to the topic name, but it isn't being applied everywhere the name is used?

$ ./kafka-topics.sh --zookeeper localhost --list
TableNumber1
TableNumber2
__consumer_offsets
timesheet-status-TableNumber2Aggregated-repartition
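
To make the naming mismatch concrete, here is a small, hypothetical Java
model of it. The helper `internalTopicName` is invented for illustration
and only encodes the pattern visible in the listing above
(`<application id>-<store name>-repartition`); it is not Kafka Streams'
own code. It checks both spellings of the topic name against the listed
topics:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.Set;

public class RepartitionTopicNames {

    // Hypothetical helper: the naming pattern observed in the topic listing,
    // i.e. "<application.id>-<store name>-repartition".
    static String internalTopicName(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-repartition";
    }

    public static void main(String[] args) {
        // Topics as reported by kafka-topics.sh --list.
        Set<String> existing = new HashSet<>(Arrays.asList(
                "TableNumber1",
                "TableNumber2",
                "__consumer_offsets",
                "timesheet-status-TableNumber2Aggregated-repartition"));

        // Name reported in the TopologyBuilderException (no prefix).
        String unprefixed = "TableNumber2Aggregated-repartition";
        // Name built with the application id prefix.
        String prefixed = internalTopicName("timesheet-status", "TableNumber2Aggregated");

        System.out.println(unprefixed + " exists: " + existing.contains(unprefixed));
        System.out.println(prefixed + " exists: " + existing.contains(prefixed));
    }
}
```

Run as-is, it reports the unprefixed name as missing and the prefixed one
as present.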


Here's a sample that reproduces the issue (note: I've cut out all the
actual mapping, grouping, and aggregating logic, but it still reproduces
the error):

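    public static TopologyBuilder createTopology() {
        KStreamBuilder builder = new KStreamBuilder();

        // JsonSerde is the application's own JSON serde (not shown here).
        // First changelog; the real value mapping has been cut out.
        KTable table1Mapped = builder.table(Serdes.String(), new JsonSerde(Map.class), "TableNumber1")
                .mapValues((value) -> null);

        // Second changelog, re-grouped by the key of the related record in
        // the first table; the real grouping and aggregation logic has been cut out.
        KTable table2Aggregated = builder.table(Serdes.String(), new JsonSerde(Map.class), "TableNumber2")
                .groupBy((key, value) -> null)
                // initializer, adder, subtractor, aggregate value serde, store name
                .aggregate(() -> null, (k, v, t) -> null, (k, v, t) -> null,
                        new JsonSerde(Map.class), "TableNumber2Aggregated");

        // Join the two tables; the real join logic has been cut out.
        table1Mapped.join(table2Aggregated, (left, right) -> {
            LOG.debug("join");
            return null;
        });

        return builder;
    }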
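
For context on what the cut-out logic does: the pattern is to re-key the
second table by the field that points at its related record in the first
table, aggregate the rows that share that key, and then join on the
now-shared key. The following is a rough in-memory analogy using plain
Java maps, not Kafka Streams code; the `parentId` field, the method name
`joinOnForeignKey`, and the sample records are all hypothetical:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class ForeignKeyJoinSketch {

    // Groups table2 rows by their (hypothetical) "parentId" field, collecting
    // the row keys into a list, then inner-joins the result with table1 by key.
    static Map<String, String> joinOnForeignKey(
            Map<String, String> table1,
            Map<String, Map<String, String>> table2) {

        // groupBy + aggregate: re-key each table2 row by its parentId.
        Map<String, List<String>> aggregated = new HashMap<>();
        for (Map.Entry<String, Map<String, String>> row : table2.entrySet()) {
            String parentId = row.getValue().get("parentId");
            aggregated.computeIfAbsent(parentId, k -> new ArrayList<>()).add(row.getKey());
        }

        // join: keep only keys present on both sides (inner join).
        Map<String, String> joined = new LinkedHashMap<>();
        for (Map.Entry<String, String> left : table1.entrySet()) {
            List<String> right = aggregated.get(left.getKey());
            if (right != null) {
                joined.put(left.getKey(), left.getValue() + " -> " + right);
            }
        }
        return joined;
    }

    public static void main(String[] args) {
        Map<String, String> table1 = new LinkedHashMap<>();
        table1.put("a", "A");
        table1.put("b", "B");

        Map<String, Map<String, String>> table2 = new LinkedHashMap<>();
        Map<String, String> x1 = new HashMap<>();
        x1.put("parentId", "a");
        table2.put("x1", x1);
        Map<String, String> x2 = new HashMap<>();
        x2.put("parentId", "a");
        table2.put("x2", x2);

        System.out.println(joinOnForeignKey(table1, table2));
    }
}
```

Here `a` joins with both related rows, while `b` has no related rows and
is dropped, mirroring the inner-join semantics of `KTable.join`.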

I'm using the latest Kafka Streams release, 0.10.0.1. Any thoughts on how
I could debug or work around this?

Thanks all,

Mathieu
