[jira] [Assigned] (KAFKA-7527) Enable Dependency Injection for Kafka Streams handlers (KIP-378)

2018-11-17 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7527?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang reassigned KAFKA-7527:


Assignee: Wladimir Schmidt

> Enable Dependency Injection for Kafka Streams handlers (KIP-378)
> 
>
> Key: KAFKA-7527
> URL: https://issues.apache.org/jira/browse/KAFKA-7527
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.0.0, 2.1.0
>Reporter: Wladimir Schmidt
>Assignee: Wladimir Schmidt
>Priority: Minor
>  Labels: kip, usability
>
> Implement the solution proposed in KIP-378 (Enable Dependency Injection for 
> Kafka Streams handlers).
> Link to 
> [KIP-378|https://cwiki.apache.org/confluence/display/KAFKA/KIP-378:+Enable+Dependency+Injection+for+Kafka+Streams+handlers]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7536) TopologyTestDriver cannot pre-populate KTable or GlobalKTable

2018-11-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16690719#comment-16690719
 ] 

ASF GitHub Bot commented on KAFKA-7536:
---

guozhangwang opened a new pull request #5923: KAFKA-7536: Initialize 
TopologyTestDriver with non-null topic
URL: https://github.com/apache/kafka/pull/5923
 
 
   In the TopologyTestDriver constructor, set a non-null topic; in the unit test, 
intentionally turn on caching to verify this case.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> TopologyTestDriver cannot pre-populate KTable or GlobalKTable
> -
>
> Key: KAFKA-7536
> URL: https://issues.apache.org/jira/browse/KAFKA-7536
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: Dmitry Minkovsky
>Priority: Minor
>
> I have a GlobalKTable that's defined as follows:
> {code}
> GlobalKTable userIdsByEmail = topology  
>.globalTable(USER_IDS_BY_EMAIL.name,
>USER_IDS_BY_EMAIL.consumed(),
>Materialized.as("user-ids-by-email"));
> {code}
> And the following test in Spock:
> {code}
> def topology = // my topology
> def driver = new TopologyTestDriver(topology, config())
> def cleanup() {
> driver.close()
> }
> def "create from email request"() {
> def store = driver.getKeyValueStore('user-ids-by-email')
> store.put('string', ByteString.copyFrom(new byte[0]))
> // more, but it fails at the `put` above
> {code}
> When I run this, I get the following:
> {code}
> [2018-10-23 19:35:27,055] INFO (org.apache.kafka.streams.processor.internals.GlobalStateManagerImpl) mock Restoring state for global store user-ids-by-email
> java.lang.NullPointerException
>   at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.topic(AbstractProcessorContext.java:115)
>   at org.apache.kafka.streams.state.internals.CachingKeyValueStore.putInternal(CachingKeyValueStore.java:237)
>   at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:220)
>   at org.apache.kafka.streams.state.internals.CachingKeyValueStore.put(CachingKeyValueStore.java:38)
>   at org.apache.kafka.streams.state.internals.InnerMeteredKeyValueStore.put(InnerMeteredKeyValueStore.java:206)
>   at org.apache.kafka.streams.state.internals.MeteredKeyValueBytesStore.put(MeteredKeyValueBytesStore.java:117)
>   at pony.message.MessageWriteStreamsTest.create from mailgun email request(MessageWriteStreamsTest.groovy:52)
> [2018-10-23 19:35:27,189] INFO (org.apache.kafka.streams.processor.internals.StateDirectory) stream-thread [main] Deleting state directory 0_0 for task 0_0 as user calling cleanup.
> {code}
> The same issue applies to KTable.
> I've noticed that I can {{put()}} to the store if I first write to it with 
> {{driver.pipeInput}}. But otherwise I get the above error.
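For readers following the trace: the NullPointerException is raised in AbstractProcessorContext.topic() when the caching layer asks for the current topic during put(). PR #5923 describes its fix as setting a non-null topic in the TopologyTestDriver constructor. Below is a simplified, hypothetical Java model of that failure path. It is not the actual Kafka Streams code or patch. ModelContext, ModelCachingStore, and the PLACEHOLDER_TOPIC sentinel are invented names. The model assumes, as the stack trace suggests, that topic() dereferences the stored topic.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, simplified model of the failure path in the stack trace above.
// These classes are illustrative stand-ins, NOT Kafka Streams internals.
final class ModelContext {
    // Assumed sentinel meaning "no real source topic"; the name is invented here.
    static final String PLACEHOLDER_TOPIC = "__placeholder__";

    private final String topic;

    ModelContext(final String topic) {
        this.topic = topic;
    }

    String topic() {
        // Calling equals() on a null topic throws NullPointerException,
        // analogous to the failure reported in AbstractProcessorContext.topic().
        return topic.equals(PLACEHOLDER_TOPIC) ? null : topic;
    }
}

final class ModelCachingStore {
    // Each cached entry remembers the topic of the record that produced it.
    private static final class Entry {
        final String value;
        final String topic;

        Entry(final String value, final String topic) {
            this.value = value;
            this.topic = topic;
        }
    }

    private final Map<String, Entry> cache = new HashMap<>();
    private final ModelContext context;

    ModelCachingStore(final ModelContext context) {
        this.context = context;
    }

    void put(final String key, final String value) {
        // Like the caching layer in the trace, a put asks the context for the current topic.
        cache.put(key, new Entry(value, context.topic()));
    }

    String get(final String key) {
        final Entry entry = cache.get(key);
        return entry == null ? null : entry.value;
    }

    String topicOf(final String key) {
        final Entry entry = cache.get(key);
        return entry == null ? null : entry.topic;
    }
}
```

With a null topic, the model's put() fails the same way the report describes. Giving the context a non-null placeholder topic up front lets put() succeed before any input has been piped.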





[jira] [Commented] (KAFKA-7536) TopologyTestDriver cannot pre-populate KTable or GlobalKTable

2018-11-17 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7536?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16690720#comment-16690720
 ] 

Guozhang Wang commented on KAFKA-7536:
--

[~dminkovsky] I've submitted a PR that I think should resolve your issue. Please 
try it out: https://github.com/apache/kafka/pull/5923






[jira] [Commented] (KAFKA-7628) KafkaStream is not closing

2018-11-17 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7628?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16690710#comment-16690710
 ] 

Guozhang Wang commented on KAFKA-7628:
--

I see :) You mentioned that after upgrading to version 2.0.1 you still 
observed the same behavior, and I thought you meant that the state could still 
transition to `NOT_RUNNING` before the threads had been joined.

Thanks for confirming!
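The distinction here is between the client's reported state and whether its threads have actually terminated. Below is a minimal stdlib-only sketch of close(timeout) semantics in which NOT_RUNNING is reported only after every worker thread has been joined. ModelStreamsClient and its State enum are hypothetical. They only loosely mirror KafkaStreams.close(long, TimeUnit), which in 0.11 returns a boolean indicating whether all threads stopped before the timeout.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch, NOT the KafkaStreams implementation: the client reports
// NOT_RUNNING only after every worker thread has actually terminated.
final class ModelStreamsClient {
    enum State { CREATED, RUNNING, PENDING_SHUTDOWN, NOT_RUNNING }

    private final List<Thread> workers = new ArrayList<>();
    private volatile State state = State.CREATED;
    private volatile boolean shutdownRequested = false;

    synchronized void start(final int numThreads, final Runnable work) {
        state = State.RUNNING;
        for (int i = 0; i < numThreads; i++) {
            final Thread worker = new Thread(() -> {
                // Each worker keeps processing until shutdown is requested.
                while (!shutdownRequested) {
                    work.run();
                }
            });
            worker.setDaemon(true);
            workers.add(worker);
            worker.start();
        }
    }

    State state() {
        return state;
    }

    // Returns true only if all workers stopped within the timeout.
    synchronized boolean close(final long timeout, final TimeUnit unit) {
        state = State.PENDING_SHUTDOWN;
        shutdownRequested = true;
        final long deadline = System.nanoTime() + unit.toNanos(timeout);
        for (final Thread worker : workers) {
            final long remainingMs = TimeUnit.NANOSECONDS.toMillis(deadline - System.nanoTime());
            try {
                if (remainingMs > 0) {
                    worker.join(remainingMs);
                }
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
            if (worker.isAlive()) {
                // Some thread has not been joined: do not claim NOT_RUNNING yet.
                return false;
            }
        }
        state = State.NOT_RUNNING;
        return true;
    }
}
```

When close() returns false here, the client stays in PENDING_SHUTDOWN. Callers can therefore tell that threads may still be running.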

> KafkaStream is not closing
> --
>
> Key: KAFKA-7628
> URL: https://issues.apache.org/jira/browse/KAFKA-7628
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.1
> Environment: Macbook Pro
>Reporter: Ozgur
>Priority: Major
>
> I'm closing a KafkaStream based on a certain condition:
> Closing:
>  
> {code:java}
> if (kafkaStream == null) {
>     logger.info("KafkaStream already closed?");
> } else {
>     boolean closed = kafkaStream.close(10L, TimeUnit.SECONDS);
>     if (closed) {
>         kafkaStream = null;
>         logger.info("KafkaStream closed");
>     } else {
>         logger.info("KafkaStream could not be closed");
>     }
> }
> {code}
> Starting:
>  
> {code:java}
> if (kafkaStream == null) {
>     logger.info("KafkaStream is starting");
>     kafkaStream = KafkaManager.getInstance().getStream(this.getConfigFilePath(),
>             this,
>             this.getTopic());
>     kafkaStream.start();
>     logger.info("KafkaStream is started");
> }
> {code}
>  
>  
> In my implementation of Processor, {{process(String key, byte[] value)}} is 
> still called even after the stream has been closed successfully:
>  
> {code:java}
> public abstract class BaseKafkaProcessor implements Processor<String, byte[]> {
>     private static Logger logger = LogManager.getLogger(BaseKafkaProcessor.class);
>     private ProcessorContext context;
>
>     private ProcessorContext getContext() {
>         return context;
>     }
>
>     @Override
>     public void init(ProcessorContext context) {
>         this.context = context;
>         this.context.schedule(1000);
>     }
>
>     @Override
>     public void process(String key, byte[] value) {
>         try {
>             String topic = key.split("-")[0];
>             byte[] uncompressed = GzipCompressionUtil.uncompress(value);
>             String json = new String(uncompressed, "UTF-8");
>             processRecord(topic, json);
>             this.getContext().commit();
>         } catch (Exception e) {
>             logger.error("Error processing json", e);
>         }
>     }
>
>     protected abstract void processRecord(String topic, String json);
>
>     @Override
>     public void punctuate(long timestamp) {
>         this.getContext().commit();
>     }
>
>     @Override
>     public void close() {
>         this.getContext().commit();
>     }
> }
> {code}
>  
> My configuration for KafkaStreams:
>  
> {code:java}
> application.id=dv_ws_in_app_activity_dev4
> bootstrap.servers=VLXH1
> auto.offset.reset=latest
> num.stream.threads=1
> key.serde=org.apache.kafka.common.serialization.Serdes$StringSerde
> value.serde=org.apache.kafka.common.serialization.Serdes$ByteArraySerde
> poll.ms = 100
> commit.interval.ms=1000
> state.dir=../../temp/kafka-state-dir
> {code}
> Version: *0.11.0.1* 
>  
> I'm seeing that after calling close() on the streams, these connections to 
> port 9092 are still established:
>  
> {code:java}
> $ sudo lsof -i -n -P | grep 9092
> java      29457          ozgur  133u  IPv6 0x531e550533f38283      0t0    TCP x.27.227.182:54419->x.x.164.33:9092 (ESTABLISHED)
> java      29457          ozgur  134u  IPv6 0x531e55051a789ec3      0t0    TCP x.27.227.182:54420->x.x.164.45:9092 (ESTABLISHED)
> java      29457          ozgur  135u  IPv6 0x531e55051a789903      0t0    TCP x.27.227.182:54421->x.x.164.25:9092 (ESTABLISHED)
> java      29457          ozgur  136u  IPv6 0x531e55051a78aa43      0t0    TCP x.27.227.182:54422->x.x.164.25:9092 (ESTABLISHED)
> java      29457          ozgur  140u  IPv6 0x531e55051a78c703      0t0    TCP x.27.227.182:54423->x.x.164.25:9092 (ESTABLISHED)
> java      29457          ozgur  141u  IPv6 0x531e55051a78a483      0t0    TCP x.27.227.182:54424->x.x.164.45:9092 (ESTABLISHED)
> {code}
>  
>  





[jira] [Commented] (KAFKA-5054) ChangeLoggingKeyValueByteStore delete and putIfAbsent should be synchronized

2018-11-17 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5054?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16690707#comment-16690707
 ] 

ASF GitHub Bot commented on KAFKA-5054:
---

guozhangwang closed pull request #5873: KAFKA-5054 : synchronized all the 
methods
URL: https://github.com/apache/kafka/pull/5873
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
index 1d8fb5806be..6f853baa6ed 100644
--- a/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
+++ b/streams/src/main/java/org/apache/kafka/streams/state/internals/ChangeLoggingKeyValueBytesStore.java
@@ -61,13 +61,13 @@ public long approximateNumEntries() {
     }
 
     @Override
-    public void put(final Bytes key, final byte[] value) {
+    public synchronized void put(final Bytes key, final byte[] value) {
         inner.put(key, value);
         changeLogger.logChange(key, value);
     }
 
     @Override
-    public byte[] putIfAbsent(final Bytes key, final byte[] value) {
+    public synchronized byte[] putIfAbsent(final Bytes key, final byte[] value) {
         final byte[] previous = get(key);
         if (previous == null) {
             put(key, value);
@@ -76,7 +76,7 @@ public void put(final Bytes key, final byte[] value) {
     }
 
     @Override
-    public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
+    public synchronized void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
         inner.putAll(entries);
         for (final KeyValue<Bytes, byte[]> entry : entries) {
             changeLogger.logChange(entry.key, entry.value);
@@ -84,24 +84,24 @@ public void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
     }
 
     @Override
-    public byte[] delete(final Bytes key) {
+    public synchronized byte[] delete(final Bytes key) {
         final byte[] oldValue = inner.delete(key);
         changeLogger.logChange(key, null);
         return oldValue;
     }
 
     @Override
-    public byte[] get(final Bytes key) {
+    public synchronized byte[] get(final Bytes key) {
         return inner.get(key);
     }
 
     @Override
-    public KeyValueIterator<Bytes, byte[]> range(final Bytes from, final Bytes to) {
+    public synchronized KeyValueIterator<Bytes, byte[]> range(final Bytes from, final Bytes to) {
         return inner.range(from, to);
     }
 
     @Override
-    public KeyValueIterator<Bytes, byte[]> all() {
+    public synchronized KeyValueIterator<Bytes, byte[]> all() {
         return inner.all();
     }
 }


 




> ChangeLoggingKeyValueByteStore delete and putIfAbsent should be synchronized
> 
>
> Key: KAFKA-5054
> URL: https://issues.apache.org/jira/browse/KAFKA-5054
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Damian Guy
>Assignee: Liju
>Priority: Critical
>
> {{putIfAbsent}} and {{delete}} should be synchronized, as they involve at 
> least two operations on the underlying store and may produce inconsistent 
> results if someone queries the store via Interactive Queries (IQ).
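A small stdlib-only Java sketch shows why synchronizing the compound operations helps. The names are hypothetical, and a HashMap plus a List stand in for the inner store and the changelog. Because putIfAbsent() and delete() hold the same monitor as get(), the get-then-put in putIfAbsent() runs atomically. A reader calling get() also cannot interleave with a write that has updated the map but not yet logged the change. This mirrors the approach in PR #5873 but is not its code.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a change-logging store: an inner map plus a changelog list.
final class ModelChangeLoggingStore {
    private final Map<String, String> inner = new HashMap<>();
    private final List<String> changelog = new ArrayList<>();

    synchronized void put(final String key, final String value) {
        inner.put(key, value);
        changelog.add(key + "=" + value);
    }

    // get + put + log all run under one lock, so no other caller can interleave.
    synchronized String putIfAbsent(final String key, final String value) {
        final String previous = get(key);
        if (previous == null) {
            put(key, value); // re-entrant: this thread already holds the monitor
        }
        return previous;
    }

    synchronized String delete(final String key) {
        final String old = inner.remove(key);
        changelog.add(key + "=null"); // tombstone, mirroring logChange(key, null)
        return old;
    }

    synchronized String get(final String key) {
        return inner.get(key);
    }

    synchronized int changelogSize() {
        return changelog.size();
    }
}
```

Java monitors are re-entrant, so putIfAbsent() can call the synchronized get() and put() without deadlocking.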





[jira] [Commented] (KAFKA-7062) Simplify MirrorMaker loop after removal of old consumer support

2018-11-17 Thread Junyu Chen (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7062?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16690697#comment-16690697
 ] 

Junyu Chen commented on KAFKA-7062:
---

Hi [~ijuma], it looks like this ticket hasn't been updated for months. Can I 
give it a shot? 

> Simplify MirrorMaker loop after removal of old consumer support
> ---
>
> Key: KAFKA-7062
> URL: https://issues.apache.org/jira/browse/KAFKA-7062
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ismael Juma
>Assignee: Andras Beni
>Priority: Minor
>  Labels: newbie
>
> Once KAFKA-2983 is merged, we can simplify the MirrorMaker loop to be a 
> single loop instead of two nested loops. With the old consumer, receive() 
> could block, so the loop had to be structured so that offsets would still be 
> committed even when there were no messages. The new consumer doesn't have 
> this issue.
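The single-loop structure can be sketched as follows. The sketch assumes a consumer-style poll that returns after a bounded wait even when no records are available; here java.util.concurrent.BlockingQueue.poll(timeout, unit) stands in for the consumer. SingleLoopMirror, its commit counter, and the maxIterations parameter are hypothetical and exist only for illustration. This is not MirrorMaker's code.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical single-loop mirror. BlockingQueue.poll(timeout) stands in for a consumer
// poll that returns after a bounded wait even when no data is available.
final class SingleLoopMirror {
    private final BlockingQueue<String> source;
    private final Consumer<String> sink;
    private final long pollTimeoutMs;
    private final long commitIntervalMs;
    private long lastCommitMs;
    private int commits = 0;
    private volatile boolean shuttingDown = false;

    SingleLoopMirror(final BlockingQueue<String> source, final Consumer<String> sink,
                     final long pollTimeoutMs, final long commitIntervalMs) {
        this.source = source;
        this.sink = sink;
        this.pollTimeoutMs = pollTimeoutMs;
        this.commitIntervalMs = commitIntervalMs;
        this.lastCommitMs = System.currentTimeMillis();
    }

    // One flat loop: poll, forward whatever arrived, then commit if the interval elapsed.
    // maxIterations only exists to make the sketch easy to test.
    void run(final int maxIterations) {
        for (int i = 0; i < maxIterations && !shuttingDown; i++) {
            final String record;
            try {
                record = source.poll(pollTimeoutMs, TimeUnit.MILLISECONDS);
            } catch (final InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            if (record != null) {
                sink.accept(record);
            }
            // Reached on every iteration, including empty polls, so no inner loop is needed.
            maybeCommit();
        }
    }

    private void maybeCommit() {
        final long now = System.currentTimeMillis();
        if (now - lastCommitMs >= commitIntervalMs) {
            commits++; // a real mirror would commit consumer offsets here
            lastCommitMs = now;
        }
    }

    int commits() {
        return commits;
    }

    void shutdown() {
        shuttingDown = true;
    }
}
```

Because an empty poll still falls through to maybeCommit(), periodic commits happen without a second, nested loop.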





[jira] [Updated] (KAFKA-7652) Kafka Streams Session store performance degradation from 0.10.2.2 to 0.11.0.0

2018-11-17 Thread Jonathan Gordon (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7652?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jonathan Gordon updated KAFKA-7652:
---
Description: 
I'm creating this issue in response to [~guozhang]'s request on the mailing 
list:

[https://lists.apache.org/thread.html/97d620f4fd76be070ca4e2c70e2fda53cafe051e8fc4505dbcca0321@%3Cusers.kafka.apache.org%3E]

We are attempting to upgrade our Kafka Streams application from 0.10.2.1 but 
are experiencing a severe performance degradation. Most of the CPU time seems 
to be spent retrieving from the local cache. Here's an example thread profile 
with 0.11.0.0:

[https://i.imgur.com/l5VEsC2.png]

When things are running smoothly, we're gated by retrieving from the state 
store, with acceptable performance. Here's an example thread profile with 
0.10.2.1:

[https://i.imgur.com/IHxC2cZ.png]

Some investigation suggests that we're performing about three orders of 
magnitude more lookups on the NamedCache over a comparable time period. I've 
attached the NamedCache flush logs for 0.10.2.1 and 0.11.0.3.

We're using session windows and have the app configured with 
commit.interval.ms = 30 * 1000 and cache.max.bytes.buffering = 10485760.
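For reference, the two settings quoted above can be expressed as plain java.util.Properties. 30 * 1000 ms is 30 seconds, and 10485760 bytes is exactly 10 MiB (10 * 1024 * 1024). The class name is hypothetical. The string keys are used so the sketch compiles without Kafka on the classpath; in an application one would normally use StreamsConfig.COMMIT_INTERVAL_MS_CONFIG and StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG.

```java
import java.util.Properties;

// Sketch of the settings quoted in the report, using plain string keys so it needs no
// Kafka dependency. The class name is hypothetical.
final class ReportedStreamsSettings {
    static Properties build() {
        final Properties props = new Properties();
        props.put("commit.interval.ms", String.valueOf(30 * 1000));               // 30 seconds
        props.put("cache.max.bytes.buffering", String.valueOf(10 * 1024 * 1024)); // 10 MiB = 10485760 bytes
        return props;
    }
}
```

Note that cache.max.bytes.buffering is a total budget shared across all stream threads of the instance, not a per-thread value.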

I'm happy to share more details if they would be helpful. Also happy to run 
tests on our data.

I also found this issue, which seems like it may be related:

https://issues.apache.org/jira/browse/KAFKA-4904

 

  was:
Here's the original thread from the mailing list:

https://lists.apache.org/thread.html/97d620f4fd76be070ca4e2c70e2fda53cafe051e8fc4505dbcca0321@%3Cusers.kafka.apache.org%3E

We are attempting to upgrade our Kafka Streams application from 0.10.2.1 but 
are experiencing a severe performance degradation. Most of the CPU time seems 
to be spent retrieving from the local cache. Here's an example with 0.11.0.0:

[https://i.imgur.com/l5VEsC2.png]

When things are running smoothly, we're gated by retrieving from the state 
store, with acceptable performance. Here's an example with 0.10.2.1:

[https://i.imgur.com/IHxC2cZ.png]

Some investigation suggests that we're performing about three orders of 
magnitude more lookups on the NamedCache over a comparable time period. I've 
attached the NamedCache flush logs for 0.10.2.1 and 0.11.0.3.

We're using session windows and have the app configured with 
commit.interval.ms of 30 * 1000 and cache.max.bytes.buffering = 10485760.

I'm happy to share more details if they would be helpful. Also happy to run 
tests on our data.

 

 


> Kafka Streams Session store performance degradation from 0.10.2.2 to 0.11.0.0
> -
>
> Key: KAFKA-7652
> URL: https://issues.apache.org/jira/browse/KAFKA-7652
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.11.0.0, 0.11.0.1, 0.11.0.2, 0.11.0.3, 1.1.1, 2.0.0, 
> 2.0.1
>Reporter: Jonathan Gordon
>Priority: Major
> Attachments: kafka_10_2_1_flushes.txt, kafka_11_0_3_flushes.txt
>
>
> I'm creating this issue in response to [~guozhang]'s request on the mailing 
> list:
> [https://lists.apache.org/thread.html/97d620f4fd76be070ca4e2c70e2fda53cafe051e8fc4505dbcca0321@%3Cusers.kafka.apache.org%3E]
> We are attempting to upgrade our Kafka Streams application from 0.10.2.1 but 
> are experiencing a severe performance degradation. Most of the CPU time seems 
> to be spent retrieving from the local cache. Here's an example thread profile 
> with 0.11.0.0:
> [https://i.imgur.com/l5VEsC2.png]
> When things are running smoothly, we're gated by retrieving from the state 
> store, with acceptable performance. Here's an example thread profile with 
> 0.10.2.1:
> [https://i.imgur.com/IHxC2cZ.png]
> Some investigation suggests that we're performing about three orders of 
> magnitude more lookups on the NamedCache over a comparable time period. I've 
> attached the NamedCache flush logs for 0.10.2.1 and 0.11.0.3.
> We're using session windows and have the app configured with 
> commit.interval.ms = 30 * 1000 and cache.max.bytes.buffering = 10485760.
> I'm happy to share more details if they would be helpful. Also happy to run 
> tests on our data.
> I also found this issue, which seems like it may be related:
> https://issues.apache.org/jira/browse/KAFKA-4904
>  





[jira] [Created] (KAFKA-7652) Kafka Streams Session store performance degradation from 0.10.2.2 to 0.11.0.0

2018-11-17 Thread Jonathan Gordon (JIRA)
Jonathan Gordon created KAFKA-7652:
--

 Summary: Kafka Streams Session store performance degradation from 
0.10.2.2 to 0.11.0.0
 Key: KAFKA-7652
 URL: https://issues.apache.org/jira/browse/KAFKA-7652
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.0.1, 2.0.0, 1.1.1, 0.11.0.3, 0.11.0.2, 0.11.0.1, 
0.11.0.0
Reporter: Jonathan Gordon
 Attachments: kafka_10_2_1_flushes.txt, kafka_11_0_3_flushes.txt

Here's the original thread from the mailing list:

https://lists.apache.org/thread.html/97d620f4fd76be070ca4e2c70e2fda53cafe051e8fc4505dbcca0321@%3Cusers.kafka.apache.org%3E

We are attempting to upgrade our Kafka Streams application from 0.10.2.1 but 
are experiencing a severe performance degradation. Most of the CPU time seems 
to be spent retrieving from the local cache. Here's an example with 0.11.0.0:

[https://i.imgur.com/l5VEsC2.png]

When things are running smoothly, we're gated by retrieving from the state 
store, with acceptable performance. Here's an example with 0.10.2.1:

[https://i.imgur.com/IHxC2cZ.png]

Some investigation suggests that we're performing about three orders of 
magnitude more lookups on the NamedCache over a comparable time period. I've 
attached the NamedCache flush logs for 0.10.2.1 and 0.11.0.3.

We're using session windows and have the app configured with 
commit.interval.ms of 30 * 1000 and cache.max.bytes.buffering = 10485760.

I'm happy to share more details if they would be helpful. Also happy to run 
tests on our data.

 

 





[jira] [Commented] (KAFKA-7641) Add `consumer.group.max.size` to cap consumer metadata size on broker

2018-11-17 Thread Stanislav Kozlovski (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7641?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16690650#comment-16690650
 ] 

Stanislav Kozlovski commented on KAFKA-7641:


Yep, gladly!

> Add `consumer.group.max.size` to cap consumer metadata size on broker
> -
>
> Key: KAFKA-7641
> URL: https://issues.apache.org/jira/browse/KAFKA-7641
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> In the JIRA discussion https://issues.apache.org/jira/browse/KAFKA-7610, 
> Jason identified an edge case in the current consumer protocol that could 
> cause a memory burst on the broker side:
> ```the case we observed in practice was caused by a consumer that was slow to 
> rejoin the group after a rebalance had begun. At the same time, there were 
> new members that were trying to join the group for the first time. The 
> request timeout was significantly lower than the rebalance timeout, so the 
> JoinGroup of the new members kept timing out. The timeout caused a retry and 
> the group size eventually became quite large because we could not detect the 
> fact that the new members were no longer there.```
> Since many disorganized JoinGroup requests can flood the group metadata, we 
> should define a cap on the broker side to prevent a single consumer group 
> from growing too large. I think it's appropriate to introduce this as a 
> server config: the value mostly matters in error scenarios, so client users 
> shouldn't need to worry about it.
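Below is a minimal sketch of the proposed broker-side cap. It assumes the coordinator simply rejects new members once a group reaches the configured size. ModelGroup, JoinResult, and the wiring to a consumer.group.max.size setting are hypothetical; this is not Kafka's group coordinator.

```java
import java.util.LinkedHashSet;
import java.util.Set;

// Hypothetical broker-side guard; these names are illustrative, not Kafka's coordinator classes.
final class ModelGroup {
    enum JoinResult { JOINED, ALREADY_MEMBER, GROUP_FULL }

    // Would come from a broker-side config such as the proposed consumer.group.max.size.
    private final int maxSize;
    private final Set<String> members = new LinkedHashSet<>();

    ModelGroup(final int maxSize) {
        if (maxSize <= 0) {
            throw new IllegalArgumentException("maxSize must be positive");
        }
        this.maxSize = maxSize;
    }

    synchronized JoinResult join(final String memberId) {
        if (members.contains(memberId)) {
            return JoinResult.ALREADY_MEMBER; // rejoining never counts against the cap
        }
        if (members.size() >= maxSize) {
            return JoinResult.GROUP_FULL; // reject instead of letting stale joins pile up
        }
        members.add(memberId);
        return JoinResult.JOINED;
    }

    synchronized int size() {
        return members.size();
    }
}
```

Existing members can always rejoin, so the cap only limits growth from new member IDs, such as the accumulation of timed-out joiners described above.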
>  





[jira] [Commented] (KAFKA-7628) KafkaStream is not closing

2018-11-17 Thread Ozgur (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7628?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16690640#comment-16690640
 ] 

Ozgur commented on KAFKA-7628:
--

I'm using [version 0.11.0|https://github.com/apache/kafka/blob/0.11.0/streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java].



