[ https://issues.apache.org/jira/browse/KAFKA-19923?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=18041424#comment-18041424 ]

sanghyeok An edited comment on KAFKA-19923 at 11/29/25 3:15 AM:
----------------------------------------------------------------

Hi [~mjsax]!

I totally agree with your assessment.

 

*About the Breaking Change*

To be honest, since I don't have much experience contributing to Kafka yet, I 
wasn't entirely sure about the strict scope of Breaking Changes in this project.
I really appreciate you clarifying that moving an existing runtime failure to a 
build-time check is considered a fix/improvement rather than a breaking change. 
That makes perfect sense.

 

*Verification Logic ({{TimestampExtractor}} Nuance)*

I also agree that a class-level check is the most practical approach. 
However, there is one small nuance to consider: {{TimestampExtractor}} is a 
{{@FunctionalInterface}}. 
If users define it with inline lambdas, semantically identical lambdas may 
compile to different synthetic classes at runtime, so a strict {{getClass()}} 
check would fail even when the logic is the same. In effect, it would force 
users to reuse the exact same lambda instance. 
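
To make this concrete, here is a minimal, self-contained sketch. The {{Extractor}} interface is a hypothetical stand-in for {{TimestampExtractor}} (which is not on the classpath here); the behavior it demonstrates is the JVM's, not Kafka's:
{code:java}
public class LambdaClassCheck {

    // Hypothetical stand-in for org.apache.kafka.streams.processor.TimestampExtractor.
    @FunctionalInterface
    interface Extractor {
        long extract(long recordTimestamp);
    }

    public static void main(String[] args) {
        // Two lambdas with identical bodies...
        final Extractor first = ts -> ts;
        final Extractor second = ts -> ts;

        // ...are still compiled to two distinct synthetic classes, so a pure
        // getClass() comparison reports them as different extractors.
        System.out.println(first.getClass().equals(second.getClass()));
    }
}
{code}
On current JVMs this prints {{false}}, because each lambda expression gets its own generated class.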

 

We should also consider generic serdes, such as:

 
{code:java}
@Test
void test1() {
    final Serde<Foo> fooSerde = JsonSerdes.jsonSerde(Foo.class);
    final Serde<Bar> barSerde = JsonSerdes.jsonSerde(Bar.class);

    // Both serdes share the same runtime class even though their generic
    // types differ, so a class-only check cannot tell them apart.
    assertThat(fooSerde.getClass()).isEqualTo(barSerde.getClass());
} {code}
In general, Kafka Streams wrapper applications use this pattern, e.g. the 
JsonSerde class in Spring-Kafka. 
([https://github.com/spring-projects/spring-kafka/blob/main/spring-kafka/src/main/java/org/springframework/kafka/support/serializer/JsonSerde.java])
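
The same effect can be reproduced without Spring, since any generic wrapper shares one runtime class due to type erasure (minimal sketch; {{GenericSerde}} is a hypothetical stand-in for a wrapper like {{JsonSerde<T>}}):
{code:java}
public class GenericSerdeClassCheck {

    // Hypothetical generic wrapper, analogous to Spring's JsonSerde<T>.
    static final class GenericSerde<T> {
        private final Class<T> targetType;

        GenericSerde(final Class<T> targetType) {
            this.targetType = targetType;
        }
    }

    public static void main(String[] args) {
        final GenericSerde<String> a = new GenericSerde<>(String.class);
        final GenericSerde<Integer> b = new GenericSerde<>(Integer.class);

        // Type erasure: both instances report the same runtime class, so a
        // class-only check would wrongly treat them as equivalent serdes.
        System.out.println(a.getClass().equals(b.getClass())); // prints "true"
    }
}
{code}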

 

 

So, we should check instance identity first and fall back to class equality:
{code:java}
final var mainNodeExtractor = mainSourceNode.consumedInternal().timestampExtractor();
final var mainNodeKeySerde = mainSourceNode.consumedInternal().keySerde();
final var mainNodeValueSerde = mainSourceNode.consumedInternal().valueSerde();

final var currentNodeExtractor = currentSourceNode.consumedInternal().timestampExtractor();
final var currentNodeKeySerde = currentSourceNode.consumedInternal().keySerde();
final var currentNodeValueSerde = currentSourceNode.consumedInternal().valueSerde();

// Identity covers the "same instance reused" case (including both null);
// the class comparison is the fallback, guarded against null values.
final boolean isSameExtractor = mainNodeExtractor == currentNodeExtractor
    || (mainNodeExtractor != null && currentNodeExtractor != null
        && mainNodeExtractor.getClass().equals(currentNodeExtractor.getClass()));

if (!isSameExtractor) {
    throw new TopologyException("Conflict: TimestampExtractor mismatch...");
}

final boolean isSameKeySerde = mainNodeKeySerde == currentNodeKeySerde
    || (mainNodeKeySerde != null && currentNodeKeySerde != null
        && mainNodeKeySerde.getClass().equals(currentNodeKeySerde.getClass()));

if (!isSameKeySerde) {
    throw new TopologyException("Conflict: Key Serde mismatch...");
}

final boolean isSameValueSerde = mainNodeValueSerde == currentNodeValueSerde
    || (mainNodeValueSerde != null && currentNodeValueSerde != null
        && mainNodeValueSerde.getClass().equals(currentNodeValueSerde.getClass()));

if (!isSameValueSerde) {
    throw new TopologyException("Conflict: Value Serde mismatch...");
}{code}
 

What do you think? Please let me know your opinion!



> Kafka Streams throws ClassCastException with different Consumed instances.
> --------------------------------------------------------------------------
>
>                 Key: KAFKA-19923
>                 URL: https://issues.apache.org/jira/browse/KAFKA-19923
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>            Reporter: sanghyeok An
>            Assignee: sanghyeok An
>            Priority: Minor
>
> Kafka Streams throws a ClassCastException when using different Consumed 
> instances for the same topic.
> For example:
> {code:java}
> builder.stream("A", Consumed.with(Serdes.String(), Serdes.String()))
>        .peek((k, v) -> System.out.println(k));
> builder.stream("A", Consumed.with(Serdes.ByteArray(), Serdes.ByteArray()))
>        .peek((k, v) -> System.out.println(k));
> {code}
>  
> Since both use the same topic name and the same ConsumedInternal 
> configuration for auto offset reset, these two StreamSourceNodes are merged 
> during topology building.
>  
> As a result, the Topology is built successfully.
>  
> However, when the StreamThread starts, the consumer begins to receive records 
> from the broker, and the records flow through the pipeline, a 
> ClassCastException is thrown at runtime.
>  
> In my opinion, we have two options:
>  # Document this behavior.
>  # When merging source nodes, the builder should consider the full 
> ConsumedInternal configuration (for example, key/value SerDes and timestamp 
> extractor), instead of only the auto offset reset policy.
>  
> I think option 1 is also acceptable, because Kafka Streams will fail fast 
> with a ClassCastException before the consumer commits any offsets.
>  
> Option 2 would require more substantial changes in Kafka Streams, because 
> TimestampExtractor and key/value SerDes do not expose a straightforward way 
> to check semantic equivalence.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
