[ 
https://issues.apache.org/jira/browse/KAFKA-10252?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17154406#comment-17154406
 ] 

Bruno Cadonna commented on KAFKA-10252:
---------------------------------------

Thank you for the bug report, [~bulbfreeman].

I think this is a duplicate of 
https://issues.apache.org/jira/browse/KAFKA-10179. There is also an open PR for 
fixing the bug linked in that ticket. 

> MeteredTimestampedKeyValueStore not setting up correct topic name for 
> GlobalKTable associating StateStores
> ----------------------------------------------------------------------------------------------------------
>
>                 Key: KAFKA-10252
>                 URL: https://issues.apache.org/jira/browse/KAFKA-10252
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.4.1
>            Reporter: Gordon Wang
>            Priority: Major
>         Attachments: 
> 0001-KAFKA-10252-Use-source-topic-name-for-MeteredTimesta.patch
>
>
> When creating a GlobalKTable for getting a ReadOnlyKeyValueStore likw below:
> {code}
> GlobalKTable<GenericRecord, GenericRecord> globalTable = 
> streamsBuilder.globalTable(topic,
>                 Consumed.with(keySerde, valueSerde),
>                 Materialized.as(Stores.inMemoryKeyValueStore(topic)));
> {code}
> I got StreamsException like below:
> {code}
> org.apache.kafka.common.errors.SerializationException: Error retrieving Avro 
> schema for topic applicationId-sourceTopicName-changelog
> Caused by: 
> io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: 
> Subject not found.; error code: 40401
> {code}
> But as seen in GlobalKTable Java Doc the changelog stream shall not be 
> created and in fact was not created.This leads to our custom serde to be 
> searching for schema (we are using Confluent Platform and Avro based schema 
> registry for the job) using a wrong topic name (should just be 
> sourceTopicName rather than applicationId-sourceTopicName-changelog).
> After digging into the code, I found *initStoreSerde* method in 
> *MeteredTimestampedKeyValueStore* would assume the topic backing the store 
> would always be storeChangelogTopic when initializing the Serdes for the 
> state store, I think for GlobalKTables (ones having a 
> GlobalProcessorContextImpl ProcessorContext) we shall use the original topic 
> name directly here.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to