Thomas Hein created KAFKA-12735:
-----------------------------------

             Summary: "Key Store is not initialized" after some time for a 
Keystore created by a transformValues processor
                 Key: KAFKA-12735
                 URL: https://issues.apache.org/jira/browse/KAFKA-12735
             Project: Kafka
          Issue Type: Bug
          Components: clients
    Affects Versions: 2.6.2
            Reporter: Thomas Hein


Dear Kafka Fellows,

currently, we are facing problems with Kafka Streams.
We try to transform a set of messages into a state store.

The functionality works, but after a certain period the application 
returns the error
"Key value store is not initialized."

We tried a lot of solutions, like using the Kafka events or loops to wait until 
the store is available again. But the system is not able to heal itself again.

Some of our colleagues use the Kubernetes health check to restart the application 
when this issue comes up. But we think this is not a proper solution.

What do you recommend?

 

 

Thanks a lot for your help

 

Our Code

 
{code:java}
@Cacheable(value = MYICP_NOTIFICATIONS, key = "#emailAddress", unless = 
"#result == null || #result.cachedObject == null || 
#result.cachedObject.isEmpty()")
public GenericCacheable<List<MyIcpNotification>> 
getMyIcpNotificationsForUser(final String uuid, final String emailAddress) 
throws InterruptedException {
    if (!hasText(emailAddress)) {
        LOGGER.error("[{}]: getMyIcpNotificationsForUser was called with an 
invalid email address.", uuid);
        return new GenericCacheable<>(Collections.emptyList(), null);
    }

    if (keyValueStore == null) {
        initializeStore(uuid);
    }

    if (keyValueStore == null) {
        LOGGER.error("[{}]: Key value store is not initialized.", uuid);
        return new GenericCacheable<>(Collections.emptyList(), null);
    }

    final List<Command<MyIcpPayload>> commandList = 
keyValueStore.get(emailAddress);
    if (commandList == null) {
        return new GenericCacheable<>(Collections.emptyList(), null);
    }

    //@formatter:off
    final List<MyIcpNotification> list = commandList
            .stream()
            .map(this::mapToNotification)
            .collect(Collectors.toList());
    //@formatter:on

    return new GenericCacheable<>(list, LocalDateTime.now());
}
{code}
{code:java}
/**
 * Tries up to five times to obtain the queryable key-value store from the streams instance
 * and assigns it to {@code keyValueStore}.
 *
 * <p>If every attempt fails the field is left untouched, so callers must check it for
 * {@code null} afterwards. The method waits one second between attempts, but not after
 * the last one.
 *
 * @param uuid correlation id used as prefix in log messages
 * @throws InterruptedException if the thread is interrupted while waiting between attempts
 */
private void initializeStore(final String uuid) throws InterruptedException {
    final int maxAttempts = 5;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
        try {
            keyValueStore = myIcpMessagesStream.store(storeName, QueryableStoreTypes.keyValueStore());
            return;
        } catch (final Exception e) {
            // The trailing throwable is logged with its stack trace, so the cause is not lost.
            LOGGER.debug("[{}]: Error while loading the state store [{}]", uuid, e.getMessage(), e);
            // Waiting after the final attempt would only delay the caller.
            if (attempt < maxAttempts) {
                Thread.sleep(1000);
            }
        }
    }
}
{code}
 
{code:java}
/**
 * Builds the topology that feeds the state store named {@code storeName} and starts it.
 *
 * <p>Steps: obtain the {@code StreamsBuilder} from the factory bean, register a key-value
 * store (String keys, {@code CommandListSerde} values), consume the topic returned by
 * {@code kafkaTopicNames.getMyIcpMessageTopic()}, map each payload with
 * {@code mapPayloadToMyIcpPayload}, and pass the values through {@code commandTransformer}
 * with access to the store. The resulting {@code KafkaStreams} instance is started before
 * it is returned.
 *
 * @param streamsBuilderFactoryBean factory bean qualified as {@code myIcpEvents}; both its
 *                                  builder and its streams configuration must be non-null
 * @return the already started {@code KafkaStreams} instance
 * @throws Exception declared by the signature; presumably propagated from the factory
 *                   bean - TODO confirm
 */
public KafkaStreams myIcpMessagesStream(@Qualifier("myIcpEvents") final 
StreamsBuilderFactoryBean streamsBuilderFactoryBean) throws Exception {
    final StreamsBuilder myicpQueryStreamBuilder = 
Objects.requireNonNull(streamsBuilderFactoryBean.getObject());

    // Register the store up front so the transformer below can be connected to it by name.
    final StoreBuilder<KeyValueStore<String, List<Command<MyIcpPayload>>>> 
keyValueStoreBuilder = keyValueStoreBuilder(inMemoryKeyValueStore(storeName), 
Serdes.String(), new CommandListSerde<>());
    myicpQueryStreamBuilder.addStateStore(keyValueStoreBuilder);

    // NOTE(review): the supplier passed to transformValues returns the same
    // commandTransformer object on every call. The Kafka Streams supplier contract asks for
    // a new transformer instance per get(); a shared instance has its store reference
    // overwritten by each init() call. Verify how commandTransformer is scoped and whether
    // this explains the store becoming unavailable.
    //@formatter:off
    myicpQueryStreamBuilder
            .stream(kafkaTopicNames.getMyIcpMessageTopic(), 
Consumed.with(Serdes.String(), new CommandSerde<>()))
            .mapValues(this::mapPayloadToMyIcpPayload)
            .transformValues(() -> commandTransformer, storeName);
    //@formatter:on

    final KafkaStreams kafkaStreams = new 
KafkaStreams(myicpQueryStreamBuilder.build(), 
Objects.requireNonNull(streamsBuilderFactoryBean.getStreamsConfiguration()));
    // The instance is started here, so callers receive a running (or starting) client.
    kafkaStreams.start();

    return kafkaStreams;
}
{code}
 

 
{code:java}
/**
 * Value transformer that keeps, per user e-mail address, a bounded list of commands in a
 * key-value state store and returns the list that is current after each record.
 *
 * <p>A record is ignored (the stored list is returned unchanged) when its header UUID is
 * already in the list or when its header timestamp lies before the newest timestamp in the
 * list. Otherwise it is appended; once the list has reached {@code maxStoreSize}, the
 * element at index 0 is dropped first.
 *
 * <p>NOTE(review): the store handle is instance state assigned in {@link #init}; confirm
 * that one instance is never shared between several stream tasks.
 */
public class CommandTransformer implements ValueTransformer<Command<MyIcpPayload>, List<Command<MyIcpPayload>>> {
    private static final Logger LOGGER = LoggerFactory.getLogger(CommandTransformer.class);

    /** Name of the state store to look up in {@link #init}. */
    @Value("${ifx.notificationService.myicp.storeName}")
    private String storeName;

    /** Size at which the first list element is dropped before a new command is appended. */
    @Value("${ifx.notificationService.myicp.maxStoreSize}")
    private int maxStoreSize;

    private KeyValueStore<String, List<Command<MyIcpPayload>>> keyValueStore;

    /**
     * Fetches the state store named {@code storeName} from the processor context.
     *
     * @param context processor context supplied by Kafka Streams
     */
    @Override
    public void init(final ProcessorContext context) {
        // getStateStore is untyped; the cast relies on the store having been registered
        // with matching key and value types.
        @SuppressWarnings("unchecked")
        final KeyValueStore<String, List<Command<MyIcpPayload>>> store =
                (KeyValueStore<String, List<Command<MyIcpPayload>>>) context.getStateStore(storeName);
        keyValueStore = store;
    }

    /**
     * Adds {@code value} to the stored list of its user unless it is rejected as invalid.
     *
     * @param value incoming command; {@code null} yields an empty list
     * @return the user's list after processing, or the unchanged stored list when the
     *         value was rejected
     */
    @Override
    @CacheEvict(value = MYICP_NOTIFICATIONS, key = "#value.payload.user.emailAddress")
    public List<Command<MyIcpPayload>> transform(final Command<MyIcpPayload> value) {
        if (value == null) {
            return Collections.emptyList();
        }

        final String emailAddress = value.getPayload().getUser().getEmailAddress();
        final List<Command<MyIcpPayload>> listForUser = getCommandListForUser(emailAddress);

        if (isInvalidValue(value, listForUser)) {
            return listForUser;
        }

        // The emptiness check keeps remove(0) from throwing when maxStoreSize is configured
        // as zero or negative and the user has no entries yet.
        if (!listForUser.isEmpty() && listForUser.size() >= maxStoreSize) {
            listForUser.remove(0);
        }

        LOGGER.debug("[{}] current list [{}]", emailAddress, listForUser.size());

        listForUser.add(value);
        keyValueStore.put(emailAddress, listForUser);

        LOGGER.debug("[{}] list after update [{}]", emailAddress, listForUser.size());

        return listForUser;
    }

    /**
     * Decides whether {@code value} must be skipped.
     *
     * @return {@code true} if its UUID is already in the list, or if the newest timestamp
     *         in the list is after the value's timestamp
     */
    private boolean isInvalidValue(final Command<MyIcpPayload> value,
            final List<Command<MyIcpPayload>> listForUser) {
        if (uuidAlreadyPresent(value, listForUser)) {
            return true;
        }

        final ZonedDateTime newestDate = getNewestDateInList(listForUser);

        return nonNull(newestDate) && newestDate.isAfter(value.getHeader().getTimestamp());
    }

    /** Returns the maximum header timestamp in the list, or {@code null} for an empty list. */
    private ZonedDateTime getNewestDateInList(final List<Command<MyIcpPayload>> listForUser) {
        //@formatter:off
        return listForUser
                .stream()
                .map(myIcpPayloadCommand -> myIcpPayloadCommand.getHeader().getTimestamp())
                .max(ZonedDateTime::compareTo)
                .orElse(null);
        //@formatter:on
    }

    /** Checks, ignoring case, whether any listed command has the same header UUID as {@code value}. */
    private boolean uuidAlreadyPresent(final Command<MyIcpPayload> value,
            final List<Command<MyIcpPayload>> listForUser) {
        final String uuid = value.getHeader().getUuid();
        return listForUser.stream()
                .anyMatch(myIcpPayloadCommand -> myIcpPayloadCommand.getHeader().getUuid().equalsIgnoreCase(uuid));
    }

    /** Returns the list stored under {@code emailAddress}, or a new empty list if none exists. */
    private List<Command<MyIcpPayload>> getCommandListForUser(final String emailAddress) {
        final List<Command<MyIcpPayload>> listForUser = keyValueStore.get(emailAddress);
        return isNull(listForUser) ? new ArrayList<>() : listForUser;
    }

    @Override
    public void close() {
        // do nothing here
    }
}
{code}
 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to