I just confirmed.

`KafkaConsumer.close()` should be idempotent. It's a bug in the consumer.

https://issues.apache.org/jira/browse/KAFKA-5169


-Matthias



On 5/3/17 2:20 PM, Matthias J. Sax wrote:
> Yes, Streams might call "close" multiple times, as we assume it's an
> idempotent operations.
> 
> Thus, if you close "your" Consumer in `DebugTransformer.close()`, the
> operation is not idempotent anymore, and thus fails.
> `KafkaConsumer.close()` is not idempotent :(
> 
> You can just use a "try-catch" when you call `Consumer.close()` and
> swallow the exception. This should fix the issue.
> 
> If you have only one Streams instance, this does not hit, at we don't
> call "close" twice.
> 
> 
> -Matthias
> 
> 
> On 5/3/17 9:34 AM, Andreas Voss wrote:
>> Hi again,
>>
>> One more observation: The problem only occurs when the two application 
>> instances are started one after the other with some delay in-between (7 
>> seconds in my test setup). So the first instance already started to process 
>> the events that were in the queue, when the second instance came up. Looks 
>> like the second instance tries to initiate a rebalance that fails.
>>
>> Thanks,
>> Andreas
>>
>>
>> -----Ursprüngliche Nachricht-----
>> Von: Narendra Kumar
>> Gesendet: Mittwoch, 3. Mai 2017 13:11
>> An: Andreas Voss <andreasv...@fico.com>; users@kafka.apache.org
>> Cc: Wladimir Schmidt <wladimirschm...@fico.com>; Christian Leiner 
>> <christianlei...@fico.com>
>> Betreff: RE: Consumer with another group.id conflicts with streams()
>>
>> Hi Matthias,
>>
>> I enabled logs in the application and this is what I have observed -->
>>
>> While rebalancing ProcessorNode.close() is getting  called twice, once from 
>> StreamThread.suspendTasksAndState() and once from 
>> StreamThread.closeNonAssignedSuspendedTasks(). DebugTranDebugTransformer has 
>> instance field of type KafkaConsumer to do some lookup. 
>> DebugTransformer.close() throws  exception when called second time (i.e. 
>> IllegalStateException from  KafkaConsumer instance because it is already 
>> closed.), we fail to close the task's state manager ( i.e. call to 
>> task.closeStateManager(true); fails). StateManager is still holding lock to 
>> the task's state directory. After rebalance, if the same task id is launched 
>> on same application instance but in different thread then the task get stuck 
>> because it fails to get lock to the task's state directory. The task get 
>> stuck forever and can't process any data from here. Depending on how much 
>> data was there on the partition at the time of rebalance we see different 
>> number of alerts.
>>
>> I have attached the log file for reference.
>>
>> Thanks,
>> Narendra
>>
>>
>> -----Original Message-----
>> From: Andreas Voss
>> Sent: Tuesday, April 25, 2017 1:27 PM
>> To: users@kafka.apache.org
>> Cc: Narendra Kumar <narendraku...@fico.com>; Wladimir Schmidt 
>> <wladimirschm...@fico.com>; Christian Leiner <christianlei...@fico.com>
>> Subject: Re: Consumer with another group.id conflicts with streams()
>>
>> Hi Matthias,
>>
>> thank you for your response. Here are some more details:
>>
>> - my input and output topics have 6 partitions each.
>> - my application instances run from docker images so the is no state left 
>> over from a previous run
>> - I have a single docker container running kafka + zookeeper (spotify/kafka).
>> - when the second consumer is in place, I receive a random number of records 
>> in the target topic (e.g. I send 1000 records and receive 439 on first run 
>> and 543 on second run)
>> - the problem only occurs if two instances of the application are running. 
>> If I only start one instance then it's slow but when I send 1000 records I 
>> also receive 1000 records. I think this is also an indicator for a bug, 
>> because a streaming app should behave the same, independent of whether one 
>> or two instances are running.
>> - I added the properties you suggested, but behavior did not change.
>>
>> I think this is a bug, consumers of different groups should not interact 
>> with each other. Should I submit a bug report and if so, any suggestions on 
>> how to do that?
>>
>> Andreas
>>
>>
>> -----Ursprüngliche Nachricht-----
>> Von: Matthias J. Sax [mailto:matth...@confluent.io]
>> Gesendet: Montag, 24. April 2017 19:18
>> An: users@kafka.apache.org
>> Betreff: Re: Consumer with another group.id conflicts with streams()
>>
>> Hi,
>>
>> hard to diagnose. The new consumer should not affect the Streams app though 
>> -- even if I am wondering why you need it.
>>
>>> KafkaConsumer (with a UUID as group.id) that reads some historical
>>> data from input topic
>>
>> Maybe using GlobalKTable instead might be a better solution?
>>
>>> (i.e. I feed 1000 records into source topic and receive around 200 on
>>> the target topic)
>>
>> Are this the first 200 records? Or are those 200 record "random ones"
>> from your input topic? How many partitions do you have for input/output 
>> topic?
>>
>>> looks like a lot of rebalancing happens.
>>
>> We recommend to change StreamsConfig values as follows to improve in 
>> rebalance behavior:
>>
>>> props.put(ProducerConfig.RETRIES_CONFIG, 10);  <---- increase to 10
>>> from default of 0
>>> props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,
>>> Integer.toString(Integer.MAX_VALUE)); <--------- increase to infinity
>>> from default of 300 s
>>
>> We will change the default values accordingly in future release but for now 
>> you should set this manually.
>>
>>
>> Hope this helps.
>>
>>
>> -Matthias
>>
>> On 4/24/17 10:01 AM, Andreas Voss wrote:
>>> Hi, I have a simple streaming app that copies data from one topic to 
>>> another, so when I feed 1000 records into source topic I receive 1000 
>>> records in the target topic. Also the app contains a transform() step which 
>>> does nothing, except instantiating a KafkaConsumer (with a UUID as 
>>> group.id) that reads some historical data from input topic. As soon as this 
>>> consumer is in place, the streaming app does not work anymore, records get 
>>> lost (i.e. I feed 1000 records into source topic and receive around 200 on 
>>> the target topic) and it's terribly slow - looks like a lot of rebalancing 
>>> happens.
>>>
>>> To me this looks like a bug, because the KStreamBuilder uses the 
>>> application id as group.id ("kafka-smurfing" in this case), and the 
>>> transformer uses a different one (uuid).
>>>
>>> Full source code:
>>>
>>> public class Main {
>>>
>>>   public static final String BOOTSTRAP_SERVERS = "192.168.99.100:9092";
>>>   public static final String SOURCE_TOPIC = "transactions";
>>>   public static final String TARGET_TOPIC =  "alerts";
>>>
>>>   public static void main(String[] args) throws Exception {
>>>
>>>     KStreamBuilder builder = new KStreamBuilder();
>>>     builder.stream(Serdes.String(), Serdes.String(), SOURCE_TOPIC)
>>>            .transform(() -> new DebugTransformer())
>>>            .to(Serdes.String(), Serdes.String(), TARGET_TOPIC);
>>>
>>>     Properties props = new Properties();
>>>     props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-smurfing");
>>>     props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, 
>>> Main.BOOTSTRAP_SERVERS);
>>>     props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, 
>>> Serdes.String().getClass().getName());
>>>     props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, 
>>> Serdes.String().getClass().getName());
>>>     props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
>>>     props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>>>
>>>     KafkaStreams streams = new KafkaStreams(builder, props);
>>>     streams.start();
>>>     Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
>>>
>>>   }
>>>
>>> }
>>>
>>> public class DebugTransformer implements Transformer<String, String,
>>> KeyValue<String, String>> {
>>>
>>>   private KafkaConsumer<String, String> consumer;
>>>   private ProcessorContext context;
>>>
>>>   @Override
>>>   public void init(ProcessorContext context) {
>>>     this.context = context;
>>>     Properties props = new Properties();
>>>     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
>>> Main.BOOTSTRAP_SERVERS);
>>>     props.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
>>>     props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
>>>     props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
>>>     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
>>> StringDeserializer.class.getName());
>>>     props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
>>> StringDeserializer.class.getName());
>>>     consumer = new KafkaConsumer<>(props);
>>>   }
>>>
>>>   @Override
>>>   public KeyValue<String, String> transform(String key, String value) {
>>>     TopicPartition partition = new TopicPartition(Main.SOURCE_TOPIC, 
>>> context.partition());
>>>     consumer.assign(Arrays.asList(partition));
>>>     consumer.seek(partition, 0);
>>>     consumer.poll(100);
>>>     return KeyValue.pair(key, value);
>>>   }
>>>
>>>   @Override
>>>   public void close() {
>>>     consumer.close();
>>>   }
>>>
>>>   @Override
>>>   public KeyValue<String, String> punctuate(long timestamp) {
>>>     return null;
>>>   }
>>>
>>> }
>>>
>>> Thanks for any hints,
>>> Andreas
>>>
>>> This email and any files transmitted with it are confidential, proprietary 
>>> and intended solely for the individual or entity to whom they are 
>>> addressed. If you have received this email in error please delete it 
>>> immediately.
>>>
>>
>> This email and any files transmitted with it are confidential, proprietary 
>> and intended solely for the individual or entity to whom they are addressed. 
>> If you have received this email in error please delete it immediately.
>>
> 

Attachment: signature.asc
Description: OpenPGP digital signature

Reply via email to