[jira] [Commented] (KAFKA-8153) Streaming application with state stores takes up to 1 hour to restart
[ https://issues.apache.org/jira/browse/KAFKA-8153?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16820821#comment-16820821 ] Michael Melsen commented on KAFKA-8153:
---------------------------------------

No, unfortunately disabling the cleanup didn't solve the problem. However, I do have a theory about what happens: we run multiple instances of the same streaming application. Each instance runs in a Docker container on top of Kubernetes, and each container has its own persistence. When the instances are restarted, I think that some (or sometimes all) of them get attached to persistent volumes other than the ones they were previously using. Perhaps tasks are reshuffled among the instances? This causes the state stores to become obsolete and the restoration process to kick in. We solved this by utilizing NFS to share the persistent volumes in such a way that all instances point to the same state store directory structure. This seems to solve the issue.

> Streaming application with state stores takes up to 1 hour to restart
> ----------------------------------------------------------------------
>
>                 Key: KAFKA-8153
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8153
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.1.0
>            Reporter: Michael Melsen
>            Priority: Major
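The reattachment theory above hinges on where Kafka Streams keeps its local state: each task's RocksDB store lives under the directory configured by the {{state.dir}} property, so a restarted pod that mounts a different (empty) volume at that path forces a full restore from the changelog topic. A minimal sketch of that path layout; the property keys are the real Kafka Streams config names, but the application id, mount point, and task id are illustrative assumptions:

```java
import java.util.Properties;

public class StateDirLayout {
    public static void main(String[] args) {
        // Property keys are real Kafka Streams config names; the values
        // (application id, mount point) are illustrative assumptions.
        Properties props = new Properties();
        props.setProperty("application.id", "sensor-measurement-app");
        props.setProperty("state.dir", "/var/lib/kafka-streams");

        // Local state for a task lives under <state.dir>/<application.id>/<task.id>.
        // If this directory is empty after a restart (e.g. a fresh volume was
        // mounted), Streams restores the store from its changelog topic.
        String taskStatePath = props.getProperty("state.dir") + "/"
                + props.getProperty("application.id") + "/0_0";
        System.out.println(taskStatePath);
    }
}
```

This is why the NFS workaround helps: every instance sees the same directory tree regardless of which volume Kubernetes hands it.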
[jira] [Commented] (KAFKA-8153) Streaming application with state stores takes up to 1 hour to restart
[ https://issues.apache.org/jira/browse/KAFKA-8153?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16812153#comment-16812153 ] Michael Melsen commented on KAFKA-8153:
---------------------------------------

[~guozhang] The CleanupConfig is a bean in the Spring Cloud Stream Kafka binder that, by default, deletes the local state stores on start and stop. As this is not the required behavior, I overrode this bean to avoid deleting the local state stores. I do have some new information about this issue. We deployed multiple instances of our streaming application on Kubernetes, and each instance got its own persistent volume. We now assume that every time an instance is restarted, it is not directed to the persistent volume it was previously connected to. Therefore, we assume, a mismatch occurs where the state store is no longer valid and Kafka will restore each instance. This would also make sense given that on local workstations, where the state stores always end up at the same location, we don't experience the same issues. To resolve this issue, although we need some more testing, we used an NFS share where each instance writes to the same location, and this now avoids extensive restoration of the stores.

> Streaming application with state stores takes up to 1 hour to restart
> ----------------------------------------------------------------------
>
>                 Key: KAFKA-8153
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8153
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.1.0
>            Reporter: Michael Melsen
>            Priority: Major
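For reference, the bean override described in the comment above can be sketched roughly as follows. {{CleanupConfig}} is the spring-kafka class the binder consults; its two constructor flags control cleanup on start and on stop. This is a configuration sketch under those assumptions, not a tested drop-in:

{code:java}
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.CleanupConfig;

@Configuration
public class StateStoreRetentionConfig {

    // Override the binder's default CleanupConfig so local state stores
    // are NOT deleted on application start or stop.
    @Bean
    public CleanupConfig cleanupConfig() {
        return new CleanupConfig(false, false);
    }
}
{code}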
[jira] [Updated] (KAFKA-8153) Streaming application with state stores takes up to 1 hour to restart
[ https://issues.apache.org/jira/browse/KAFKA-8153?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Michael Melsen updated KAFKA-8153:
----------------------------------

    Affects Version/s:     (was: 2.1.1)
                           2.1.0

> Streaming application with state stores takes up to 1 hour to restart
> ----------------------------------------------------------------------
>
>                 Key: KAFKA-8153
>                 URL: https://issues.apache.org/jira/browse/KAFKA-8153
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 2.1.0
>            Reporter: Michael Melsen
>            Priority: Major
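The reduce step in the reported topology keeps, per key and window, the record with the highest error signal. Stripped of the Kafka Streams machinery, the comparison logic can be sketched in plain Java; the {{Score}} record here is a stand-in for the issue's {{ErrorScore}} type, and the sample values are hypothetical:

```java
import java.util.List;

public class WindowedMaxSketch {
    // Stand-in for ErrorScore: a timestamp plus an error signal value.
    record Score(long ts, double errorSignal) {}

    // Mirrors getMaxErrorScore from the issue: keep the aggregate when it is larger.
    static Score maxErrorScore(Score agg, Score next) {
        return agg.errorSignal() > next.errorSignal() ? agg : next;
    }

    public static void main(String[] args) {
        // Hypothetical samples falling into one window for the same key.
        List<Score> window = List.of(
                new Score(1_000L, 0.2),
                new Score(2_000L, 0.9),
                new Score(3_000L, 0.5));

        Score max = window.stream()
                .reduce(WindowedMaxSketch::maxErrorScore)
                .orElseThrow();
        System.out.println(max.errorSignal());
    }
}
```

Because the reduction only ever keeps one of the two inputs, the store holds a single record per key per window, which is what keeps the changelog topic proportional to the key/window cardinality rather than the raw sample rate.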
[jira] [Created] (KAFKA-8153) Streaming application with state stores takes up to 1 hour to restart
Michael Melsen created KAFKA-8153:
-------------------------------------

             Summary: Streaming application with state stores takes up to 1 hour to restart
                 Key: KAFKA-8153
                 URL: https://issues.apache.org/jira/browse/KAFKA-8153
             Project: Kafka
          Issue Type: Bug
          Components: streams
    Affects Versions: 2.1.1
            Reporter: Michael Melsen


We are using Spring Cloud Stream with Kafka Streams 2.0.1 and utilizing the InteractiveQueryService to fetch data from the stores. There are 4 stores that persist data on disk after aggregating data. The code for the topology looks like this:

{code:java}
@Slf4j
@EnableBinding(SensorMeasurementBinding.class)
public class Consumer {

  public static final String RETENTION_MS = "retention.ms";
  public static final String CLEANUP_POLICY = "cleanup.policy";

  @Value("${windowstore.retention.ms}")
  private String retention;

  /**
   * Process the data flowing in from a Kafka topic. Aggregate the data to:
   * - 2 minutes
   * - 15 minutes
   * - one hour
   * - 12 hours
   *
   * @param stream
   */
  @StreamListener(SensorMeasurementBinding.ERROR_SCORE_IN)
  public void process(KStream<String, SensorMeasurement> stream) {
    Map<String, String> topicConfig = new HashMap<>();
    topicConfig.put(RETENTION_MS, retention);
    topicConfig.put(CLEANUP_POLICY, "delete");

    log.info("Changelog and local window store retention.ms: {} and cleanup.policy: {}",
        topicConfig.get(RETENTION_MS),
        topicConfig.get(CLEANUP_POLICY));

    createWindowStore(LocalStore.TWO_MINUTES_STORE, topicConfig, stream);
    createWindowStore(LocalStore.FIFTEEN_MINUTES_STORE, topicConfig, stream);
    createWindowStore(LocalStore.ONE_HOUR_STORE, topicConfig, stream);
    createWindowStore(LocalStore.TWELVE_HOURS_STORE, topicConfig, stream);
  }

  private void createWindowStore(
      LocalStore localStore,
      Map<String, String> topicConfig,
      KStream<String, SensorMeasurement> stream) {

    // Configure how the state store should be materialized using the provided storeName
    Materialized<String, ErrorScore, WindowStore<Bytes, byte[]>> materialized =
        Materialized.as(localStore.getStoreName());

    // Set retention of changelog topic
    materialized.withLoggingEnabled(topicConfig);

    // Configure what the windows look like and how long data will be retained in local stores
    TimeWindows configuredTimeWindows = getConfiguredTimeWindows(
        localStore.getTimeUnit(), Long.parseLong(topicConfig.get(RETENTION_MS)));

    // Processing description:
    // The input data are 'samples' with a '::'-separated composite key
    // 1. With the map we add the tag to the key and extract the error score from the data
    // 2. With groupByKey we group the data on the new key
    // 3. With windowedBy we split up the data in time intervals depending on the provided LocalStore enum
    // 4. With reduce we determine the maximum value in the time window
    // 5. Materialized will store it in a table
    stream
        .map(getInstallationAssetModelAlgorithmTagKeyMapper())
        .groupByKey()
        .windowedBy(configuredTimeWindows)
        .reduce((aggValue, newValue) -> getMaxErrorScore(aggValue, newValue), materialized);
  }

  private TimeWindows getConfiguredTimeWindows(long windowSizeMs, long retentionMs) {
    TimeWindows timeWindows = TimeWindows.of(windowSizeMs);
    timeWindows.until(retentionMs);
    return timeWindows;
  }

  /**
   * Determine the max error score to keep by looking at the aggregated error signal and
   * the freshly consumed error signal
   *
   * @param aggValue
   * @param newValue
   * @return
   */
  private ErrorScore getMaxErrorScore(ErrorScore aggValue, ErrorScore newValue) {
    if (aggValue.getErrorSignal() > newValue.getErrorSignal()) {
      return aggValue;
    }
    return newValue;
  }

  private KeyValueMapper<String, SensorMeasurement, KeyValue<String, ErrorScore>>
      getInstallationAssetModelAlgorithmTagKeyMapper() {
    return (s, sensorMeasurement) -> new KeyValue<>(s + "::" + sensorMeasurement.getT(),
        new ErrorScore(sensorMeasurement.getTs(), sensorMeasurement.getE(), sensorMeasurement.getO()));
  }
}
{code}

So we are materializing aggregated data to four different stores, after determining the max value within a specific window for a specific key. Please note that retention is set to two months of data and the cleanup policy is delete; we don't compact data. The size of the individual state stores on disk is between 14 and 20 GB of data.

We are making use of Interactive Queries: [https://docs.confluent.io/current/streams/developer-guide/interactive-queries.html#interactive-queries]

In our setup we have 4 instances of our streaming app acting as one consumer group, so every instance stores a specific part of all the data in its store. This all seems to work nicely, until we restart one or more instances and wait for them to become available again (restart time is only about 3 minutes max). I would expect that the restart of the app would not take that long, but unfortunately it