Yes, Streams might call "close" multiple times, as we assume it's an
idempotent operation.
Thus, if you close "your" Consumer in `DebugTransformer.close()`, the
operation is not idempotent anymore, and thus fails.
`KafkaConsumer.close()` is not idempotent :(

You can just wrap the call to `Consumer.close()` in a "try-catch" and
swallow the exception. This should fix the issue.

If you have only one Streams instance, this does not hit, as we don't
call "close" twice.

-Matthias

On 5/3/17 9:34 AM, Andreas Voss wrote:
> Hi again,
>
> One more observation: The problem only occurs when the two application
> instances are started one after the other with some delay in between
> (7 seconds in my test setup). So the first instance had already started
> to process the events that were in the queue when the second instance
> came up. It looks like the second instance tries to initiate a
> rebalance that fails.
>
> Thanks,
> Andreas
>
>
> -----Original Message-----
> From: Narendra Kumar
> Sent: Wednesday, May 3, 2017 13:11
> To: Andreas Voss <andreasv...@fico.com>; users@kafka.apache.org
> Cc: Wladimir Schmidt <wladimirschm...@fico.com>; Christian Leiner
> <christianlei...@fico.com>
> Subject: RE: Consumer with another group.id conflicts with streams()
>
> Hi Matthias,
>
> I enabled logs in the application and this is what I have observed:
>
> While rebalancing, ProcessorNode.close() is called twice, once from
> StreamThread.suspendTasksAndState() and once from
> StreamThread.closeNonAssignedSuspendedTasks(). DebugTransformer has an
> instance field of type KafkaConsumer that it uses for lookups.
> DebugTransformer.close() throws an exception when called the second
> time (an IllegalStateException from the KafkaConsumer instance,
> because it is already closed), so we fail to close the task's state
> manager (the call to task.closeStateManager(true) fails). The
> StateManager is then still holding the lock on the task's state
> directory.
> After rebalance, if the same task id is launched on the same
> application instance but in a different thread, the task gets stuck
> because it fails to acquire the lock on the task's state directory.
> The task is stuck forever and can't process any data from that point
> on. Depending on how much data was on the partition at the time of the
> rebalance, we see a different number of alerts.
>
> I have attached the log file for reference.
>
> Thanks,
> Narendra
>
>
> -----Original Message-----
> From: Andreas Voss
> Sent: Tuesday, April 25, 2017 1:27 PM
> To: users@kafka.apache.org
> Cc: Narendra Kumar <narendraku...@fico.com>; Wladimir Schmidt
> <wladimirschm...@fico.com>; Christian Leiner <christianlei...@fico.com>
> Subject: Re: Consumer with another group.id conflicts with streams()
>
> Hi Matthias,
>
> thank you for your response. Here are some more details:
>
> - My input and output topics have 6 partitions each.
> - My application instances run from Docker images, so there is no
>   state left over from a previous run.
> - I have a single Docker container running Kafka + ZooKeeper
>   (spotify/kafka).
> - When the second consumer is in place, I receive a random number of
>   records in the target topic (e.g. I send 1000 records and receive
>   439 on the first run and 543 on the second run).
> - The problem only occurs if two instances of the application are
>   running. If I start only one instance, it's slow, but when I send
>   1000 records I also receive 1000 records. I think this is also an
>   indicator for a bug, because a streaming app should behave the same
>   regardless of whether one or two instances are running.
> - I added the properties you suggested, but the behavior did not
>   change.
>
> I think this is a bug; consumers of different groups should not
> interact with each other. Should I submit a bug report, and if so, any
> suggestions on how to do that?
>
> Andreas
>
>
> -----Original Message-----
> From: Matthias J. Sax [mailto:matth...@confluent.io]
> Sent: Monday, April 24, 2017 19:18
> To: users@kafka.apache.org
> Subject: Re: Consumer with another group.id conflicts with streams()
>
> Hi,
>
> hard to diagnose. The new consumer should not affect the Streams app
> though -- even if I am wondering why you need it.
>
>> KafkaConsumer (with a UUID as group.id) that reads some historical
>> data from input topic
>
> Maybe using a GlobalKTable instead might be a better solution?
>
>> (i.e. I feed 1000 records into source topic and receive around 200 on
>> the target topic)
>
> Are these the first 200 records? Or are those 200 records "random
> ones" from your input topic? How many partitions do you have for the
> input/output topics?
>
>> looks like a lot of rebalancing happens.
>
> We recommend changing the following StreamsConfig values to improve
> rebalance behavior:
>
>> props.put(ProducerConfig.RETRIES_CONFIG, 10);
>> // <---- increase to 10 from the default of 0
>> props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,
>>     Integer.toString(Integer.MAX_VALUE));
>> // <---- increase to infinity from the default of 300 s
>
> We will change the default values accordingly in a future release, but
> for now you should set this manually.
>
>
> Hope this helps.
>
>
> -Matthias
>
> On 4/24/17 10:01 AM, Andreas Voss wrote:
>> Hi, I have a simple streaming app that copies data from one topic to
>> another, so when I feed 1000 records into the source topic I receive
>> 1000 records in the target topic. The app also contains a transform()
>> step which does nothing except instantiate a KafkaConsumer (with a
>> UUID as group.id) that reads some historical data from the input
>> topic. As soon as this consumer is in place, the streaming app does
>> not work anymore: records get lost (i.e. I feed 1000 records into the
>> source topic and receive around 200 on the target topic) and it's
>> terribly slow -- looks like a lot of rebalancing happens.
>>
>> To me this looks like a bug, because the KStreamBuilder uses the
>> application id as group.id ("kafka-smurfing" in this case), and the
>> transformer uses a different one (a UUID).
>>
>> Full source code:
>>
>> public class Main {
>>
>>   public static final String BOOTSTRAP_SERVERS = "192.168.99.100:9092";
>>   public static final String SOURCE_TOPIC = "transactions";
>>   public static final String TARGET_TOPIC = "alerts";
>>
>>   public static void main(String[] args) throws Exception {
>>
>>     KStreamBuilder builder = new KStreamBuilder();
>>     builder.stream(Serdes.String(), Serdes.String(), SOURCE_TOPIC)
>>            .transform(() -> new DebugTransformer())
>>            .to(Serdes.String(), Serdes.String(), TARGET_TOPIC);
>>
>>     Properties props = new Properties();
>>     props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-smurfing");
>>     props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG,
>>         Main.BOOTSTRAP_SERVERS);
>>     props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG,
>>         Serdes.String().getClass().getName());
>>     props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG,
>>         Serdes.String().getClass().getName());
>>     props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
>>     props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>>
>>     KafkaStreams streams = new KafkaStreams(builder, props);
>>     streams.start();
>>     Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
>>   }
>> }
>>
>> public class DebugTransformer
>>     implements Transformer<String, String, KeyValue<String, String>> {
>>
>>   private KafkaConsumer<String, String> consumer;
>>   private ProcessorContext context;
>>
>>   @Override
>>   public void init(ProcessorContext context) {
>>     this.context = context;
>>     Properties props = new Properties();
>>     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
>>         Main.BOOTSTRAP_SERVERS);
>>     props.put(ConsumerConfig.GROUP_ID_CONFIG,
>>         UUID.randomUUID().toString());
>>     props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
>>     props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
>>     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
>>         StringDeserializer.class.getName());
>>     props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
>>         StringDeserializer.class.getName());
>>     consumer = new KafkaConsumer<>(props);
>>   }
>>
>>   @Override
>>   public KeyValue<String, String> transform(String key, String value) {
>>     TopicPartition partition =
>>         new TopicPartition(Main.SOURCE_TOPIC, context.partition());
>>     consumer.assign(Arrays.asList(partition));
>>     consumer.seek(partition, 0);
>>     consumer.poll(100);
>>     return KeyValue.pair(key, value);
>>   }
>>
>>   @Override
>>   public void close() {
>>     consumer.close();
>>   }
>>
>>   @Override
>>   public KeyValue<String, String> punctuate(long timestamp) {
>>     return null;
>>   }
>> }
>>
>> Thanks for any hints,
>> Andreas
>>
>> This email and any files transmitted with it are confidential,
>> proprietary and intended solely for the individual or entity to whom
>> they are addressed. If you have received this email in error please
>> delete it immediately.
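
[Editor's note: a minimal, self-contained sketch of the try-catch
workaround Matthias suggests above. `NonIdempotentConsumer` and
`safeClose` are hypothetical stand-ins, not part of the thread; the
stub mimics the real `KafkaConsumer`, whose `close()` throws an
`IllegalStateException` when called a second time. In the actual app,
the try-catch would go around `consumer.close()` inside
`DebugTransformer.close()`.]

```java
// Hypothetical stand-in for KafkaConsumer: close() is not idempotent
// and throws on the second call, mirroring the behavior described in
// the thread.
class NonIdempotentConsumer {
    private boolean closed = false;

    void close() {
        if (closed) {
            throw new IllegalStateException("This consumer has already been closed.");
        }
        closed = true;
    }
}

public class SafeCloseDemo {
    // Swallow the exception so that a second call from Streams (e.g.
    // once from suspendTasksAndState() and again from
    // closeNonAssignedSuspendedTasks()) no longer fails the task.
    static void safeClose(NonIdempotentConsumer consumer) {
        try {
            consumer.close();
        } catch (IllegalStateException e) {
            // Already closed -- ignore, making close() effectively idempotent.
        }
    }

    public static void main(String[] args) {
        NonIdempotentConsumer consumer = new NonIdempotentConsumer();
        safeClose(consumer);
        safeClose(consumer); // second call is now harmless
        System.out.println("closed twice without error");
    }
}
```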