[jira] [Commented] (KAFKA-8153) Streaming application with state stores takes up to 1 hour to restart

2019-04-18 Thread Michael Melsen (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8153?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16820821#comment-16820821
 ] 

Michael Melsen commented on KAFKA-8153:
---

No, unfortunately disabling the cleanup didn't solve the problem. However, I do 
have a theory about what happens:

 

We run several instances of the same streaming application. Each instance 
runs in a Docker container on top of Kubernetes, and each container has its 
own persistent volume. When restarting the instances, I think that some, or 
sometimes all, instances got linked to other persistent volumes instead of the 
ones they were previously using. Perhaps tasks are reshuffled among different 
instances? This caused the state stores to become obsolete and the restoration 
process to kick in. We solved this by using NFS to share the persistent volumes 
so that all instances point to the same state store directory structure. This 
seems to solve the issue.
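
For illustration only (this is not the configuration used in the report): the 
theory above depends on where each instance looks for its local state. Kafka 
Streams reads that location from the `state.dir` setting and keeps the stores 
of an application in a subdirectory named after `application.id`. The sketch 
below builds these two settings with plain `java.util.Properties`. The mount 
path `/data/kafka-streams`, the application id `sensor-aggregator` and the 
instance name `streams-app-0` are hypothetical, and it assumes the instance 
name stays the same across restarts (for example a StatefulSet pod name).

```java
import java.util.Properties;

final class StateDirSketch {

    // Hypothetical mount point of the persistent volume inside the container.
    static final String MOUNT_PATH = "/data/kafka-streams";

    /** Builds a state directory path that is stable for a given instance name. */
    static String stateDirFor(String instanceName) {
        return MOUNT_PATH + "/" + instanceName;
    }

    /** Collects the two settings that decide where local state is looked up. */
    static Properties streamsProperties(String applicationId, String instanceName) {
        Properties props = new Properties();
        props.setProperty("application.id", applicationId);
        props.setProperty("state.dir", stateDirFor(instanceName));
        return props;
    }

    public static void main(String[] args) {
        Properties props = streamsProperties("sensor-aggregator", "streams-app-0");
        // Prints the directory this instance would use on every start.
        System.out.println(props.getProperty("state.dir"));
    }
}
```

If an instance comes back under the same name and with the same volume 
mounted, it finds its previous directory again. If either changes, the 
directory is empty from that instance's point of view.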

> Streaming application with state stores takes up to 1 hour to restart
> -
>
> Key: KAFKA-8153
> URL: https://issues.apache.org/jira/browse/KAFKA-8153
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.0
>Reporter: Michael Melsen
>Priority: Major
>
> We are using spring cloud stream with Kafka streams 2.0.1 and utilizing the 
> InteractiveQueryService to fetch data from the stores. There are 4 stores 
> that persist data on disk after aggregating data. The code for the topology 
> looks like this:
> {code:java}
> @Slf4j
> @EnableBinding(SensorMeasurementBinding.class)
> public class Consumer {
>   public static final String RETENTION_MS = "retention.ms";
>   public static final String CLEANUP_POLICY = "cleanup.policy";
>   @Value("${windowstore.retention.ms}")
>   private String retention;
> /**
>  * Process the data flowing in from a Kafka topic. Aggregate the data to:
>  * - 2 minute
>  * - 15 minutes
>  * - one hour
>  * - 12 hours
>  *
>  * @param stream
>  */
> @StreamListener(SensorMeasurementBinding.ERROR_SCORE_IN)
> public void process(KStream stream) {
> Map topicConfig = new HashMap<>();
> topicConfig.put(RETENTION_MS, retention);
> topicConfig.put(CLEANUP_POLICY, "delete");
> log.info("Changelog and local window store retention.ms: {} and 
> cleanup.policy: {}",
> topicConfig.get(RETENTION_MS),
> topicConfig.get(CLEANUP_POLICY));
> createWindowStore(LocalStore.TWO_MINUTES_STORE, topicConfig, stream);
> createWindowStore(LocalStore.FIFTEEN_MINUTES_STORE, topicConfig, stream);
> createWindowStore(LocalStore.ONE_HOUR_STORE, topicConfig, stream);
> createWindowStore(LocalStore.TWELVE_HOURS_STORE, topicConfig, stream);
>   }
>   private void createWindowStore(
> LocalStore localStore,
> Map topicConfig,
> KStream stream) {
> // Configure how the statestore should be materialized using the provide 
> storeName
> Materialized> materialized 
> = Materialized
> .as(localStore.getStoreName());
> // Set retention of changelog topic
> materialized.withLoggingEnabled(topicConfig);
> // Configure how windows looks like and how long data will be retained in 
> local stores
> TimeWindows configuredTimeWindows = getConfiguredTimeWindows(
> localStore.getTimeUnit(), 
> Long.parseLong(topicConfig.get(RETENTION_MS)));
> // Processing description:
> // The input data are 'samples' with key 
> :::
> // 1. With the map we add the Tag to the key and we extract the error 
> score from the data
> // 2. With the groupByKey we group  the data on the new key
> // 3. With windowedBy we split up the data in time intervals depending on 
> the provided LocalStore enum
> // 4. With reduce we determine the maximum value in the time window
> // 5. Materialized will make it stored in a table
> stream
> .map(getInstallationAssetModelAlgorithmTagKeyMapper())
> .groupByKey()
> .windowedBy(configuredTimeWindows)
> .reduce((aggValue, newValue) -> getMaxErrorScore(aggValue, 
> newValue), materialized);
>   }
>   private TimeWindows getConfiguredTimeWindows(long windowSizeMs, long 
> retentionMs) {
> TimeWindows timeWindows = TimeWindows.of(windowSizeMs);
> timeWindows.until(retentionMs);
> return timeWindows;
>   }
>   /**
>* Determine the max error score to keep by looking at the aggregated error 
> signal and
>* freshly consumed error signal
>*
>* @param aggValue
>* @param newValue
>* @return
>*/
>   private ErrorScore getMaxErrorScore(ErrorScore aggValue, ErrorScore 
> newValue) {
> if(aggValue.getErrorSignal() > newValue.getErrorSignal()) {
> return aggValue;
> }
> return newValue;
>   }
>   

[jira] [Commented] (KAFKA-8153) Streaming application with state stores takes up to 1 hour to restart

2019-04-08 Thread Michael Melsen (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8153?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16812153#comment-16812153
 ] 

Michael Melsen commented on KAFKA-8153:
---

[~guozhang] the cleanupconfig is a bean in the Spring Cloud Stream Kafka Binder 
that, by default, deletes local state stores on start or stop. As this is 
not the required behavior, I have overridden this bean to avoid deleting the 
local state stores.

I do have some new information about this issue. We deployed multiple instances 
of our streaming application on Kubernetes. Each instance got its own 
persistent volume. We now assume that every time an instance is restarted, it 
is not directed to the previously connected persistent volume. Therefore, we 
assume, a mismatch occurs where the state store is no longer valid and Kafka 
will restore each instance. 

This would also make sense because on local workstations, where the state 
stores end up at the same location, we don't experience the same issues. 

In addition, to resolve this issue (although we need some more testing), we 
used an NFS share where each instance writes to the same location, and this 
now avoids extensive restoration of the stores.
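
A simplified model of why a mismatched volume is so expensive, under stated 
assumptions rather than as a description of the actual Kafka Streams code: 
Kafka Streams keeps a checkpoint of changelog offsets next to the local store. 
If a usable checkpoint is found, only the changelog records after it need to 
be replayed. If none is found, the store is rebuilt from the start of the 
changelog. The method name `recordsToRestore` and the offsets used are 
hypothetical.

```java
final class RestoreSketch {

    /**
     * Returns how many changelog records would have to be replayed.
     * checkpointedOffset is null when no usable local checkpoint exists,
     * for example because the instance started on an empty or foreign volume.
     */
    static long recordsToRestore(Long checkpointedOffset, long beginningOffset, long endOffset) {
        long start = (checkpointedOffset == null)
                ? beginningOffset
                : Math.max(checkpointedOffset, beginningOffset);
        return Math.max(0L, endOffset - start);
    }

    public static void main(String[] args) {
        // No checkpoint: the whole changelog is replayed.
        System.out.println(recordsToRestore(null, 0L, 1_000_000L));
        // Checkpoint close to the end: only the tail is replayed.
        System.out.println(recordsToRestore(999_000L, 0L, 1_000_000L));
    }
}
```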


[jira] [Updated] (KAFKA-8153) Streaming application with state stores takes up to 1 hour to restart

2019-03-25 Thread Michael Melsen (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8153?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Michael Melsen updated KAFKA-8153:
--
Affects Version/s: (was: 2.1.1)
   2.1.0


[jira] [Created] (KAFKA-8153) Streaming application with state stores takes up to 1 hour to restart

2019-03-25 Thread Michael Melsen (JIRA)
Michael Melsen created KAFKA-8153:
-

 Summary: Streaming application with state stores takes up to 1 
hour to restart
 Key: KAFKA-8153
 URL: https://issues.apache.org/jira/browse/KAFKA-8153
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.1.1
Reporter: Michael Melsen


We are using Spring Cloud Stream with Kafka Streams 2.0.1 and utilizing the 
InteractiveQueryService to fetch data from the stores. There are 4 stores that 
persist data on disk after aggregating data. The code for the topology looks 
like this:
{code:java}
import java.util.HashMap;
import java.util.Map;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KeyValueMapper;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.state.WindowStore;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;

@Slf4j
@EnableBinding(SensorMeasurementBinding.class)
public class Consumer {

  public static final String RETENTION_MS = "retention.ms";
  public static final String CLEANUP_POLICY = "cleanup.policy";

  @Value("${windowstore.retention.ms}")
  private String retention;

  /**
   * Process the data flowing in from a Kafka topic. Aggregate the data to:
   * - 2 minutes
   * - 15 minutes
   * - one hour
   * - 12 hours
   *
   * @param stream the incoming sensor measurements
   */
  // Generic type arguments are inferred from usage (String keys,
  // SensorMeasurement values); the value type name is an assumption.
  @StreamListener(SensorMeasurementBinding.ERROR_SCORE_IN)
  public void process(KStream<String, SensorMeasurement> stream) {

    Map<String, String> topicConfig = new HashMap<>();
    topicConfig.put(RETENTION_MS, retention);
    topicConfig.put(CLEANUP_POLICY, "delete");

    log.info("Changelog and local window store retention.ms: {} and cleanup.policy: {}",
        topicConfig.get(RETENTION_MS),
        topicConfig.get(CLEANUP_POLICY));

    createWindowStore(LocalStore.TWO_MINUTES_STORE, topicConfig, stream);
    createWindowStore(LocalStore.FIFTEEN_MINUTES_STORE, topicConfig, stream);
    createWindowStore(LocalStore.ONE_HOUR_STORE, topicConfig, stream);
    createWindowStore(LocalStore.TWELVE_HOURS_STORE, topicConfig, stream);
  }

  private void createWindowStore(
      LocalStore localStore,
      Map<String, String> topicConfig,
      KStream<String, SensorMeasurement> stream) {

    // Configure how the state store should be materialized using the provided storeName
    Materialized<String, ErrorScore, WindowStore<Bytes, byte[]>> materialized =
        Materialized.as(localStore.getStoreName());

    // Set retention of changelog topic
    materialized.withLoggingEnabled(topicConfig);

    // Configure what the windows look like and how long data will be retained in local stores
    TimeWindows configuredTimeWindows = getConfiguredTimeWindows(
        localStore.getTimeUnit(),
        Long.parseLong(topicConfig.get(RETENTION_MS)));

    // Processing description:
    // The input data are 'samples' with key :::
    // 1. With the map we add the Tag to the key and we extract the error score from the data
    // 2. With the groupByKey we group the data on the new key
    // 3. With windowedBy we split up the data in time intervals depending on the provided
    //    LocalStore enum
    // 4. With reduce we determine the maximum value in the time window
    // 5. Materialized will make it stored in a table
    stream
        .map(getInstallationAssetModelAlgorithmTagKeyMapper())
        .groupByKey()
        .windowedBy(configuredTimeWindows)
        .reduce((aggValue, newValue) -> getMaxErrorScore(aggValue, newValue), materialized);
  }

  private TimeWindows getConfiguredTimeWindows(long windowSizeMs, long retentionMs) {
    // Return the instance that until() hands back, so the retention is applied
    // even when until() returns a new object instead of modifying the receiver.
    return TimeWindows.of(windowSizeMs).until(retentionMs);
  }

  /**
   * Determine the max error score to keep by looking at the aggregated error signal and
   * freshly consumed error signal
   *
   * @param aggValue the error score aggregated so far
   * @param newValue the freshly consumed error score
   * @return the error score with the larger error signal
   */
  private ErrorScore getMaxErrorScore(ErrorScore aggValue, ErrorScore newValue) {
    if (aggValue.getErrorSignal() > newValue.getErrorSignal()) {
      return aggValue;
    }
    return newValue;
  }

  private KeyValueMapper<String, SensorMeasurement, KeyValue<String, ErrorScore>>
      getInstallationAssetModelAlgorithmTagKeyMapper() {
    return (s, sensorMeasurement) -> new KeyValue<>(s + "::" + sensorMeasurement.getT(),
        new ErrorScore(sensorMeasurement.getTs(), sensorMeasurement.getE(),
            sensorMeasurement.getO()));
  }
}
{code}
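
The windowing and reduce steps of the topology above can be illustrated 
without Kafka. This is a minimal sketch in plain Java, not the Kafka Streams 
implementation: it assumes non-negative timestamps, represents each error 
score as a bare `double`, and uses the hypothetical names `WindowedMaxSketch`, 
`windowStart` and `maxPerWindow`. Tumbling windows are aligned to the epoch, 
so a record falls into the window that starts at its timestamp rounded down 
to a multiple of the window size.

```java
import java.util.Map;
import java.util.TreeMap;

final class WindowedMaxSketch {

    /** Start of the tumbling window that contains the given timestamp. */
    static long windowStart(long timestampMs, long windowSizeMs) {
        return timestampMs - (timestampMs % windowSizeMs);
    }

    /** Keeps the larger score per window, like the reduce step in the topology. */
    static Map<Long, Double> maxPerWindow(long[] timestampsMs, double[] scores, long windowSizeMs) {
        Map<Long, Double> result = new TreeMap<>();
        for (int i = 0; i < timestampsMs.length; i++) {
            result.merge(windowStart(timestampsMs[i], windowSizeMs), scores[i], Math::max);
        }
        return result;
    }

    public static void main(String[] args) {
        long twoMinutes = 2 * 60 * 1000L;
        long[] timestamps = {0L, 60_000L, 120_000L, 130_000L};
        double[] scores = {0.2, 0.7, 0.4, 0.1};
        // Prints {0=0.7, 120000=0.4}
        System.out.println(maxPerWindow(timestamps, scores, twoMinutes));
    }
}
```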
So we are materializing aggregated data to four different stores after 
determining the max value within a specific window for a specific key. Please 
note that the retention is set to two months of data and the cleanup policy 
is delete. We don't compact data.

The size of the individual state stores on disk is between 14 and 20 GB of data.
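
To give a feeling for these numbers, the sketch below works out how many 
windows a single key can accumulate in each of the four stores over the 
retention period. It assumes that "two months" means 60 days, which is not 
stated in the report, and the names `RetentionSketch` and `windowsPerKey` are 
hypothetical.

```java
import java.util.concurrent.TimeUnit;

final class RetentionSketch {

    // Assumption: two months of retention are taken as 60 days.
    static final long RETENTION_MS = TimeUnit.DAYS.toMillis(60);

    /** Number of tumbling windows of the given size that fit into the retention period. */
    static long windowsPerKey(long windowSizeMs) {
        return RETENTION_MS / windowSizeMs;
    }

    public static void main(String[] args) {
        System.out.println("retention.ms   = " + RETENTION_MS);
        System.out.println("2 minute store = " + windowsPerKey(TimeUnit.MINUTES.toMillis(2)));
        System.out.println("15 minute store = " + windowsPerKey(TimeUnit.MINUTES.toMillis(15)));
        System.out.println("1 hour store   = " + windowsPerKey(TimeUnit.HOURS.toMillis(1)));
        System.out.println("12 hour store  = " + windowsPerKey(TimeUnit.HOURS.toMillis(12)));
    }
}
```

Under that assumption the 2 minute store holds up to 43200 windows per key, 
against 120 for the 12 hour store, so the smallest window size is likely to 
dominate both the disk usage and the amount of changelog data to replay.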

We are making use of Interactive Queries: 
[https://docs.confluent.io/current/streams/developer-guide/interactive-queries.html#interactive-queries]

In our setup we have 4 instances of our streaming app running as one consumer 
group, so every instance stores a specific part of all data in its own 
store.

This all seems to work nicely until we restart one or more instances and wait 
for them to become available again. (The restart itself takes about 3 minutes 
at most.) I would expect that the restart of the app would not take that long, 
but unfortunately it