[
https://issues.apache.org/jira/browse/KAFKA-4963?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15947496#comment-15947496
]
Matthias J. Sax commented on KAFKA-4963:
----------------------------------------
I guess this would be possible to add to the API. If you want this feature,
please create a JIRA for it (and add the label "needs-kip"). Personally, I
think the API should be somewhat different -- but this would be part of the KIP
discussion that is required for any user-facing API change:
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals
> Global Store: startup recovery process skipping processor
> ---------------------------------------------------------
>
> Key: KAFKA-4963
> URL: https://issues.apache.org/jira/browse/KAFKA-4963
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 0.10.2.0
> Reporter: Yennick Trevels
>
> This issue is related to the recovery process of a global store. It might be
> that I'm misunderstanding the design of the global store as it's all quite
> new to me, but I wanted to verify this case.
> I'm trying to create a global store with a processor that transforms the
> values from the source and puts them into the state store, and I want all
> these transformed values to be available in every streams job (hence the
> use of a global store).
> I'll give you an example which I created based on an existing Kafka Streams
> unit test:
> {code}
> // In-memory store with String keys and Integer values, changelog disabled.
> final StateStoreSupplier storeSupplier = Stores.create("my-store")
>     .withStringKeys().withIntegerValues().inMemory().disableLogging().build();
> final String global = "global";
> final String topic = "topic";
> // The store holds Integer values, so the cast uses <String, Integer>.
> final KeyValueStore<String, Integer> globalStore =
>     (KeyValueStore<String, Integer>) storeSupplier.get();
> final TopologyBuilder topologyBuilder = this.builder
>     .addGlobalStore(globalStore, global, STRING_DESERIALIZER, STRING_DESERIALIZER,
>         topic, "processor", define(new ValueToLengthStatefulProcessor("my-store")));
> driver = new ProcessorTopologyTestDriver(config, topologyBuilder);
> driver.process(topic, "key1", "value1", STRING_SERIALIZER, STRING_SERIALIZER);
> driver.process(topic, "key2", "value2", STRING_SERIALIZER, STRING_SERIALIZER);
> assertEquals(Integer.valueOf("value1".length()), globalStore.get("key1"));
> assertEquals(Integer.valueOf("value2".length()), globalStore.get("key2"));
> {code}
> The ValueToLengthStatefulProcessor basically takes the incoming value,
> calculates the length of the string, and puts the result in the state store.
> Note the difference in types between the source stream (string values) and
> the state store (integer values).
> If I understand global stores correctly and based on what I've tried out
> already, the stream of data runs like this:
> a source stream named "global" reading values from a Kafka topic called
> "topic" -> ValueToLengthStatefulProcessor -> "my-store" state store
> However, when the streams job starts up it runs the recovery process by
> reading out the source stream again. I've noticed that in this case it seems
> to skip the processor entirely and acts like the source stream is the
> changelog of the state store, making the data flow like this during the
> recovery process:
> source stream -> "my-store" state store
> Because it acts like the source stream is the changelog of the state store,
> it also tries to use the deserializer of the state store. This won't work
> since the values of the state store should be integers, while the values in
> the source stream are strings.
> So all this will start up nicely as long as the source stream has no values
> yet. However, once the source stream has (string) values, the startup
> recovery process will fail, since it writes the source values directly to the
> state store instead of passing them through the processor.
> I believe this is caused by the following line of code in
> TopologyBuilder.addGlobalStore, which connects the store directly to the
> source topic.
> https://github.com/apache/kafka/blob/0.10.2.0/streams/src/main/java/org/apache/kafka/streams/processor/TopologyBuilder.java#L507
> Please let me know if I'm totally misunderstanding how global stores should
> work. But I think this might be a crucial bug or design flaw.
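The mismatch described in the issue can be illustrated with a minimal, plain-Java sketch that does not use Kafka at all. It only models the behaviour as the reporter describes it, not the actual Kafka Streams internals. The class name, the two deserializer stand-ins, and the map used in place of a topic are all hypothetical and chosen for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical model of the two data flows described in the report.
public class GlobalStoreRecoverySketch {

    // Stand-in for a string deserializer: decodes UTF-8 bytes.
    static String deserializeString(byte[] data) {
        return new String(data, StandardCharsets.UTF_8);
    }

    // Stand-in for an integer deserializer: accepts exactly 4 bytes.
    static int deserializeInteger(byte[] data) {
        if (data.length != 4) {
            throw new IllegalArgumentException(
                "expected 4 bytes for an integer, got " + data.length);
        }
        return ByteBuffer.wrap(data).getInt();
    }

    // Normal processing: source value -> processor (string length) -> store.
    static Map<String, Integer> processNormally(Map<String, byte[]> topic) {
        Map<String, Integer> store = new HashMap<>();
        for (Map.Entry<String, byte[]> record : topic.entrySet()) {
            String value = deserializeString(record.getValue());
            store.put(record.getKey(), value.length());
        }
        return store;
    }

    // Recovery as described in the report: the processor is skipped and the
    // source topic is read as if it were the store's changelog, so the
    // store's integer deserializer is applied to string bytes.
    static Map<String, Integer> restoreAsChangelog(Map<String, byte[]> topic) {
        Map<String, Integer> store = new HashMap<>();
        for (Map.Entry<String, byte[]> record : topic.entrySet()) {
            store.put(record.getKey(), deserializeInteger(record.getValue()));
        }
        return store;
    }

    public static void main(String[] args) {
        // The "topic" holds string values serialized as UTF-8 bytes.
        Map<String, byte[]> topic = new LinkedHashMap<>();
        topic.put("key1", "value1".getBytes(StandardCharsets.UTF_8));
        topic.put("key2", "value2".getBytes(StandardCharsets.UTF_8));

        System.out.println("normal: key1 -> " + processNormally(topic).get("key1"));
        try {
            restoreAsChangelog(topic);
        } catch (IllegalArgumentException e) {
            System.out.println("restore failed: " + e.getMessage());
        }
    }
}
```

In this model the normal path stores the length of each value, while the changelog-style restore fails on the first record because six bytes of string data cannot be read as a four-byte integer. An empty topic restores without error, which matches the observation that startup only breaks once the source topic contains values.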