[jira] [Commented] (KAFKA-7628) KafkaStream is not closing
[ https://issues.apache.org/jira/browse/KAFKA-7628?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16782143#comment-16782143 ] Guozhang Wang commented on KAFKA-7628: -- It was not a bug, I should mark it as "not a problem." > KafkaStream is not closing > -- > > Key: KAFKA-7628 > URL: https://issues.apache.org/jira/browse/KAFKA-7628 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.11.0.1 > Environment: Macbook Pro >Reporter: Ozgur >Priority: Major > > I'm closing a KafkaStream when I need based on a certain condition: > Closing: > > {code:java} > if(kafkaStream == null) { > logger.info("KafkaStream already closed?"); > } else { > boolean closed = kafkaStream.close(10L, TimeUnit.SECONDS); > if(closed) { > kafkaStream = null; > logger.info("KafkaStream closed"); > } else { > logger.info("KafkaStream could not closed"); > } > } > {code} > Starting: > > {code:java} > if(kafkaStream == null) { > logger.info("KafkaStream is starting"); > kafkaStream = > KafkaManager.getInstance().getStream(this.getConfigFilePath(), > this, > this.getTopic() > ); > kafkaStream.start(); > logger.info("KafkaStream is started"); > } > {code} > > > In my implementation of Processor, {{process(String key, byte[] value)}} is > still called although successfully closing stream: > > {code:java} > // code placeholder > public abstract class BaseKafkaProcessor implements Processor > { > private static Logger logger = > LogManager.getLogger(BaseKafkaProcessor.class); > private ProcessorContext context; > private ProcessorContext getContext() { > return context; > } > @Override > public void init(ProcessorContext context) { > this.context = context; > this.context.schedule(1000); > } > @Override > public void process(String key, byte[] value) { > try { > String topic = key.split("-")[0]; > byte[] uncompressed = GzipCompressionUtil.uncompress(value); > String json = new String(uncompressed, "UTF-8"); > processRecord(topic, json); > this.getContext().commit(); > } catch (Exception e) { > logger.error("Error processing json", e); > } > } > protected abstract void processRecord(String topic, String json); > @Override > public void punctuate(long timestamp) { > this.getContext().commit(); > } > @Override > public void close() { > this.getContext().commit(); > } > } > {code} > > My configuration for KafkaStreams: > > {code:java} > application.id=dv_ws_in_app_activity_dev4 > bootstrap.servers=VLXH1 > auto.offset.reset=latest > num.stream.threads=1 > key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde > value.serde=org.apache.kafka.common.serialization.Serdes$ByteArraySerde > poll.ms = 100 > commit.interval.ms=1000 > state.dir=../../temp/kafka-state-dir > {code} > Version: *0.11.0.1* > > I'm witnessing that after closing() the streams, these ports are still > listening: > > {code:java} > $ sudo lsof -i -n -P | grep 9092 > java 29457 ozgur 133u IPv6 0x531e550533f38283 0t0 TCP > x.27.227.182:54419->x.x.164.33:9092 (ESTABLISHED) > java 29457 ozgur 134u IPv6 0x531e55051a789ec3 0t0 TCP > x.27.227.182:54420->x.x.164.45:9092 (ESTABLISHED) > java 29457 ozgur 135u IPv6 0x531e55051a789903 0t0 TCP > x.27.227.182:54421->x.x.164.25:9092 (ESTABLISHED) > java 29457 ozgur 136u IPv6 0x531e55051a78aa43 0t0 TCP > x.27.227.182:54422->x.x.164.25:9092 (ESTABLISHED) > java 29457 ozgur 140u IPv6 0x531e55051a78c703 0t0 TCP > x.27.227.182:54423->x.x.164.25:9092 (ESTABLISHED) > java 29457 ozgur 141u IPv6 0x531e55051a78a483 0t0 TCP > x.27.227.182:54424->x.x.164.45:9092 (ESTABLISHED) > {code} > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7628) KafkaStream is not closing
[ https://issues.apache.org/jira/browse/KAFKA-7628?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16782124#comment-16782124 ] Matthias J. Sax commented on KAFKA-7628: [~guozhang] What is the fixed version for this? Is this ticket a duplicate? > KafkaStream is not closing > -- > > Key: KAFKA-7628 > URL: https://issues.apache.org/jira/browse/KAFKA-7628 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.11.0.1 > Environment: Macbook Pro >Reporter: Ozgur >Priority: Major > > I'm closing a KafkaStream when I need based on a certain condition: > Closing: > > {code:java} > if(kafkaStream == null) { > logger.info("KafkaStream already closed?"); > } else { > boolean closed = kafkaStream.close(10L, TimeUnit.SECONDS); > if(closed) { > kafkaStream = null; > logger.info("KafkaStream closed"); > } else { > logger.info("KafkaStream could not closed"); > } > } > {code} > Starting: > > {code:java} > if(kafkaStream == null) { > logger.info("KafkaStream is starting"); > kafkaStream = > KafkaManager.getInstance().getStream(this.getConfigFilePath(), > this, > this.getTopic() > ); > kafkaStream.start(); > logger.info("KafkaStream is started"); > } > {code} > > > In my implementation of Processor, {{process(String key, byte[] value)}} is > still called although successfully closing stream: > > {code:java} > // code placeholder > public abstract class BaseKafkaProcessor implements Processor > { > private static Logger logger = > LogManager.getLogger(BaseKafkaProcessor.class); > private ProcessorContext context; > private ProcessorContext getContext() { > return context; > } > @Override > public void init(ProcessorContext context) { > this.context = context; > this.context.schedule(1000); > } > @Override > public void process(String key, byte[] value) { > try { > String topic = key.split("-")[0]; > byte[] uncompressed = GzipCompressionUtil.uncompress(value); > String json = new String(uncompressed, "UTF-8"); > processRecord(topic, json); > this.getContext().commit(); > } catch (Exception e) { > logger.error("Error processing json", e); > } > } > protected abstract void processRecord(String topic, String json); > @Override > public void punctuate(long timestamp) { > this.getContext().commit(); > } > @Override > public void close() { > this.getContext().commit(); > } > } > {code} > > My configuration for KafkaStreams: > > {code:java} > application.id=dv_ws_in_app_activity_dev4 > bootstrap.servers=VLXH1 > auto.offset.reset=latest > num.stream.threads=1 > key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde > value.serde=org.apache.kafka.common.serialization.Serdes$ByteArraySerde > poll.ms = 100 > commit.interval.ms=1000 > state.dir=../../temp/kafka-state-dir > {code} > Version: *0.11.0.1* > > I'm witnessing that after closing() the streams, these ports are still > listening: > > {code:java} > $ sudo lsof -i -n -P | grep 9092 > java 29457 ozgur 133u IPv6 0x531e550533f38283 0t0 TCP > x.27.227.182:54419->x.x.164.33:9092 (ESTABLISHED) > java 29457 ozgur 134u IPv6 0x531e55051a789ec3 0t0 TCP > x.27.227.182:54420->x.x.164.45:9092 (ESTABLISHED) > java 29457 ozgur 135u IPv6 0x531e55051a789903 0t0 TCP > x.27.227.182:54421->x.x.164.25:9092 (ESTABLISHED) > java 29457 ozgur 136u IPv6 0x531e55051a78aa43 0t0 TCP > x.27.227.182:54422->x.x.164.25:9092 (ESTABLISHED) > java 29457 ozgur 140u IPv6 0x531e55051a78c703 0t0 TCP > x.27.227.182:54423->x.x.164.25:9092 (ESTABLISHED) > java 29457 ozgur 141u IPv6 0x531e55051a78a483 0t0 TCP > x.27.227.182:54424->x.x.164.45:9092 (ESTABLISHED) > {code} > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7628) KafkaStream is not closing
[ https://issues.apache.org/jira/browse/KAFKA-7628?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16690710#comment-16690710 ] Guozhang Wang commented on KAFKA-7628: -- I see :) You mentioned that after upgrading to version (2.0.1) you still observe the same, and I thought you meant that the state can still transit to `NOT_RUNNING` while threads are not joined yet. Thanks for confirming! > KafkaStream is not closing > -- > > Key: KAFKA-7628 > URL: https://issues.apache.org/jira/browse/KAFKA-7628 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.11.0.1 > Environment: Macbook Pro >Reporter: Ozgur >Priority: Major > > I'm closing a KafkaStream when I need based on a certain condition: > Closing: > > {code:java} > if(kafkaStream == null) { > logger.info("KafkaStream already closed?"); > } else { > boolean closed = kafkaStream.close(10L, TimeUnit.SECONDS); > if(closed) { > kafkaStream = null; > logger.info("KafkaStream closed"); > } else { > logger.info("KafkaStream could not closed"); > } > } > {code} > Starting: > > {code:java} > if(kafkaStream == null) { > logger.info("KafkaStream is starting"); > kafkaStream = > KafkaManager.getInstance().getStream(this.getConfigFilePath(), > this, > this.getTopic() > ); > kafkaStream.start(); > logger.info("KafkaStream is started"); > } > {code} > > > In my implementation of Processor, {{process(String key, byte[] value)}} is > still called although successfully closing stream: > > {code:java} > // code placeholder > public abstract class BaseKafkaProcessor implements Processor > { > private static Logger logger = > LogManager.getLogger(BaseKafkaProcessor.class); > private ProcessorContext context; > private ProcessorContext getContext() { > return context; > } > @Override > public void init(ProcessorContext context) { > this.context = context; > this.context.schedule(1000); > } > @Override > public void process(String key, byte[] value) { > try { > String topic = key.split("-")[0]; > byte[] uncompressed = GzipCompressionUtil.uncompress(value); > String json = new String(uncompressed, "UTF-8"); > processRecord(topic, json); > this.getContext().commit(); > } catch (Exception e) { > logger.error("Error processing json", e); > } > } > protected abstract void processRecord(String topic, String json); > @Override > public void punctuate(long timestamp) { > this.getContext().commit(); > } > @Override > public void close() { > this.getContext().commit(); > } > } > {code} > > My configuration for KafkaStreams: > > {code:java} > application.id=dv_ws_in_app_activity_dev4 > bootstrap.servers=VLXH1 > auto.offset.reset=latest > num.stream.threads=1 > key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde > value.serde=org.apache.kafka.common.serialization.Serdes$ByteArraySerde > poll.ms = 100 > commit.interval.ms=1000 > state.dir=../../temp/kafka-state-dir > {code} > Version: *0.11.0.1* > > I'm witnessing that after closing() the streams, these ports are still > listening: > > {code:java} > $ sudo lsof -i -n -P | grep 9092 > java 29457 ozgur 133u IPv6 0x531e550533f38283 0t0 TCP > x.27.227.182:54419->x.x.164.33:9092 (ESTABLISHED) > java 29457 ozgur 134u IPv6 0x531e55051a789ec3 0t0 TCP > x.27.227.182:54420->x.x.164.45:9092 (ESTABLISHED) > java 29457 ozgur 135u IPv6 0x531e55051a789903 0t0 TCP > x.27.227.182:54421->x.x.164.25:9092 (ESTABLISHED) > java 29457 ozgur 136u IPv6 0x531e55051a78aa43 0t0 TCP > x.27.227.182:54422->x.x.164.25:9092 (ESTABLISHED) > java 29457 ozgur 140u IPv6 0x531e55051a78c703 0t0 TCP > x.27.227.182:54423->x.x.164.25:9092 (ESTABLISHED) > java 29457 ozgur 141u IPv6 0x531e55051a78a483 0t0 TCP > x.27.227.182:54424->x.x.164.45:9092 (ESTABLISHED) > {code} > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7628) KafkaStream is not closing
[ https://issues.apache.org/jira/browse/KAFKA-7628?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16690640#comment-16690640 ] Ozgur commented on KAFKA-7628: -- I'm using the [version 0.11.0. |https://github.com/apache/kafka/blob/0.11.0/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java] > KafkaStream is not closing > -- > > Key: KAFKA-7628 > URL: https://issues.apache.org/jira/browse/KAFKA-7628 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.11.0.1 > Environment: Macbook Pro >Reporter: Ozgur >Priority: Major > > I'm closing a KafkaStream when I need based on a certain condition: > Closing: > > {code:java} > if(kafkaStream == null) { > logger.info("KafkaStream already closed?"); > } else { > boolean closed = kafkaStream.close(10L, TimeUnit.SECONDS); > if(closed) { > kafkaStream = null; > logger.info("KafkaStream closed"); > } else { > logger.info("KafkaStream could not closed"); > } > } > {code} > Starting: > > {code:java} > if(kafkaStream == null) { > logger.info("KafkaStream is starting"); > kafkaStream = > KafkaManager.getInstance().getStream(this.getConfigFilePath(), > this, > this.getTopic() > ); > kafkaStream.start(); > logger.info("KafkaStream is started"); > } > {code} > > > In my implementation of Processor, {{process(String key, byte[] value)}} is > still called although successfully closing stream: > > {code:java} > // code placeholder > public abstract class BaseKafkaProcessor implements Processor > { > private static Logger logger = > LogManager.getLogger(BaseKafkaProcessor.class); > private ProcessorContext context; > private ProcessorContext getContext() { > return context; > } > @Override > public void init(ProcessorContext context) { > this.context = context; > this.context.schedule(1000); > } > @Override > public void process(String key, byte[] value) { > try { > String topic = key.split("-")[0]; > byte[] uncompressed = GzipCompressionUtil.uncompress(value); > String json = new String(uncompressed, "UTF-8"); > processRecord(topic, json); > this.getContext().commit(); > } catch (Exception e) { > logger.error("Error processing json", e); > } > } > protected abstract void processRecord(String topic, String json); > @Override > public void punctuate(long timestamp) { > this.getContext().commit(); > } > @Override > public void close() { > this.getContext().commit(); > } > } > {code} > > My configuration for KafkaStreams: > > {code:java} > application.id=dv_ws_in_app_activity_dev4 > bootstrap.servers=VLXH1 > auto.offset.reset=latest > num.stream.threads=1 > key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde > value.serde=org.apache.kafka.common.serialization.Serdes$ByteArraySerde > poll.ms = 100 > commit.interval.ms=1000 > state.dir=../../temp/kafka-state-dir > {code} > Version: *0.11.0.1* > > I'm witnessing that after closing() the streams, these ports are still > listening: > > {code:java} > $ sudo lsof -i -n -P | grep 9092 > java 29457 ozgur 133u IPv6 0x531e550533f38283 0t0 TCP > x.27.227.182:54419->x.x.164.33:9092 (ESTABLISHED) > java 29457 ozgur 134u IPv6 0x531e55051a789ec3 0t0 TCP > x.27.227.182:54420->x.x.164.45:9092 (ESTABLISHED) > java 29457 ozgur 135u IPv6 0x531e55051a789903 0t0 TCP > x.27.227.182:54421->x.x.164.25:9092 (ESTABLISHED) > java 29457 ozgur 136u IPv6 0x531e55051a78aa43 0t0 TCP > x.27.227.182:54422->x.x.164.25:9092 (ESTABLISHED) > java 29457 ozgur 140u IPv6 0x531e55051a78c703 0t0 TCP > x.27.227.182:54423->x.x.164.25:9092 (ESTABLISHED) > java 29457 ozgur 141u IPv6 0x531e55051a78a483 0t0 TCP > x.27.227.182:54424->x.x.164.45:9092 (ESTABLISHED) > {code} > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7628) KafkaStream is not closing
[ https://issues.apache.org/jira/browse/KAFKA-7628?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16690138#comment-16690138 ] Guozhang Wang commented on KAFKA-7628: -- Hmm.. in this case the state should not be transit to `NOT_RUNNING` as the stream threads are not fully joined. In the latest version we've slightly improved the logic with a shutdown thread whose logic is basically: {code} for (final StreamThread thread : threads) { try { if (!thread.isRunning()) { thread.join(); } } catch (final InterruptedException ex) { Thread.currentThread().interrupt(); } } if (globalStreamThread != null) { globalStreamThread.setStateListener(null); globalStreamThread.shutdown(); } if (globalStreamThread != null && !globalStreamThread.stillRunning()) { try { globalStreamThread.join(); } catch (final InterruptedException e) { Thread.currentThread().interrupt(); } globalStreamThread = null; } adminClient.close(); metrics.close(); setState(State.NOT_RUNNING); {code} If the stream-threads have not all joined, it should not proceed to `setState(State.NOT_RUNNING)`. > KafkaStream is not closing > -- > > Key: KAFKA-7628 > URL: https://issues.apache.org/jira/browse/KAFKA-7628 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.11.0.1 > Environment: Macbook Pro >Reporter: Ozgur >Priority: Major > > I'm closing a KafkaStream when I need based on a certain condition: > Closing: > > {code:java} > if(kafkaStream == null) { > logger.info("KafkaStream already closed?"); > } else { > boolean closed = kafkaStream.close(10L, TimeUnit.SECONDS); > if(closed) { > kafkaStream = null; > logger.info("KafkaStream closed"); > } else { > logger.info("KafkaStream could not closed"); > } > } > {code} > Starting: > > {code:java} > if(kafkaStream == null) { > logger.info("KafkaStream is starting"); > kafkaStream = > KafkaManager.getInstance().getStream(this.getConfigFilePath(), > this, > this.getTopic() > ); > kafkaStream.start(); > logger.info("KafkaStream is started"); > } > {code} > > > In my implementation of Processor, {{process(String key, byte[] value)}} is > still called although successfully closing stream: > > {code:java} > // code placeholder > public abstract class BaseKafkaProcessor implements Processor > { > private static Logger logger = > LogManager.getLogger(BaseKafkaProcessor.class); > private ProcessorContext context; > private ProcessorContext getContext() { > return context; > } > @Override > public void init(ProcessorContext context) { > this.context = context; > this.context.schedule(1000); > } > @Override > public void process(String key, byte[] value) { > try { > String topic = key.split("-")[0]; > byte[] uncompressed = GzipCompressionUtil.uncompress(value); > String json = new String(uncompressed, "UTF-8"); > processRecord(topic, json); > this.getContext().commit(); > } catch (Exception e) { > logger.error("Error processing json", e); > } > } > protected abstract void processRecord(String topic, String json); > @Override > public void punctuate(long timestamp) { > this.getContext().commit(); > } > @Override > public void close() { > this.getContext().commit(); > } > } > {code} > > My configuration for KafkaStreams: > > {code:java} > application.id=dv_ws_in_app_activity_dev4 > bootstrap.servers=VLXH1 > auto.offset.reset=latest > num.stream.threads=1 > key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde > value.serde=org.apache.kafka.common.serialization.Serdes$ByteArraySerde > poll.ms = 100 > commit.interval.ms=1000 > state.dir=../../temp/kafka-state-dir > {code} > Version: *0.11.0.1* > > I'm witnessing that after closing() the streams, these ports are still > listening: > > {code:java} > $ sudo lsof -i -n -P | grep 9092 > java 29457 ozgur 133u IPv6 0x531e550533f38
[jira] [Commented] (KAFKA-7628) KafkaStream is not closing
[ https://issues.apache.org/jira/browse/KAFKA-7628?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16689085#comment-16689085 ] Ozgur commented on KAFKA-7628: -- Hi Guozhang, I've upgraded my client version to the last version (2.0.1) but the problem was same. I'm thinking about this is more likely an application logic error other than Kafka's. Thanks. > KafkaStream is not closing > -- > > Key: KAFKA-7628 > URL: https://issues.apache.org/jira/browse/KAFKA-7628 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.11.0.1 > Environment: Macbook Pro >Reporter: Ozgur >Priority: Major > > I'm closing a KafkaStream when I need based on a certain condition: > Closing: > > {code:java} > if(kafkaStream == null) { > logger.info("KafkaStream already closed?"); > } else { > boolean closed = kafkaStream.close(10L, TimeUnit.SECONDS); > if(closed) { > kafkaStream = null; > logger.info("KafkaStream closed"); > } else { > logger.info("KafkaStream could not closed"); > } > } > {code} > Starting: > > {code:java} > if(kafkaStream == null) { > logger.info("KafkaStream is starting"); > kafkaStream = > KafkaManager.getInstance().getStream(this.getConfigFilePath(), > this, > this.getTopic() > ); > kafkaStream.start(); > logger.info("KafkaStream is started"); > } > {code} > > > In my implementation of Processor, {{process(String key, byte[] value)}} is > still called although successfully closing stream: > > {code:java} > // code placeholder > public abstract class BaseKafkaProcessor implements Processor > { > private static Logger logger = > LogManager.getLogger(BaseKafkaProcessor.class); > private ProcessorContext context; > private ProcessorContext getContext() { > return context; > } > @Override > public void init(ProcessorContext context) { > this.context = context; > this.context.schedule(1000); > } > @Override > public void process(String key, byte[] value) { > try { > String topic = key.split("-")[0]; > byte[] uncompressed = GzipCompressionUtil.uncompress(value); > String json = new String(uncompressed, "UTF-8"); > processRecord(topic, json); > this.getContext().commit(); > } catch (Exception e) { > logger.error("Error processing json", e); > } > } > protected abstract void processRecord(String topic, String json); > @Override > public void punctuate(long timestamp) { > this.getContext().commit(); > } > @Override > public void close() { > this.getContext().commit(); > } > } > {code} > > My configuration for KafkaStreams: > > {code:java} > application.id=dv_ws_in_app_activity_dev4 > bootstrap.servers=VLXH1 > auto.offset.reset=latest > num.stream.threads=1 > key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde > value.serde=org.apache.kafka.common.serialization.Serdes$ByteArraySerde > poll.ms = 100 > commit.interval.ms=1000 > state.dir=../../temp/kafka-state-dir > {code} > Version: *0.11.0.1* > > I'm witnessing that after closing() the streams, these ports are still > listening: > > {code:java} > $ sudo lsof -i -n -P | grep 9092 > java 29457 ozgur 133u IPv6 0x531e550533f38283 0t0 TCP > x.27.227.182:54419->x.x.164.33:9092 (ESTABLISHED) > java 29457 ozgur 134u IPv6 0x531e55051a789ec3 0t0 TCP > x.27.227.182:54420->x.x.164.45:9092 (ESTABLISHED) > java 29457 ozgur 135u IPv6 0x531e55051a789903 0t0 TCP > x.27.227.182:54421->x.x.164.25:9092 (ESTABLISHED) > java 29457 ozgur 136u IPv6 0x531e55051a78aa43 0t0 TCP > x.27.227.182:54422->x.x.164.25:9092 (ESTABLISHED) > java 29457 ozgur 140u IPv6 0x531e55051a78c703 0t0 TCP > x.27.227.182:54423->x.x.164.25:9092 (ESTABLISHED) > java 29457 ozgur 141u IPv6 0x531e55051a78a483 0t0 TCP > x.27.227.182:54424->x.x.164.45:9092 (ESTABLISHED) > {code} > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7628) KafkaStream is not closing
[ https://issues.apache.org/jira/browse/KAFKA-7628?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16689078#comment-16689078 ] Guozhang Wang commented on KAFKA-7628: -- In old versions of Kafka Streams there may be some bugs about state transition that gets fixed. More specifically: `KafkaStreams.close(timeout)` triggers its stream thread to complete and join and then wait for the specified timeout. It returns `false` indicating that not all threads are joined within that timeout. But after that the shutting down may still be completed later (i.e. the state eventually transit to NOT_RUNNING). Could you try to upgrade to newer version and see if it issue still persists? > KafkaStream is not closing > -- > > Key: KAFKA-7628 > URL: https://issues.apache.org/jira/browse/KAFKA-7628 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.11.0.1 > Environment: Macbook Pro >Reporter: Ozgur >Priority: Major > > I'm closing a KafkaStream when I need based on a certain condition: > Closing: > > {code:java} > if(kafkaStream == null) { > logger.info("KafkaStream already closed?"); > } else { > boolean closed = kafkaStream.close(10L, TimeUnit.SECONDS); > if(closed) { > kafkaStream = null; > logger.info("KafkaStream closed"); > } else { > logger.info("KafkaStream could not closed"); > } > } > {code} > Starting: > > {code:java} > if(kafkaStream == null) { > logger.info("KafkaStream is starting"); > kafkaStream = > KafkaManager.getInstance().getStream(this.getConfigFilePath(), > this, > this.getTopic() > ); > kafkaStream.start(); > logger.info("KafkaStream is started"); > } > {code} > > > In my implementation of Processor, {{process(String key, byte[] value)}} is > still called although successfully closing stream: > > {code:java} > // code placeholder > public abstract class BaseKafkaProcessor implements Processor > { > private static Logger logger = > LogManager.getLogger(BaseKafkaProcessor.class); > private ProcessorContext context; > private ProcessorContext getContext() { > return context; > } > @Override > public void init(ProcessorContext context) { > this.context = context; > this.context.schedule(1000); > } > @Override > public void process(String key, byte[] value) { > try { > String topic = key.split("-")[0]; > byte[] uncompressed = GzipCompressionUtil.uncompress(value); > String json = new String(uncompressed, "UTF-8"); > processRecord(topic, json); > this.getContext().commit(); > } catch (Exception e) { > logger.error("Error processing json", e); > } > } > protected abstract void processRecord(String topic, String json); > @Override > public void punctuate(long timestamp) { > this.getContext().commit(); > } > @Override > public void close() { > this.getContext().commit(); > } > } > {code} > > My configuration for KafkaStreams: > > {code:java} > application.id=dv_ws_in_app_activity_dev4 > bootstrap.servers=VLXH1 > auto.offset.reset=latest > num.stream.threads=1 > key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde > value.serde=org.apache.kafka.common.serialization.Serdes$ByteArraySerde > poll.ms = 100 > commit.interval.ms=1000 > state.dir=../../temp/kafka-state-dir > {code} > Version: *0.11.0.1* > > I'm witnessing that after closing() the streams, these ports are still > listening: > > {code:java} > $ sudo lsof -i -n -P | grep 9092 > java 29457 ozgur 133u IPv6 0x531e550533f38283 0t0 TCP > x.27.227.182:54419->x.x.164.33:9092 (ESTABLISHED) > java 29457 ozgur 134u IPv6 0x531e55051a789ec3 0t0 TCP > x.27.227.182:54420->x.x.164.45:9092 (ESTABLISHED) > java 29457 ozgur 135u IPv6 0x531e55051a789903 0t0 TCP > x.27.227.182:54421->x.x.164.25:9092 (ESTABLISHED) > java 29457 ozgur 136u IPv6 0x531e55051a78aa43 0t0 TCP > x.27.227.182:54422->x.x.164.25:9092 (ESTABLISHED) > java 29457 ozgur 140u IPv6 0x531e55051a78c703 0t0 TCP > x.27.227.182:54423->x.x.164.25:9092 (ESTABLISHED) > java 29457 ozgur 141u IPv6 0x531e55051a78a483 0t0 TCP > x.27.227.182:54424->x.x.164.45:9092 (ESTABLISHED) > {code} > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7628) KafkaStream is not closing
[ https://issues.apache.org/jira/browse/KAFKA-7628?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16687486#comment-16687486 ] Ozgur commented on KAFKA-7628: -- Hi John, It returns *false* from the close() method, but state is NOT_RUNNING. And still bounds to the TCP ports after a few mins. After trying to close stream, some sockets were in the state of {{CLOSE_WAIT like this output:}} $ sudo lsof -i -n -P | grep 9092 java 29457 ozgur 133u IPv6 0x531e550533f38283 0t0 TCP x.27.227.182:54419->x.x.164.33:9092 (ESTABLISHED) java 29457 ozgur 134u IPv6 0x531e55051a789ec3 0t0 TCP x.27.227.182:54420->x.x.164.45:9092 (ESTABLISHED) java 29457 ozgur 135u IPv6 0x531e55051a789903 0t0 TCP x.27.227.182:54421->x.x.164.25:9092 (ESTABLISHED) java 29457 ozgur 136u IPv6 0x531e55051a78aa43 0t0 TCP x.27.227.182:54422->x.x.164.25:9092 (ESTABLISHED) java 29457 ozgur 140u IPv6 0x531e55051a78c703 0t0 TCP x.27.227.182:54423->x.x.164.25:9092 (ESTABLISHED) java 29457 ozgur 141u IPv6 0x531e55051a78a483 0t0 TCP x.27.227.182:54424->x.x.164.45:9092 (CLOSE_WAIT) java 29457 ozgur 141u IPv6 0x531e55051a78a483 0t0 TCP x.27.227.182:54425->x.x.164.45:9092 (CLOSE_WAIT) java 29457 ozgur 141u IPv6 0x531e55051a78a483 0t0 TCP x.27.227.182:54426->x.x.164.45:9092 (ESTABLISHED) Thanks > KafkaStream is not closing > -- > > Key: KAFKA-7628 > URL: https://issues.apache.org/jira/browse/KAFKA-7628 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.11.0.1 > Environment: Macbook Pro >Reporter: Ozgur >Priority: Major > > I'm closing a KafkaStream when I need based on a certain condition: > Closing: > > {code:java} > if(kafkaStream == null) { > logger.info("KafkaStream already closed?"); > } else { > boolean closed = kafkaStream.close(10L, TimeUnit.SECONDS); > if(closed) { > kafkaStream = null; > logger.info("KafkaStream closed"); > } else { > logger.info("KafkaStream could not closed"); > } > } > {code} > Starting: > > {code:java} > if(kafkaStream == null) { > logger.info("KafkaStream is starting"); > kafkaStream = > KafkaManager.getInstance().getStream(this.getConfigFilePath(), > this, > this.getTopic() > ); > kafkaStream.start(); > logger.info("KafkaStream is started"); > } > {code} > > > In my implementation of Processor, {{process(String key, byte[] value)}} is > still called although successfully closing stream: > > {code:java} > // code placeholder > public abstract class BaseKafkaProcessor implements Processor > { > private static Logger logger = > LogManager.getLogger(BaseKafkaProcessor.class); > private ProcessorContext context; > private ProcessorContext getContext() { > return context; > } > @Override > public void init(ProcessorContext context) { > this.context = context; > this.context.schedule(1000); > } > @Override > public void process(String key, byte[] value) { > try { > String topic = key.split("-")[0]; > byte[] uncompressed = GzipCompressionUtil.uncompress(value); > String json = new String(uncompressed, "UTF-8"); > processRecord(topic, json); > this.getContext().commit(); > } catch (Exception e) { > logger.error("Error processing json", e); > } > } > protected abstract void processRecord(String topic, String json); > @Override > public void punctuate(long timestamp) { > this.getContext().commit(); > } > @Override > public void close() { > this.getContext().commit(); > } > } > {code} > > My configuration for KafkaStreams: > > {code:java} > application.id=dv_ws_in_app_activity_dev4 > bootstrap.servers=VLXH1 > auto.offset.reset=latest > num.stream.threads=1 > key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde > value.serde=org.apache.kafka.common.serialization.Serdes$ByteArraySerde > poll.ms = 100 > commit.interval.ms=1000 > state.dir=../../temp/kafka-state-dir > {code} > Version: *0.11.0.1* > > I'm witnessing that after closing() the streams, these ports are still > listening: > > {code:java} > $ sudo lsof -i -n -P | grep 9092 > java 29457 ozgur 133u IPv6 0x531e550533f38283 0t0 TCP > x.27.227.182:54419->x.x.164.33:9092 (ESTABLISHED) > java 29457 ozgur 134u IPv6 0x531e55051a789ec3 0t0 TCP > x.
[jira] [Commented] (KAFKA-7628) KafkaStream is not closing
[ https://issues.apache.org/jira/browse/KAFKA-7628?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16687359#comment-16687359 ] John Roesler commented on KAFKA-7628: - Hi [~lugrugzo], To confirm, you see that Streams successfully closes, but afterwards, it's still bound to the TCP ports? Have you noticed whether it stays bound indefinitely, or does it stop listening at some point after closing? Thanks, -John > KafkaStream is not closing > -- > > Key: KAFKA-7628 > URL: https://issues.apache.org/jira/browse/KAFKA-7628 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 0.11.0.1 > Environment: Macbook Pro >Reporter: Ozgur >Priority: Major > > I'm closing a KafkaStream when I need based on a certain condition: > Closing: > > {code:java} > if(kafkaStream == null) { > logger.info("KafkaStream already closed?"); > } else { > boolean closed = kafkaStream.close(10L, TimeUnit.SECONDS); > if(closed) { > kafkaStream = null; > logger.info("KafkaStream closed"); > } else { > logger.info("KafkaStream could not closed"); > } > } > {code} > Starting: > > {code:java} > if(kafkaStream == null) { > logger.info("KafkaStream is starting"); > kafkaStream = > KafkaManager.getInstance().getStream(this.getConfigFilePath(), > this, > this.getTopic() > ); > kafkaStream.start(); > logger.info("KafkaStream is started"); > } > {code} > > > In my implementation of Processor, {{process(String key, byte[] value)}} is > still called although successfully closing stream: > > {code:java} > // code placeholder > public abstract class BaseKafkaProcessor implements Processor > { > private static Logger logger = > LogManager.getLogger(BaseKafkaProcessor.class); > private ProcessorContext context; > private ProcessorContext getContext() { > return context; > } > @Override > public void init(ProcessorContext context) { > this.context = context; > this.context.schedule(1000); > } > @Override > public void process(String key, byte[] value) { > try { > String topic = key.split("-")[0]; > byte[] uncompressed = GzipCompressionUtil.uncompress(value); > String json = new String(uncompressed, "UTF-8"); > processRecord(topic, json); > this.getContext().commit(); > } catch (Exception e) { > logger.error("Error processing json", e); > } > } > protected abstract void processRecord(String topic, String json); > @Override > public void punctuate(long timestamp) { > this.getContext().commit(); > } > @Override > public void close() { > this.getContext().commit(); > } > } > {code} > > My configuration for KafkaStreams: > > {code:java} > application.id=dv_ws_in_app_activity_dev4 > bootstrap.servers=VLXH1 > auto.offset.reset=latest > num.stream.threads=1 > key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde > value.serde=org.apache.kafka.common.serialization.Serdes$ByteArraySerde > poll.ms = 100 > commit.interval.ms=1000 > state.dir=../../temp/kafka-state-dir > {code} > Version: *0.11.0.1* > > I'm witnessing that after closing() the streams, these ports are still > listening: > > {code:java} > $ sudo lsof -i -n -P | grep 9092 > java 29457 ozgur 133u IPv6 0x531e550533f38283 0t0 TCP > x.27.227.182:54419->x.x.164.33:9092 (ESTABLISHED) > java 29457 ozgur 134u IPv6 0x531e55051a789ec3 0t0 TCP > x.27.227.182:54420->x.x.164.45:9092 (ESTABLISHED) > java 29457 ozgur 135u IPv6 0x531e55051a789903 0t0 TCP > x.27.227.182:54421->x.x.164.25:9092 (ESTABLISHED) > java 29457 ozgur 136u IPv6 0x531e55051a78aa43 0t0 TCP > x.27.227.182:54422->x.x.164.25:9092 (ESTABLISHED) > java 29457 ozgur 140u IPv6 0x531e55051a78c703 0t0 TCP > x.27.227.182:54423->x.x.164.25:9092 (ESTABLISHED) > java 29457 ozgur 141u IPv6 0x531e55051a78a483 0t0 TCP > x.27.227.182:54424->x.x.164.45:9092 (ESTABLISHED) > {code} > > -- This message was sent by Atlassian JIRA (v7.6.3#76005)