I updated from 10.1 and 10.2. I updated both the broker and maven
dependency.

I am using topic auto-create. With 10.1, starting the application with a
broker would sometimes result in an error like:

> Exception in thread "StreamThread-1"
org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology
building: stream-thread [StreamThread-1] Topic not found: $topic

But this would only happen once. Upon the second attempt, the topics are
already created and everything works fine.

But with 10.2 this error does not go away. I have confirmed and tested that
auto topic creation is enabled.

Here is the error/trace:


Exception in thread "StreamThread-1"
org.apache.kafka.streams.errors.TopologyBuilderException: Invalid topology
building: stream-thread [StreamThread-1] Topic not found: session-updates
at
org.apache.kafka.streams.processor.internals.StreamPartitionAssignor$CopartitionedTopicsValidator.validate(StreamPartitionAssignor.java:734)
at
org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.ensureCopartitioning(StreamPartitionAssignor.java:648)
at
org.apache.kafka.streams.processor.internals.StreamPartitionAssignor.assign(StreamPartitionAssignor.java:368)
at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.performAssignment(ConsumerCoordinator.java:339)
at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.onJoinLeader(AbstractCoordinator.java:488)
at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.access$1100(AbstractCoordinator.java:89)
at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:438)
at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$JoinGroupResponseHandler.handle(AbstractCoordinator.java:420)
at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:764)
at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:745)
at
org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:186)
at
org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:149)
at
org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:116)
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:493)
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:322)
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:253)
at
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:172)
at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:334)
at
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
at
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
at
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1030)
at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:582)
at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:368)


It does not occur if my topology only defines streams and tables. However,
when I attempt to join a stream and a table, this error is thrown:

        // No error if this is in topology
        KTable<K, V> sessions = topology.table(byteStringSerde,
sessionSerde, "sessions", "sessions");

        // No error if this is in topology
        KStream<ByteString, Messages.EntityUpdate> sessionUpdates =
topology.stream(byteStringSerde, sessionUpdateSerde, "session-updates");

        // Error if this is in topology
        sessionUpdates
          .leftJoin(sessions, (update, value) -> {
              // do update, omitted
          })
          .filter((k, v) -> v != null)
          .to(byteStringSerde, sessionSerde, "sessions");

Reply via email to