Thomas Hein created KAFKA-12735:
-----------------------------------
Summary: "Key value store is not initialized" after some time for a
key-value store used by a transformValues processor
Key: KAFKA-12735
URL: https://issues.apache.org/jira/browse/KAFKA-12735
Project: Kafka
Issue Type: Bug
Components: clients
Affects Versions: 2.6.2
Reporter: Thomas Hein
Dear Kafka Fellows,
currently, we are facing problems with Kafka Streams.
We try to transform a set of messages into a state store.
The functionality works, but after a certain period the application
returns the error
"Key value store is not initialized."
We tried a lot of solutions, such as using the Kafka events or loops that wait until
the store is available again, but the system is not able to recover.
Colleagues of ours use the Kubernetes health check to restart the application
when this issue comes up, but we think this is not a proper solution.
What do you recommend?
Thanks a lot for your help
Our Code
{code:java}
@Cacheable(value = MYICP_NOTIFICATIONS, key = "#emailAddress", unless =
"#result == null || #result.cachedObject == null ||
#result.cachedObject.isEmpty()")
public GenericCacheable<List<MyIcpNotification>>
getMyIcpNotificationsForUser(final String uuid, final String emailAddress)
throws InterruptedException {
if (!hasText(emailAddress)) {
LOGGER.error("[{}]: getMyIcpNotificationsForUser was called with an
invalid email address.", uuid);
return new GenericCacheable<>(Collections.emptyList(), null);
}
if (keyValueStore == null) {
initializeStore(uuid);
}
if (keyValueStore == null) {
LOGGER.error("[{}]: Key value store is not initialized.", uuid);
return new GenericCacheable<>(Collections.emptyList(), null);
}
final List<Command<MyIcpPayload>> commandList =
keyValueStore.get(emailAddress);
if (commandList == null) {
return new GenericCacheable<>(Collections.emptyList(), null);
}
//@formatter:off
final List<MyIcpNotification> list = commandList
.stream()
.map(this::mapToNotification)
.collect(Collectors.toList());
//@formatter:on
return new GenericCacheable<>(list, LocalDateTime.now());
}
{code}
{code:java}
/**
 * Tries to obtain the queryable key-value store handle from {@code myIcpMessagesStream}
 * and assigns it to {@code keyValueStore}.
 *
 * <p>Makes up to five attempts, pausing one second between them. If every attempt fails
 * the method returns normally and leaves {@code keyValueStore} unchanged, so callers
 * must check the field afterwards.
 *
 * @param uuid correlation id, used only as a prefix in log messages
 * @throws InterruptedException if the thread is interrupted while pausing between attempts
 */
private void initializeStore(final String uuid) throws InterruptedException {
    final int maxAttempts = 5;
    final long retryDelayMillis = 1000L;

    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
        try {
            keyValueStore = myIcpMessagesStream.store(storeName, QueryableStoreTypes.keyValueStore());
            return;
        } catch (final RuntimeException e) {
            // Best effort: the failure is only logged. Passing the exception as the last
            // argument keeps the stack trace, which the message alone would drop.
            LOGGER.debug("[{}]: Error while loading the state store [{}]", uuid, e.getMessage(), e);
            // Pausing after the final attempt would only delay the caller.
            if (attempt < maxAttempts) {
                Thread.sleep(retryDelayMillis);
            }
        }
    }
}
{code}
{code:java}
/**
 * Assembles the MyICP topology on the builder supplied by the factory bean, then creates
 * and starts a {@link KafkaStreams} instance for it.
 *
 * <p>The topology registers an in-memory key-value store named {@code storeName}, reads
 * the MyICP message topic, maps each payload via {@code mapPayloadToMyIcpPayload} and
 * passes the values to {@code commandTransformer}, which is given access to that store.
 *
 * @param streamsBuilderFactoryBean factory bean providing the {@code StreamsBuilder} and
 *                                  the streams configuration; both must be non-null
 * @return the already started {@code KafkaStreams} instance
 * @throws Exception declared by the signature; no checked exception is raised by code visible here
 */
public KafkaStreams myIcpMessagesStream(
        @Qualifier("myIcpEvents") final StreamsBuilderFactoryBean streamsBuilderFactoryBean) throws Exception {
    final StreamsBuilder builder = Objects.requireNonNull(streamsBuilderFactoryBean.getObject());

    // The store must be registered on the builder before a processor can be attached to it by name.
    final StoreBuilder<KeyValueStore<String, List<Command<MyIcpPayload>>>> storeBuilder =
            keyValueStoreBuilder(inMemoryKeyValueStore(storeName), Serdes.String(), new CommandListSerde<>());
    builder.addStateStore(storeBuilder);

    builder
            .stream(kafkaTopicNames.getMyIcpMessageTopic(), Consumed.with(Serdes.String(), new CommandSerde<>()))
            .mapValues(this::mapPayloadToMyIcpPayload)
            // NOTE(review): this supplier returns the same commandTransformer object on every
            // call. The Kafka Streams supplier contract asks for a new transformer instance
            // per get() - confirm whether sharing one instance across tasks is intended.
            .transformValues(() -> commandTransformer, storeName);

    final KafkaStreams streams = new KafkaStreams(
            builder.build(),
            Objects.requireNonNull(streamsBuilderFactoryBean.getStreamsConfiguration()));
    streams.start();
    return streams;
}
{code}
{code:java}
/**
 * Value transformer that keeps, per user e-mail address, a size-bounded list of commands
 * in a key-value state store and returns the list for the user of each incoming command.
 *
 * <p>NOTE(review): the store reference lives in an instance field assigned by
 * {@link #init(ProcessorContext)}. If one instance is handed to several stream tasks,
 * each {@code init} call overwrites that field - confirm how instances are supplied.
 */
public class CommandTransformer implements ValueTransformer<Command<MyIcpPayload>, List<Command<MyIcpPayload>>> {

    private static final Logger LOGGER = LoggerFactory.getLogger(CommandTransformer.class);

    /** Name of the state store to look up in {@link #init(ProcessorContext)}. */
    @Value("${ifx.notificationService.myicp.storeName}")
    private String storeName;

    /** Upper bound for the number of commands kept per user. */
    @Value("${ifx.notificationService.myicp.maxStoreSize}")
    private int maxStoreSize;

    private KeyValueStore<String, List<Command<MyIcpPayload>>> keyValueStore;

    /**
     * Fetches the state store named {@code storeName} from the processor context.
     *
     * @param context processor context supplied by Kafka Streams
     */
    @Override
    @SuppressWarnings("unchecked") // getStateStore returns the untyped store; the cast cannot be checked at runtime
    public void init(final ProcessorContext context) {
        keyValueStore = (KeyValueStore<String, List<Command<MyIcpPayload>>>) context.getStateStore(storeName);
    }

    /**
     * Adds the command to its user's list unless it is rejected by
     * {@code isInvalidValue}, and writes the updated list back to the store.
     *
     * @param value incoming command; may be {@code null}
     * @return an empty list for a {@code null} value, otherwise the user's list
     *         (unchanged if the command was rejected, updated if it was accepted)
     */
    @Override
    @CacheEvict(value = MYICP_NOTIFICATIONS, key = "#value.payload.user.emailAddress")
    public List<Command<MyIcpPayload>> transform(final Command<MyIcpPayload> value) {
        if (value == null) {
            return Collections.emptyList();
        }

        final String emailAddress = value.getPayload().getUser().getEmailAddress();
        final List<Command<MyIcpPayload>> listForUser = getCommandListForUser(value);
        if (isInvalidValue(value, listForUser)) {
            return listForUser;
        }

        // Make room for the new entry by dropping from the front of the list. The loop
        // also trims a list that is longer than the bound, and the emptiness guard keeps
        // remove(0) from throwing when maxStoreSize is zero or negative.
        while (!listForUser.isEmpty() && listForUser.size() >= maxStoreSize) {
            listForUser.remove(0);
        }

        LOGGER.debug("[{}] current list [{}]", emailAddress, listForUser.size());
        listForUser.add(value);
        keyValueStore.put(emailAddress, listForUser);
        LOGGER.debug("[{}] list after update [{}]", emailAddress, listForUser.size());
        return listForUser;
    }

    /**
     * Decides whether a command must not be added to the list.
     *
     * @return {@code true} if a command with the same header UUID (ignoring case) is
     *         already in the list, or if the timestamp returned by
     *         {@code getOldestDateInList} is after the command's timestamp
     */
    private boolean isInvalidValue(final Command<MyIcpPayload> value,
            final List<Command<MyIcpPayload>> listForUser) {
        if (uuidAlreadyPresent(value, listForUser)) {
            return true;
        }
        final ZonedDateTime oldestDate = getOldestDateInList(listForUser);
        return nonNull(oldestDate) && oldestDate.isAfter(value.getHeader().getTimestamp());
    }

    /**
     * Returns the greatest header timestamp in the list, or {@code null} for an empty list.
     *
     * <p>NOTE(review): despite the name this selects the maximum, i.e. the most recent
     * timestamp. Confirm whether the name or the {@code max} is the intended behaviour.
     */
    private ZonedDateTime getOldestDateInList(final List<Command<MyIcpPayload>> listForUser) {
        return listForUser.stream()
                .map(myIcpPayloadCommand -> myIcpPayloadCommand.getHeader().getTimestamp())
                .max(ZonedDateTime::compareTo)
                .orElse(null);
    }

    /** Returns whether the list already holds a command with the same header UUID, ignoring case. */
    private boolean uuidAlreadyPresent(final Command<MyIcpPayload> value,
            final List<Command<MyIcpPayload>> listForUser) {
        final String uuid = value.getHeader().getUuid();
        return listForUser.stream()
                .anyMatch(myIcpPayloadCommand -> myIcpPayloadCommand.getHeader().getUuid().equalsIgnoreCase(uuid));
    }

    /** Reads the list stored under the command's user e-mail address, or a new empty list if none exists. */
    private List<Command<MyIcpPayload>> getCommandListForUser(final Command<MyIcpPayload> value) {
        final List<Command<MyIcpPayload>> stored =
                keyValueStore.get(value.getPayload().getUser().getEmailAddress());
        return isNull(stored) ? new ArrayList<>() : stored;
    }

    @Override
    public void close() {
        // do nothing here
    }
}
{code}
--
This message was sent by Atlassian Jira
(v8.3.4#803005)