Hey there, Kafka Users,

I'm trying to join two topics with Kafka Streams. The first topic is a changelog of one object, and the second is a changelog of a related object. To join these tables, I'm grouping the second table by the piece of data in it that identifies the related record in the first table. But I'm getting an unexpected error related to the repartitioning topic for the aggregated table:

org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology building: External source topic not found: *TableNumber2Aggregated-repartition*
    at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.ensureCopartitioning(StreamPartitionAssignor.java:452)
    at org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.ensureCopartitioning(StreamPartitionAssignor.java:440)

(Full exception: https://gist.github.com/mfenniak/11ca081191932fbb33a0c3cc32ad1686)

It appears that the "TableNumber2Aggregated-repartition" topic *is* created in Kafka by the streams application, but the Kafka topic has a prefix that matches my application id (timesheet-status). Perhaps something is prefixing the topic name, but it isn't being applied everywhere?

$ ./kafka-topics.sh --zookeeper localhost --list
TableNumber1
TableNumber2
__consumer_offsets
timesheet-status-TableNumber2Aggregated-repartition

Here's a sample that reproduces the issue (note: I've cut out all the actual mapping, grouping, and aggregating logic, but this still reproduces the error):

    public static TopologyBuilder createTopology() {
        KStreamBuilder builder = new KStreamBuilder();

        KTable table1Mapped = builder.table(Serdes.String(), new JsonSerde(Map.class), "TableNumber1")
            .mapValues((value) -> null);

        KTable table2Aggregated = builder.table(Serdes.String(), new JsonSerde(Map.class), "TableNumber2")
            .groupBy((key, value) -> null)
            .aggregate(() -> null, (k, v, t) -> null, (k, v, t) -> null, new JsonSerde(Map.class), "TableNumber2Aggregated");

        table1Mapped.join(table2Aggregated, (left, right) -> {
            LOG.debug("join");
            return null;
        });

        return builder;
    }

I'm using the latest Kafka Streams release, 0.10.0.1. Any thoughts on how I could proceed to debug or work around this?

Thanks all,
Mathieu