[ https://issues.apache.org/jira/browse/KAFKA-8228?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Boquan Tang resolved KAFKA-8228.
--------------------------------
Resolution: Duplicate
This might be a duplicate of KAFKA-7866; closing for now and watching that ticket.
> Exactly once semantics break during server restart for kafka-streams application
> --------------------------------------------------------------------------------
>
> Key: KAFKA-8228
> URL: https://issues.apache.org/jira/browse/KAFKA-8228
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.2.0
> Reporter: Boquan Tang
> Priority: Major
>
> We are using 2.2.0 for the kafka-streams client and 2.0.1 for the brokers.
> We have a simple kafka-streams application with the following topology:
> {code:java}
> Source: KSTREAM-SOURCE-0000000004 (topics: [deduped-adclick])
> --> KSTREAM-TRANSFORM-0000000005
> Processor: KSTREAM-TRANSFORM-0000000005 (stores: [uid-offset-store])
> --> KSTREAM-TRANSFORM-0000000006
> <-- KSTREAM-SOURCE-0000000004
> Source: KSTREAM-SOURCE-0000000000 (topics: [advertiser-budget])
> --> KTABLE-SOURCE-0000000001
> Source: KSTREAM-SOURCE-0000000002 (topics: [advertisement-budget])
> --> KTABLE-SOURCE-0000000003
> Processor: KSTREAM-TRANSFORM-0000000006 (stores: [advertiser-budget-store,
> advertisement-budget-store])
> --> KSTREAM-SINK-0000000007
> <-- KSTREAM-TRANSFORM-0000000005
> Sink: KSTREAM-SINK-0000000007 (topic: budget-adclick)
> <-- KSTREAM-TRANSFORM-0000000006
> Processor: KTABLE-SOURCE-0000000001 (stores: [advertiser-budget-store])
> --> none
> <-- KSTREAM-SOURCE-0000000000
> Processor: KTABLE-SOURCE-0000000003 (stores: [advertisement-budget-store])
> --> none
> <-- KSTREAM-SOURCE-0000000002{code}
> The *Processor: KSTREAM-TRANSFORM-0000000005 (stores: [uid-offset-store])* was
> added specifically to investigate this EOS breakage. Its init() and transform()
> look like this (the concrete K/V class names have been removed):
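> {code:java}
> @Override
> @SuppressWarnings("unchecked")
> public void init(final ProcessorContext context) {
>     // Windowed store mapping uid -> last processed input offset, per hour.
>     uidStore = (WindowStore<String, Long>) context.getStateStore(uidStoreName);
>     this.context = context;
> }
>
> @Override
> public KeyValue<K, V> transform(final K key, final V value) {
>     // Offset of the input record currently being processed.
>     final long offset = context.offset();
>     final String uid = value.getUid();
>     // clickTimestamp is not defined in this excerpt (presumably read from the record).
>     // Truncate it to the start of its hour, in epoch milliseconds, to select the window.
>     final long beginningOfHour = Instant.ofEpochMilli(clickTimestamp)
>             .atZone(ZoneId.systemDefault())
>             .withMinute(0).withSecond(0)
>             .toEpochSecond() * 1000;
>     final Long maybeLastOffset = uidStore.fetch(uid, beginningOfHour);
>     // Same uid, same hour, same input offset: this record was already processed once.
>     final boolean dupe = null != maybeLastOffset && offset == maybeLastOffset;
>     uidStore.put(uid, offset, beginningOfHour);
>     if (dupe) {
>         LOGGER.warn("Find duplication in partition {}, uid is {}, current offset is {}, last offset is {}",
>                 context.partition(), uid, offset, maybeLastOffset);
>         statsEmitter.count("duplication");
>     }
>     // Returning null forwards nothing, so duplicates are dropped.
>     return dupe ? null : new KeyValue<>(key, value);
> }
> {code}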
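
The dedupe check in transform() can be exercised without a cluster. Below is a minimal, Kafka-free sketch of it. A HashMap keyed by uid plus hour start stands in for the windowed uid-offset-store. The class UidDedupeSketch and its methods are hypothetical and not part of the reporter's code. The zone is injected rather than using ZoneId.systemDefault() so the output is deterministic. The sketch shows that the check only fires when the same input offset is seen twice for the same uid and hour.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZoneOffset;
import java.util.HashMap;
import java.util.Map;

// Hypothetical model of the dedupe check: a HashMap stands in for the windowed
// uid-offset-store, keyed by uid plus the start of the click's hour.
final class UidDedupeSketch {
    private final Map<String, Long> lastOffsetByUidAndHour = new HashMap<>();
    private final ZoneId zone;

    UidDedupeSketch(ZoneId zone) {
        this.zone = zone; // the report uses ZoneId.systemDefault(); injected here for determinism
    }

    // Truncates the click time to the start of its hour and returns epoch milliseconds.
    long beginningOfHour(long clickTimestampMs) {
        return Instant.ofEpochMilli(clickTimestampMs).atZone(zone)
                .withMinute(0).withSecond(0)
                .toEpochSecond() * 1000;
    }

    // Mirrors transform(): read the previous offset, store the current one, and report a
    // duplicate only when the same offset was already stored for this uid and hour.
    boolean isDuplicate(String uid, long clickTimestampMs, long offset) {
        String windowKey = uid + "@" + beginningOfHour(clickTimestampMs);
        Long last = lastOffsetByUidAndHour.put(windowKey, offset); // put returns the previous value
        return last != null && last == offset;
    }

    public static void main(String[] args) {
        UidDedupeSketch sketch = new UidDedupeSketch(ZoneOffset.UTC);
        long ts = 1555053178000L; // 2019-04-12T07:12:58Z
        System.out.println(sketch.beginningOfHour(ts));               // 1555052400000
        System.out.println(sketch.isDuplicate("u1", ts, 212770034L)); // false
        System.out.println(sketch.isDuplicate("u1", ts, 212770034L)); // true: same offset replayed
        System.out.println(sketch.isDuplicate("u1", ts, 212770035L)); // false
    }
}
```

Using the return value of Map.put collapses the fetch-then-put pair into one call. The observable behavior is the same as the store-based version in this single-threaded model.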
> Although the issue is not 100% reproducible, we found that after restarting one
> or more brokers on the cluster side, duplicates appear:
> {code:java}
> 2019-04-12T07:12:58Z WARN [org.apache.kafka.clients.NetworkClient]
> [kafka-producer-network-thread |
> adclick-budget-decorator-streams-e70f7538-4125-4e8f-aeee-0c8717d663bb-StreamThread-1-0_9-producer]
> [Producer
> clientId=adclick-budget-decorator-streams-e70f7538-4125-4e8f-aeee-0c8717d663bb-StreamThread-1-0_9-producer,
> transactionalId=adclick-budget-decorator-streams-0_9] Connection to node 2
> (*:9092) could not be established. Broker may not be available.
> 2019-04-12T07:12:58Z WARN [org.apache.kafka.clients.NetworkClient]
> [kafka-producer-network-thread |
> adclick-budget-decorator-streams-e70f7538-4125-4e8f-aeee-0c8717d663bb-StreamThread-1-0_9-producer]
> [Producer
> clientId=adclick-budget-decorator-streams-e70f7538-4125-4e8f-aeee-0c8717d663bb-StreamThread-1-0_9-producer,
> transactionalId=adclick-budget-decorator-streams-0_9] Connection to node 2
> (*:9092) could not be established. Broker may not be available.
> 2019-04-12T07:14:02Z WARN [org.apache.kafka.clients.NetworkClient]
> [kafka-producer-network-thread |
> adclick-budget-decorator-streams-e70f7538-4125-4e8f-aeee-0c8717d663bb-StreamThread-1-0_12-producer]
> [Producer
> clientId=adclick-budget-decorator-streams-e70f7538-4125-4e8f-aeee-0c8717d663bb-StreamThread-1-0_12-producer,
> transactionalId=adclick-budget-decorator-streams-0_12] Connection to node 2
> (*:9092) could not be established. Broker may not be available.
> 2019-04-12T07:27:39Z WARN
> [org.apache.kafka.streams.processor.internals.StreamThread]
> [adclick-budget-decorator-streams-e70f7538-4125-4e8f-aeee-0c8717d663bb-StreamThread-1]
> stream-thread
> [adclick-budget-decorator-streams-e70f7538-4125-4e8f-aeee-0c8717d663bb-StreamThread-1]
> Detected task 0_9 that got migrated to another thread. This implies that
> this thread missed a rebalance and dropped out of the consumer group. Will
> try to rejoin the consumer group. Below is the detailed description of the
> task: >TaskId: 0_9 >> ProcessorTopology: > KSTREAM-SOURCE-0000000000: >
> topics: [advertiser-budget] > children: [KTABLE-SOURCE-0000000001] >
> KTABLE-SOURCE-0000000001: > states: [advertiser-budget-store] >
> KSTREAM-SOURCE-0000000004: > topics: [deduped-adclick] > children:
> [KSTREAM-TRANSFORM-0000000005] > KSTREAM-TRANSFORM-0000000005: > states:
> [uid-offset-store] > children: [KSTREAM-TRANSFORM-0000000006] >
> KSTREAM-TRANSFORM-0000000006: > states: [advertiser-budget-store,
> advertisement-budget-store] > children: [KSTREAM-SINK-0000000007] >
> KSTREAM-SINK-0000000007: > topic: StaticTopicNameExtractor(budget-adclick) >
> KSTREAM-SOURCE-0000000002: > topics: [advertisement-budget] > children:
> [KTABLE-SOURCE-0000000003] > KTABLE-SOURCE-0000000003: > states:
> [advertisement-budget-store] >Partitions [advertiser-budget-9,
> deduped-adclick-9, advertisement-budget-9]
> 2019-04-12T07:27:40Z WARN [org.apache.kafka.common.utils.AppInfoParser]
> [adclick-budget-decorator-streams-e70f7538-4125-4e8f-aeee-0c8717d663bb-StreamThread-1]
> Error registering AppInfo mbean
> javax.management.InstanceAlreadyExistsException
> javax.management.InstanceAlreadyExistsException:
> kafka.producer:type=app-info,id=adclick-budget-decorator-streams-e70f7538-4125-4e8f-aeee-0c8717d663bb-StreamThread-1-0_18-producer
> at com.sun.jmx.mbeanserver.Repository.addMBean(Repository.java:437)
> at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerWithRepository(DefaultMBeanServerInterceptor.java:1898)
> at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerDynamicMBean(DefaultMBeanServerInterceptor.java:966)
> at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerObject(DefaultMBeanServerInterceptor.java:900)
> at com.sun.jmx.interceptor.DefaultMBeanServerInterceptor.registerMBean(DefaultMBeanServerInterceptor.java:324)
> at com.sun.jmx.mbeanserver.JmxMBeanServer.registerMBean(JmxMBeanServer.java:522)
> at org.apache.kafka.common.utils.AppInfoParser.registerAppInfo(AppInfoParser.java:62)
> at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:424)
> at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:287)
> at org.apache.kafka.streams.processor.internals.DefaultKafkaClientSupplier.getProducer(DefaultKafkaClientSupplier.java:39)
> at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createProducer(StreamThread.java:457)
> at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.lambda$createTask$0(StreamThread.java:447)
> at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:192)
> at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:172)
> at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:448)
> at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:399)
> at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:384)
> at org.apache.kafka.streams.processor.internals.TaskManager.addStreamTasks(TaskManager.java:148)
> at org.apache.kafka.streams.processor.internals.TaskManager.createTasks(TaskManager.java:107)
> at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:281)
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:292)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:410)
> at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:344)
> at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:342)
> at org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1226)
> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1191)
> at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1176)
> at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:941)
> at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:846)
> at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:805)
> at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:774)
> 2019-04-12T07:30:28Z WARN
> [com.indeed.adclick.budget.decorator.streams.transformers.UidDedupeTransformer]
> [adclick-budget-decorator-streams-e70f7538-4125-4e8f-aeee-0c8717d663bb-StreamThread-1]
> Find duplication in partition 18, uid is 1d8868ce40umu002, current offset is
> 212770034, last offset is 212770034
> 2019-04-12T07:30:28Z WARN
> [com.indeed.adclick.budget.decorator.streams.transformers.UidDedupeTransformer]
> [adclick-budget-decorator-streams-e70f7538-4125-4e8f-aeee-0c8717d663bb-StreamThread-1]
> Find duplication in partition 18, uid is 1d8868du40u1u001, current offset is
> 212770036, last offset is 212770036{code}
> Our kafka-streams application is configured simply, like this:
> {code:java}
> streamsConfiguration.put(StreamsConfig.APPLICATION_ID_CONFIG, kafkaStreamsApplicationId);
> streamsConfiguration.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
> streamsConfiguration.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
> streamsConfiguration.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
> streamsConfiguration.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);
> streamsConfiguration.put(StreamsConfig.REPLICATION_FACTOR_CONFIG, 3);
> streamsConfiguration.put(StreamsConfig.producerPrefix(ProducerConfig.BATCH_SIZE_CONFIG), 1000);
> {code}
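
When comparing this configuration with what the application logs at startup, it can help to see the constants as the raw keys they resolve to. Below is a plain-JDK sketch of the same settings. The class StreamsConfigSketch and the example argument values are hypothetical. setProperty with string values is used because Properties.getProperty returns null for non-String values such as the Integer 3 above. StreamsConfig.producerPrefix simply prepends "producer." to the producer setting name.

```java
import java.util.Properties;

// Hypothetical plain-JDK equivalent of the configuration above, with the
// StreamsConfig/ProducerConfig constants written out as their string keys.
final class StreamsConfigSketch {
    static Properties buildConfig(String applicationId, String bootstrapServers) {
        Properties p = new Properties();
        p.setProperty("application.id", applicationId);        // APPLICATION_ID_CONFIG
        p.setProperty("bootstrap.servers", bootstrapServers);  // BOOTSTRAP_SERVERS_CONFIG
        // Serdes.String().getClass().getName() yields the nested StringSerde class name.
        String stringSerde = "org.apache.kafka.common.serialization.Serdes$StringSerde";
        p.setProperty("default.key.serde", stringSerde);       // DEFAULT_KEY_SERDE_CLASS_CONFIG
        p.setProperty("default.value.serde", stringSerde);     // DEFAULT_VALUE_SERDE_CLASS_CONFIG
        p.setProperty("processing.guarantee", "exactly_once"); // PROCESSING_GUARANTEE_CONFIG = EXACTLY_ONCE
        p.setProperty("replication.factor", "3");              // REPLICATION_FACTOR_CONFIG
        // producerPrefix(BATCH_SIZE_CONFIG): the "producer." prefix routes batch.size to the producers.
        p.setProperty("producer.batch.size", "1000");
        return p;
    }

    public static void main(String[] args) {
        Properties p = buildConfig("my-streams-app", "localhost:9092"); // hypothetical values
        p.stringPropertyNames().stream().sorted()
                .forEach(k -> System.out.println(k + "=" + p.getProperty(k)));
    }
}
```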
> My understanding is that updates to uid-offset-store (written to its changelog)
> should be committed in the same transaction as the consumer offsets of the
> deduped-adclick topic, so in theory the duplicate check should never fire. Am I
> understanding this correctly?
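
The reasoning above can be made concrete with a toy model. It is hypothetical and not Kafka's implementation. The committed state is the set of offsets recorded in the store plus the consumer's committed position. If store writes and the offset commit succeed or abort together, a replay after an abort never finds its offset already in the store. The check fires only when store writes survive while the offset commit is rolled back. That is the pattern the "current offset equals last offset" warnings above point to. The class EosToyModel and its Outcome values are illustrative names.

```java
import java.util.HashSet;
import java.util.Set;

// Toy model (not Kafka's implementation) of committing store writes and input offsets.
final class EosToyModel {
    enum Outcome { COMMIT, ABORT, STORE_ONLY }

    private final Set<Long> committedStore = new HashSet<>(); // offsets recorded in the store
    private long committedOffset = 0;                         // where consumption resumes

    // Processes offsets [committedOffset, upTo) and returns how many hit the dedupe check.
    int process(long upTo, Outcome outcome) {
        Set<Long> pending = new HashSet<>();
        int dupes = 0;
        for (long off = committedOffset; off < upTo; off++) {
            if (committedStore.contains(off)) {
                dupes++; // this offset was already recorded: a replayed record
            }
            pending.add(off);
        }
        switch (outcome) {
            case COMMIT:     // store writes and offsets become visible together
                committedStore.addAll(pending);
                committedOffset = upTo;
                break;
            case ABORT:      // both are discarded together
                break;
            case STORE_ONLY: // broken atomicity: store writes kept, offsets rolled back
                committedStore.addAll(pending);
                break;
        }
        return dupes;
    }

    public static void main(String[] args) {
        EosToyModel m = new EosToyModel();
        System.out.println(m.process(10, Outcome.COMMIT));     // 0
        System.out.println(m.process(20, Outcome.ABORT));      // 0
        System.out.println(m.process(20, Outcome.COMMIT));     // 0: replay after abort is clean
        System.out.println(m.process(30, Outcome.STORE_ONLY)); // 0
        System.out.println(m.process(30, Outcome.COMMIT));     // 10: every replayed offset flagged
    }
}
```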
> One unusual thing I noticed on the server side is that the group coordinator
> takes a long time to initialize when a broker restarts. This causes a long pause
> between the node loss and the eventual task migration.
> Please let me know if I should look at particular server-side logs. I can share
> any findings, or run more destructive tests to reproduce the issue so we can
> investigate.
> Update: we ran more tests and observed that the rollback / EOS breakage only
> happens when the leader (node 2 in this case) of the __consumer_offsets
> partition corresponding to this kafka-streams application is restarted.