Yes, Streams might call "close" multiple times, as we assume it's an
idempotent operation.

Thus, if you close "your" Consumer in `DebugTransformer.close()`, the
operation is not idempotent anymore and fails on the second call --
`KafkaConsumer.close()` is not idempotent :(

You can just use a "try-catch" when you call `Consumer.close()` and
swallow the exception. This should fix the issue.
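
For example, a minimal sketch of the workaround (catching only the
IllegalStateException that the already-closed consumer throws,
per the log you posted):

  @Override
  public void close() {
    try {
      consumer.close();
    } catch (final IllegalStateException e) {
      // ignore: the consumer was already closed by the first call
    }
  }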

If you have only one Streams instance, you don't hit this issue, as we
don't call "close" twice.


-Matthias


On 5/3/17 9:34 AM, Andreas Voss wrote:
> Hi again,
> 
> One more observation: The problem only occurs when the two application 
> instances are started one after the other with some delay in between (7 
> seconds in my test setup). So the first instance had already started to 
> process the events that were in the topic when the second instance came up. 
> It looks like the second instance tries to initiate a rebalance that fails.
> 
> Thanks,
> Andreas
> 
> 
> -----Original Message-----
> From: Narendra Kumar
> Sent: Wednesday, May 3, 2017 13:11
> To: Andreas Voss <andreasv...@fico.com>; users@kafka.apache.org
> Cc: Wladimir Schmidt <wladimirschm...@fico.com>; Christian Leiner 
> <christianlei...@fico.com>
> Subject: RE: Consumer with another group.id conflicts with streams()
> 
> Hi Matthias,
> 
> I enabled logs in the application and this is what I have observed -->
> 
> While rebalancing, ProcessorNode.close() gets called twice, once from 
> StreamThread.suspendTasksAndState() and once from 
> StreamThread.closeNonAssignedSuspendedTasks(). DebugTransformer has an 
> instance field of type KafkaConsumer that it uses for lookups. 
> DebugTransformer.close() throws an exception when called the second time 
> (an IllegalStateException from the KafkaConsumer instance, because it is 
> already closed), so we fail to close the task's state manager (i.e. the 
> call to task.closeStateManager(true) fails). The StateManager is then 
> still holding the lock on the task's state directory. After the rebalance, 
> if the same task id is launched on the same application instance but in a 
> different thread, the task gets stuck because it fails to acquire the lock 
> on the task's state directory. The task is stuck forever and can't process 
> any data from that point on. Depending on how much data was on the 
> partition at the time of the rebalance, we see a different number of alerts.
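> 
> FWIW, making close() idempotent on our side also avoids the second-call 
> failure. A minimal sketch (the null guard is our addition):
> 
>   @Override
>   public void close() {
>     if (consumer != null) {
>       consumer.close();
>       consumer = null;  // a second close() call becomes a no-op
>     }
>   }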
> 
> I have attached the log file for reference.
> 
> Thanks,
> Narendra
> 
> 
> -----Original Message-----
> From: Andreas Voss
> Sent: Tuesday, April 25, 2017 1:27 PM
> To: users@kafka.apache.org
> Cc: Narendra Kumar <narendraku...@fico.com>; Wladimir Schmidt 
> <wladimirschm...@fico.com>; Christian Leiner <christianlei...@fico.com>
> Subject: Re: Consumer with another group.id conflicts with streams()
> 
> Hi Matthias,
> 
> thank you for your response. Here are some more details:
> 
> - my input and output topics have 6 partitions each.
> - my application instances run from docker images, so there is no state left 
> over from a previous run
> - I have a single docker container running kafka + zookeeper (spotify/kafka).
> - when the second consumer is in place, I receive a random number of records 
> in the target topic (e.g. I send 1000 records and receive 439 on the first 
> run and 543 on the second run)
> - the problem only occurs if two instances of the application are running. If 
> I only start one instance, it's slow, but when I send 1000 records I also 
> receive 1000 records. I think this is also an indicator of a bug, because a 
> streaming app should behave the same regardless of whether one or two 
> instances are running.
> - I added the properties you suggested, but behavior did not change.
> 
> I think this is a bug; consumers of different groups should not interact with 
> each other. Should I submit a bug report, and if so, do you have any 
> suggestions on how to do that?
> 
> Andreas
> 
> 
> -----Original Message-----
> From: Matthias J. Sax [mailto:matth...@confluent.io]
> Sent: Monday, April 24, 2017 19:18
> To: users@kafka.apache.org
> Subject: Re: Consumer with another group.id conflicts with streams()
> 
> Hi,
> 
> hard to diagnose. The new consumer should not affect the Streams app though 
> -- even though I am wondering why you need it.
> 
>> KafkaConsumer (with a UUID as group.id) that reads some historical
>> data from input topic
> 
> Maybe using GlobalKTable instead might be a better solution?
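> 
> A rough sketch of what that could look like (the topic and store names here 
> are hypothetical, assuming the historical data lives in its own topic):
> 
>   GlobalKTable<String, String> history =
>       builder.globalTable(Serdes.String(), Serdes.String(),
>                           "history-topic", "history-store");
> 
>   builder.stream(Serdes.String(), Serdes.String(), SOURCE_TOPIC)
>          .leftJoin(history,
>                    (key, value) -> key,        // map each record to its lookup key
>                    (value, historic) -> value) // combine record with lookup result
>          .to(Serdes.String(), Serdes.String(), TARGET_TOPIC);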
> 
>> (i.e. I feed 1000 records into source topic and receive around 200 on
>> the target topic)
> 
> Are these the first 200 records? Or are those 200 records "random ones"
> from your input topic? How many partitions do you have for the input/output 
> topics?
> 
>> looks like a lot of rebalancing happens.
> 
> We recommend changing the following StreamsConfig values to improve 
> rebalance behavior:
> 
>> props.put(ProducerConfig.RETRIES_CONFIG, 10);  // increase to 10 from the default of 0
>> props.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG,
>>     Integer.toString(Integer.MAX_VALUE));  // increase to infinity from the default of 300 s
> 
> We will change the default values accordingly in a future release, but for 
> now you should set these manually.
> 
> 
> Hope this helps.
> 
> 
> -Matthias
> 
> On 4/24/17 10:01 AM, Andreas Voss wrote:
>> Hi, I have a simple streaming app that copies data from one topic to 
>> another, so when I feed 1000 records into the source topic I receive 1000 
>> records in the target topic. The app also contains a transform() step which 
>> does nothing except instantiate a KafkaConsumer (with a UUID as group.id) 
>> that reads some historical data from the input topic. As soon as this 
>> consumer is in place, the streaming app does not work anymore: records get 
>> lost (i.e. I feed 1000 records into the source topic and receive around 200 
>> on the target topic) and it's terribly slow -- it looks like a lot of 
>> rebalancing happens.
>>
>> To me this looks like a bug, because the KStreamBuilder uses the application 
>> id as group.id ("kafka-smurfing" in this case), and the transformer uses a 
>> different one (a random UUID).
>>
>> Full source code:
>>
>> public class Main {
>>
>>   public static final String BOOTSTRAP_SERVERS = "192.168.99.100:9092";
>>   public static final String SOURCE_TOPIC = "transactions";
>>   public static final String TARGET_TOPIC =  "alerts";
>>
>>   public static void main(String[] args) throws Exception {
>>
>>     KStreamBuilder builder = new KStreamBuilder();
>>     builder.stream(Serdes.String(), Serdes.String(), SOURCE_TOPIC)
>>            .transform(() -> new DebugTransformer())
>>            .to(Serdes.String(), Serdes.String(), TARGET_TOPIC);
>>
>>     Properties props = new Properties();
>>     props.put(StreamsConfig.APPLICATION_ID_CONFIG, "kafka-smurfing");
>>     props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, Main.BOOTSTRAP_SERVERS);
>>     props.put(StreamsConfig.KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
>>     props.put(StreamsConfig.VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
>>     props.put(StreamsConfig.NUM_STREAM_THREADS_CONFIG, 2);
>>     props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
>>
>>     KafkaStreams streams = new KafkaStreams(builder, props);
>>     streams.start();
>>     Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
>>
>>   }
>>
>> }
>>
>> public class DebugTransformer implements Transformer<String, String, KeyValue<String, String>> {
>>
>>   private KafkaConsumer<String, String> consumer;
>>   private ProcessorContext context;
>>
>>   @Override
>>   public void init(ProcessorContext context) {
>>     this.context = context;
>>     Properties props = new Properties();
>>     props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Main.BOOTSTRAP_SERVERS);
>>     props.put(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
>>     props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none");
>>     props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
>>     props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
>>     props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
>>     consumer = new KafkaConsumer<>(props);
>>   }
>>
>>   @Override
>>   public KeyValue<String, String> transform(String key, String value) {
>>     // re-reads the task's source partition from offset 0 on every record
>>     TopicPartition partition = new TopicPartition(Main.SOURCE_TOPIC, context.partition());
>>     consumer.assign(Arrays.asList(partition));
>>     consumer.seek(partition, 0);
>>     consumer.poll(100);
>>     return KeyValue.pair(key, value);
>>   }
>>
>>   @Override
>>   public void close() {
>>     // a second call throws IllegalStateException: the consumer is already closed
>>     consumer.close();
>>   }
>>
>>   @Override
>>   public KeyValue<String, String> punctuate(long timestamp) {
>>     return null;
>>   }
>>
>> }
>>
>> Thanks for any hints,
>> Andreas
>>
