Re: [PR] KAFKA-16573: Specify node and store where serdes are needed [kafka]
mjsax commented on PR #15790:
URL: https://github.com/apache/kafka/pull/15790#issuecomment-2151048809

Thanks for the PR @AyoubOm. Merged to `trunk` and cherry-picked to `3.8` branch.

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
Re: [PR] KAFKA-16573: Specify node and store where serdes are needed [kafka]
mjsax merged PR #15790:
URL: https://github.com/apache/kafka/pull/15790
Re: [PR] KAFKA-16573: Specify node and store where serdes are needed [kafka]
AyoubOm commented on code in PR #15790:
URL: https://github.com/apache/kafka/pull/15790#discussion_r1622840595

## streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java: ##
@@ -58,8 +60,21 @@ public void addChild(final ProcessorNode child) {
     public void init(final InternalProcessorContext context) {
         super.init(context);
         this.context = context;
-        keySerializer = prepareKeySerializer(keySerializer, context, this.name());
-        valSerializer = prepareValueSerializer(valSerializer, context, this.name());
+        try {
+            keySerializer = prepareKeySerializer(keySerializer, context, this.name());
+        } catch (final ConfigException e) {
+            throw new ConfigException(String.format("Failed to initialize key serdes for sink node %s", name()));
+        } catch (final StreamsException e) {

Review Comment:
Thanks everyone for your input! I will make the change accordingly.
Re: [PR] KAFKA-16573: Specify node and store where serdes are needed [kafka]
ableegoldman commented on code in PR #15790:
URL: https://github.com/apache/kafka/pull/15790#discussion_r1621827532

## streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java: ##
@@ -58,8 +60,21 @@ public void addChild(final ProcessorNode child) {
     public void init(final InternalProcessorContext context) {
         super.init(context);
         this.context = context;
-        keySerializer = prepareKeySerializer(keySerializer, context, this.name());
-        valSerializer = prepareValueSerializer(valSerializer, context, this.name());
+        try {
+            keySerializer = prepareKeySerializer(keySerializer, context, this.name());
+        } catch (final ConfigException e) {
+            throw new ConfigException(String.format("Failed to initialize key serdes for sink node %s", name()));
+        } catch (final StreamsException e) {

Review Comment:
Good point -- I agree with Bruno, the catch block should be for just `ConfigException` or `StreamsException`.
Re: [PR] KAFKA-16573: Specify node and store where serdes are needed [kafka]
cadonna commented on code in PR #15790:
URL: https://github.com/apache/kafka/pull/15790#discussion_r1620256284

## streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java: ##
@@ -58,8 +60,21 @@ public void addChild(final ProcessorNode child) {
     public void init(final InternalProcessorContext context) {
         super.init(context);
         this.context = context;
-        keySerializer = prepareKeySerializer(keySerializer, context, this.name());
-        valSerializer = prepareValueSerializer(valSerializer, context, this.name());
+        try {
+            keySerializer = prepareKeySerializer(keySerializer, context, this.name());
+        } catch (final ConfigException e) {
+            throw new ConfigException(String.format("Failed to initialize key serdes for sink node %s", name()));
+        } catch (final StreamsException e) {

Review Comment:
Why `RuntimeException`? Can't we catch `ConfigException` and `StreamsException` in one catch clause and then throw a `StreamsException`? This seems safer to me, because otherwise, with future code changes that might throw exceptions like `IllegalStateException`, we would wrap unexpected exceptions caused by mistakes in Streams in a `StreamsException`.
```java
catch (final ConfigException | StreamsException ex) {
    // handle the exception
}
```
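To make the suggestion above concrete, here is a minimal, self-contained sketch of a single multi-catch clause that wraps only the two expected exception types and lets anything else propagate. It does not use the real Kafka classes: `ConfigException` and `StreamsException` below are simplified local stand-ins, and `MultiCatchSketch` and `initKey` are hypothetical names introduced only for illustration.

```java
import java.util.function.Supplier;

final class MultiCatchSketch {
    // Simplified stand-ins for the Kafka exception types (assumption, not the real classes).
    static class ConfigException extends RuntimeException {
        ConfigException(final String message) {
            super(message);
        }
    }

    static class StreamsException extends RuntimeException {
        StreamsException(final String message, final Throwable cause) {
            super(message, cause);
        }
    }

    // Hypothetical helper: resolves a serializer and adds the node name on expected failures.
    static <T> T initKey(final String nodeName, final Supplier<T> resolver) {
        try {
            return resolver.get();
        } catch (final ConfigException | StreamsException e) {
            // Only the two expected types are wrapped; the original is kept as the cause.
            throw new StreamsException(
                String.format("Failed to initialize key serdes for sink node %s", nodeName), e);
        }
    }

    public static void main(final String[] args) {
        try {
            MultiCatchSketch.<String>initKey("sink-1", () -> {
                throw new ConfigException("no default key serde");
            });
        } catch (final StreamsException e) {
            System.out.println(e.getMessage() + " / cause: " + e.getCause().getMessage());
        }
        try {
            // An unexpected exception type is not wrapped and propagates unchanged.
            MultiCatchSketch.<String>initKey("sink-1", () -> {
                throw new IllegalStateException("bug in Streams");
            });
        } catch (final IllegalStateException e) {
            System.out.println("not wrapped: " + e.getMessage());
        }
    }
}
```

With this shape, a programming error such as an `IllegalStateException` surfaces as itself instead of being reported as a serde configuration problem.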
Re: [PR] KAFKA-16573: Specify node and store where serdes are needed [kafka]
ableegoldman commented on code in PR #15790:
URL: https://github.com/apache/kafka/pull/15790#discussion_r1619985405

## streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java: ##
@@ -58,8 +60,21 @@ public void addChild(final ProcessorNode child) {
     public void init(final InternalProcessorContext context) {
         super.init(context);
         this.context = context;
-        keySerializer = prepareKeySerializer(keySerializer, context, this.name());
-        valSerializer = prepareValueSerializer(valSerializer, context, this.name());
+        try {
+            keySerializer = prepareKeySerializer(keySerializer, context, this.name());
+        } catch (final ConfigException e) {
+            throw new ConfigException(String.format("Failed to initialize key serdes for sink node %s", name()));
+        } catch (final StreamsException e) {

Review Comment:
Can't say what its original purpose was, but `StreamsException` has definitely morphed into a catch-all for exceptions throughout Streams. It's definitely not exclusive to the state of a task though (that would be `ProcessorStateException`).

The nice thing about `StreamsException` is that you can add other useful metadata, such as the taskId where the error originated, so I always prefer to just throw the `StreamsException`. We also know for a fact that `StreamsException` will be caught and handled properly as it gets bubbled up.

So I'd go for merging this into a single `catch RuntimeException` block, then wrap it in a `StreamsException`. And don't forget to add the task id too! 😄 (you can get it from the processor context)
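As a rough illustration of the idea in this comment, the sketch below catches any `RuntimeException`, wraps it, and attaches a task id alongside the node name. It is self-contained and deliberately avoids the real Kafka classes: the `StreamsException` here is a simplified local stand-in that stores the task id as a plain `String`, and `TaskIdWrapSketch` and `initSerializer` are hypothetical names. In the PR the task id would come from the processor context, which is not modeled here.

```java
import java.util.Optional;
import java.util.function.Supplier;

final class TaskIdWrapSketch {
    // Simplified stand-in for a Streams exception that carries a task id (assumption).
    static class StreamsException extends RuntimeException {
        private final String taskId;

        StreamsException(final String message, final Throwable cause, final String taskId) {
            super(message, cause);
            this.taskId = taskId;
        }

        Optional<String> taskId() {
            return Optional.ofNullable(taskId);
        }
    }

    // Hypothetical helper: any RuntimeException is wrapped, with node name and task id attached.
    static <T> T initSerializer(final String nodeName, final String taskId, final Supplier<T> resolver) {
        try {
            return resolver.get();
        } catch (final RuntimeException e) {
            throw new StreamsException(
                String.format("Failed to initialize key serdes for sink node %s", nodeName), e, taskId);
        }
    }

    public static void main(final String[] args) {
        try {
            TaskIdWrapSketch.<String>initSerializer("sink-1", "0_1", () -> {
                throw new IllegalArgumentException("no default key serde configured");
            });
        } catch (final StreamsException e) {
            System.out.println(e.getMessage() + " (task " + e.taskId().orElse("unknown") + ")");
        }
    }
}
```

The trade-off raised later in the thread applies: a blanket `RuntimeException` catch also wraps exceptions that come from genuine bugs.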
Re: [PR] KAFKA-16573: Specify node and store where serdes are needed [kafka]
mjsax commented on code in PR #15790:
URL: https://github.com/apache/kafka/pull/15790#discussion_r1619318104

## streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java: ##
@@ -58,8 +60,21 @@ public void addChild(final ProcessorNode child) {
     public void init(final InternalProcessorContext context) {
         super.init(context);
         this.context = context;
-        keySerializer = prepareKeySerializer(keySerializer, context, this.name());
-        valSerializer = prepareValueSerializer(valSerializer, context, this.name());
+        try {
+            keySerializer = prepareKeySerializer(keySerializer, context, this.name());
+        } catch (final ConfigException e) {
+            throw new ConfigException(String.format("Failed to initialize key serdes for sink node %s", name()));
+        } catch (final StreamsException e) {

Review Comment:
It's a corner case, I would say -- also, we do wrap a lot of exceptions as `StreamsException` throughout the whole code base, and given that we wrap the original `ConfigException` inside the `StreamsException`, we would not lose information. The idea is just to simplify the code a little bit.

Let's hear what @cadonna or @ableegoldman think.
Re: [PR] KAFKA-16573: Specify node and store where serdes are needed [kafka]
AyoubOm commented on code in PR #15790:
URL: https://github.com/apache/kafka/pull/15790#discussion_r1618387142

## streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java: ##
@@ -58,8 +60,21 @@ public void addChild(final ProcessorNode child) {
     public void init(final InternalProcessorContext context) {
         super.init(context);
         this.context = context;
-        keySerializer = prepareKeySerializer(keySerializer, context, this.name());
-        valSerializer = prepareValueSerializer(valSerializer, context, this.name());
+        try {
+            keySerializer = prepareKeySerializer(keySerializer, context, this.name());
+        } catch (final ConfigException e) {
+            throw new ConfigException(String.format("Failed to initialize key serdes for sink node %s", name()));
+        } catch (final StreamsException e) {

Review Comment:
@mjsax the idea was to preserve the exception that was thrown, as I think `StreamsException` and `ConfigException` don't have the same nature. AFAIK `StreamsException` captures problems related to the state of the task. If you believe it's OK to throw `StreamsException` here, I can make the change.
Re: [PR] KAFKA-16573: Specify node and store where serdes are needed [kafka]
mjsax commented on PR #15790:
URL: https://github.com/apache/kafka/pull/15790#issuecomment-2136299144

Sorry for the long wait... The KIP and feature freeze deadlines for 3.8 kept me busy. But this PR looks good overall, and we can easily get it merged before code freeze :)
Re: [PR] KAFKA-16573: Specify node and store where serdes are needed [kafka]
mjsax commented on code in PR #15790:
URL: https://github.com/apache/kafka/pull/15790#discussion_r1618014606

## streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java: ##
@@ -58,8 +60,21 @@ public void addChild(final ProcessorNode child) {
     public void init(final InternalProcessorContext context) {
         super.init(context);
         this.context = context;
-        keySerializer = prepareKeySerializer(keySerializer, context, this.name());
-        valSerializer = prepareValueSerializer(valSerializer, context, this.name());
+        try {
+            keySerializer = prepareKeySerializer(keySerializer, context, this.name());
+        } catch (final ConfigException e) {
+            throw new ConfigException(String.format("Failed to initialize key serdes for sink node %s", name()));
+        } catch (final StreamsException e) {

Review Comment:
Wondering if we should merge both blocks and just catch `RuntimeException`? Seems to be duplicate code? And just pass in whatever exception we catch and rethrow as `new StreamsException(..., e);` -- this way we preserve the original exception and don't need to "awkwardly" add `e.getMessage()` for the first case.
Re: [PR] KAFKA-16573: Specify node and store where serdes are needed [kafka]
AyoubOm commented on PR #15790:
URL: https://github.com/apache/kafka/pull/15790#issuecomment-2126424513

@mjsax please let me know if the changes are fine for you ;)
Re: [PR] KAFKA-16573: Specify node and store where serdes are needed [kafka]
AyoubOm commented on code in PR #15790:
URL: https://github.com/apache/kafka/pull/15790#discussion_r1596496660

## streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java: ##
@@ -156,8 +156,7 @@ public void testTopologyLevelConfigException() {
         final ConfigException se = assertThrows(ConfigException.class, () -> new TopologyTestDriver(topology));
         final String msg = se.getMessage();
-        assertTrue("Error about class cast with serdes", msg.contains("StreamsConfig#DEFAULT_KEY_SERDE_CLASS_CONFIG"));

Review Comment:
recovered test

## streams/src/main/java/org/apache/kafka/streams/state/internals/StoreSerdeInitializer.java: ##
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableUtils;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.StateSerdes;
+
+
+public class StoreSerdeInitializer {
+    static StateSerdes prepareStoreSerde(final StateStoreContext context, final String storeName,
+                                         final String changelogTopic, final Serde keySerde,

Review Comment:
done
Re: [PR] KAFKA-16573: Specify node and store where serdes are needed [kafka]
AyoubOm commented on code in PR #15790:
URL: https://github.com/apache/kafka/pull/15790#discussion_r1596496472

## streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java: ##
@@ -58,8 +60,21 @@ public void addChild(final ProcessorNode child) {
     public void init(final InternalProcessorContext context) {
         super.init(context);
         this.context = context;
-        keySerializer = prepareKeySerializer(keySerializer, context, this.name());
-        valSerializer = prepareValueSerializer(valSerializer, context, this.name());
+        try {
+            keySerializer = prepareKeySerializer(keySerializer, context, this.name());
+        } catch (final ConfigException e) {
+            throw new ConfigException(String.format("Failed to initialize key serdes for sink node %s", name()));

Review Comment:
Added the original message.
Re: [PR] KAFKA-16573: Specify node and store where serdes are needed [kafka]
AyoubOm commented on code in PR #15790:
URL: https://github.com/apache/kafka/pull/15790#discussion_r1596492735

## streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java: ##
@@ -58,8 +60,21 @@ public void addChild(final ProcessorNode child) {
     public void init(final InternalProcessorContext context) {
         super.init(context);
         this.context = context;
-        keySerializer = prepareKeySerializer(keySerializer, context, this.name());
-        valSerializer = prepareValueSerializer(valSerializer, context, this.name());
+        try {
+            keySerializer = prepareKeySerializer(keySerializer, context, this.name());

Review Comment:
Nice catch! Updated.
Re: [PR] KAFKA-16573: Specify node and store where serdes are needed [kafka]
AyoubOm commented on code in PR #15790:
URL: https://github.com/apache/kafka/pull/15790#discussion_r1596366478

## streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java: ##
@@ -58,8 +60,21 @@ public void addChild(final ProcessorNode child) {
     public void init(final InternalProcessorContext context) {
         super.init(context);
         this.context = context;
-        keySerializer = prepareKeySerializer(keySerializer, context, this.name());
-        valSerializer = prepareValueSerializer(valSerializer, context, this.name());
+        try {
+            keySerializer = prepareKeySerializer(keySerializer, context, this.name());
+        } catch (final ConfigException e) {
+            throw new ConfigException(String.format("Failed to initialize key serdes for sink node %s", name()));
+        } catch (final StreamsException e) {

Review Comment:
`prepareSerializer` calls `context.keySerde`, which calls `config.defaultKeySerde()`. The latter can throw both exceptions.
Re: [PR] KAFKA-16573: Specify node and store where serdes are needed [kafka]
mjsax commented on code in PR #15790:
URL: https://github.com/apache/kafka/pull/15790#discussion_r1596180605

## streams/src/main/java/org/apache/kafka/streams/state/internals/StoreSerdeInitializer.java: ##
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.kstream.internals.WrappingNullableUtils;
+import org.apache.kafka.streams.processor.ProcessorContext;
+import org.apache.kafka.streams.processor.StateStoreContext;
+import org.apache.kafka.streams.processor.internals.SerdeGetter;
+import org.apache.kafka.streams.state.StateSerdes;
+
+
+public class StoreSerdeInitializer {
+    static StateSerdes prepareStoreSerde(final StateStoreContext context, final String storeName,
+                                         final String changelogTopic, final Serde keySerde,

Review Comment:
nit: formatting. We should have a single parameter per line, not multiple (both lines above) -- also below

## streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java: ##
@@ -58,8 +60,21 @@ public void addChild(final ProcessorNode child) {
     public void init(final InternalProcessorContext context) {
         super.init(context);
         this.context = context;
-        keySerializer = prepareKeySerializer(keySerializer, context, this.name());
-        valSerializer = prepareValueSerializer(valSerializer, context, this.name());
+        try {
+            keySerializer = prepareKeySerializer(keySerializer, context, this.name());
+        } catch (final ConfigException e) {
+            throw new ConfigException(String.format("Failed to initialize key serdes for sink node %s", name()));

Review Comment:
Should we somehow preserve `e.getMessage()` -- it seems useful?

## streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java: ##
@@ -58,8 +60,21 @@ public void addChild(final ProcessorNode child) {
     public void init(final InternalProcessorContext context) {
         super.init(context);
         this.context = context;
-        keySerializer = prepareKeySerializer(keySerializer, context, this.name());
-        valSerializer = prepareValueSerializer(valSerializer, context, this.name());
+        try {
+            keySerializer = prepareKeySerializer(keySerializer, context, this.name());

Review Comment:
I did dig into `prepareKeySerializer` and `prepareValueSerializer`, which both use `WrappingNullableUtils#prepareSerializer()`, which might call both `context.keySerde()` and `context.valueSerde()`. Thus, I believe we could currently get an exception when trying to get the key serde, even if the default key serde is set but the default value serde is not set? I think this internal helper method needs some updates, too.

## streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java: ##
@@ -58,8 +60,21 @@ public void addChild(final ProcessorNode child) {
     public void init(final InternalProcessorContext context) {
         super.init(context);
         this.context = context;
-        keySerializer = prepareKeySerializer(keySerializer, context, this.name());
-        valSerializer = prepareValueSerializer(valSerializer, context, this.name());
+        try {
+            keySerializer = prepareKeySerializer(keySerializer, context, this.name());
+        } catch (final ConfigException e) {
+            throw new ConfigException(String.format("Failed to initialize key serdes for sink node %s", name()));
+        } catch (final StreamsException e) {

Review Comment:
Why are we catching `StreamsException`? It seems the only exception that might bubble up here is a `ConfigException`?

## streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorNodeTest.java: ##
@@ -156,8 +156,7 @@ public void testTopologyLevelConfigException() {
         final ConfigException se = assertThrows(ConfigException.c
Re: [PR] KAFKA-16573: Specify node and store where serdes are needed [kafka]
AyoubOm commented on code in PR #15790:
URL: https://github.com/apache/kafka/pull/15790#discussion_r1589953917

## streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java: ##
@@ -173,21 +172,15 @@ protected Serde prepareValueSerdeForStore(final Serde valueSerde, final Se
     protected void initStoreSerde(final ProcessorContext context) {
         final String storeName = name();
         final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE);
-        serdes = new StateSerdes<>(
-            changelogTopic,
-            prepareKeySerde(keySerde, new SerdeGetter(context)),
-            prepareValueSerdeForStore(valueSerde, new SerdeGetter(context))
-        );
+        serdes = StoreSerdeInitializer.prepareStoreSerde(
+            context, storeName, changelogTopic, keySerde, valueSerde, this::prepareValueSerdeForStore);
     }

     protected void initStoreSerde(final StateStoreContext context) {
         final String storeName = name();
         final String changelogTopic = ProcessorContextUtils.changelogFor(context, storeName, Boolean.FALSE);
-        serdes = new StateSerdes<>(
-            changelogTopic,
-            prepareKeySerde(keySerde, new SerdeGetter(context)),
-            prepareValueSerdeForStore(valueSerde, new SerdeGetter(context))
-        );
+        serdes = StoreSerdeInitializer.prepareStoreSerde(
+            context, storeName, changelogTopic, keySerde, valueSerde, this::prepareValueSerdeForStore);

Review Comment:
I added a function parameter, `prepareValueSerde`, so that the child TimestampedStore classes can pass in the correct function. These classes override `prepareValueSerdeForStore`, so they do not use `WrappingNullableUtils.prepareValueSerde` directly.
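The function-parameter approach described in this comment can be sketched roughly as follows. This is a simplified, self-contained illustration and not the code from the PR: serdes are modeled as plain strings, the helper takes a one-argument function instead of the real signature, and the names `StoreSerdeSketch`, `MeteredStore`, `TimestampedStore`, and the wrapped exception type are assumptions made only for the example.

```java
import java.util.function.Function;

final class StoreSerdeSketch {
    // Hypothetical shared helper: the caller supplies the function that prepares the value serde.
    static String prepareStoreSerde(final String storeName,
                                    final String valueSerde,
                                    final Function<String, String> prepareValueSerde) {
        try {
            return prepareValueSerde.apply(valueSerde);
        } catch (final RuntimeException e) {
            // The store name is added once, here, instead of in every store class.
            throw new IllegalStateException(
                String.format("Failed to initialize value serdes for store %s", storeName), e);
        }
    }

    static class MeteredStore {
        String prepareValueSerdeForStore(final String valueSerde) {
            return valueSerde;
        }

        String initStoreSerde(final String valueSerde) {
            // The method reference is resolved virtually, so subclasses plug in their override.
            return prepareStoreSerde("my-store", valueSerde, this::prepareValueSerdeForStore);
        }
    }

    static class TimestampedStore extends MeteredStore {
        @Override
        String prepareValueSerdeForStore(final String valueSerde) {
            return "ValueAndTimestamp(" + valueSerde + ")";
        }
    }

    public static void main(final String[] args) {
        System.out.println(new MeteredStore().initStoreSerde("StringSerde"));
        System.out.println(new TimestampedStore().initStoreSerde("StringSerde"));
    }
}
```

Passing `this::prepareValueSerdeForStore` keeps the error handling in one place while still letting each subclass decide how the value serde is prepared.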
Re: [PR] KAFKA-16573: Specify node and store where serdes are needed [kafka]
AyoubOm commented on code in PR #15790:
URL: https://github.com/apache/kafka/pull/15790#discussion_r1589952122

## streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java: ##
@@ -58,8 +60,14 @@ public void addChild(final ProcessorNode child) {
     public void init(final InternalProcessorContext context) {
         super.init(context);
         this.context = context;
-        keySerializer = prepareKeySerializer(keySerializer, context, this.name());
-        valSerializer = prepareValueSerializer(valSerializer, context, this.name());
+        try {
+            keySerializer = prepareKeySerializer(keySerializer, context, this.name());
+            valSerializer = prepareValueSerializer(valSerializer, context, this.name());
+        } catch (final ConfigException e) {
+            throw new ConfigException(String.format("Failed to initialize serdes for sink node %s", name()), e);

Review Comment:
Good idea. I moved the exception handling into a separate class to share the code between the store classes, with corresponding unit tests.
Re: [PR] KAFKA-16573: Specify node and store where serdes are needed [kafka]
mjsax commented on code in PR #15790:
URL: https://github.com/apache/kafka/pull/15790#discussion_r1587028687

## streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java: ##
@@ -58,8 +60,14 @@ public void addChild(final ProcessorNode child) {
     public void init(final InternalProcessorContext context) {
         super.init(context);
         this.context = context;
-        keySerializer = prepareKeySerializer(keySerializer, context, this.name());
-        valSerializer = prepareValueSerializer(valSerializer, context, this.name());
+        try {
+            keySerializer = prepareKeySerializer(keySerializer, context, this.name());
+            valSerializer = prepareValueSerializer(valSerializer, context, this.name());
+        } catch (final ConfigException e) {
+            throw new ConfigException(String.format("Failed to initialize serdes for sink node %s", name()), e);

Review Comment:
Should we split this up further, and have two try-catch blocks, one for the key, and one for the value, to narrow it down further and add key/value as information to the error message?
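One possible shape for the split suggested here is sketched below: key and value are resolved in separate guarded steps, so the error message says which of the two failed. The sketch is self-contained and uses no Kafka classes. `SerdeInitException`, `KeyValueInitSketch`, `resolve`, and `init` are hypothetical names, and serializers are modeled as plain strings.

```java
import java.util.function.Supplier;

final class KeyValueInitSketch {
    // Hypothetical wrapper exception for this sketch (not a Kafka class).
    static class SerdeInitException extends RuntimeException {
        SerdeInitException(final String message, final Throwable cause) {
            super(message, cause);
        }
    }

    // One guarded step; "kind" is "key" or "value" and ends up in the message.
    static <T> T resolve(final String kind, final String nodeName, final Supplier<T> resolver) {
        try {
            return resolver.get();
        } catch (final RuntimeException e) {
            throw new SerdeInitException(
                String.format("Failed to initialize %s serdes for sink node %s", kind, nodeName), e);
        }
    }

    // Key and value are resolved separately so a failure points at the right one.
    static String init(final String nodeName, final Supplier<String> key, final Supplier<String> value) {
        final String keySerializer = resolve("key", nodeName, key);
        final String valueSerializer = resolve("value", nodeName, value);
        return keySerializer + "/" + valueSerializer;
    }

    public static void main(final String[] args) {
        try {
            init("sink-1", () -> "StringSerializer", () -> {
                throw new IllegalArgumentException("no default value serde");
            });
        } catch (final SerdeInitException e) {
            System.out.println(e.getMessage());
        }
    }
}
```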
Re: [PR] KAFKA-16573: Specify node and store where serdes are needed [kafka]
AyoubOm commented on PR #15790:
URL: https://github.com/apache/kafka/pull/15790#issuecomment-2077173169

ping @mjsax