[jira] [Commented] (KAFKA-8348) Document of kafkaStreams improvement
[ https://issues.apache.org/jira/browse/KAFKA-8348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836924#comment-16836924 ] Lifei Chen commented on KAFKA-8348: --- Hi [~mjsax], I am starting to read and contribute to Kafka Streams; this is a small problem I found in the docs while learning the code. Please review the change when you have time. Thanks a lot :D > Document of kafkaStreams improvement > > > Key: KAFKA-8348 > URL: https://issues.apache.org/jira/browse/KAFKA-8348 > Project: Kafka > Issue Type: Improvement > Components: documentation, streams >Reporter: Lifei Chen >Assignee: Lifei Chen >Priority: Minor > > there is an out of date and error example in kafkaStreams.java for current > version. > * Map is not supported for initial StreamsConfig properties > * `int` does not support `toString` > related code: > {code:java} > // kafkaStreams.java > * > * A simple example might look like this: > * {@code > * Properties props = new Properties(); > * props.put(StreamsConfig.APPLICATION_ID_CONFIG, > "my-stream-processing-application"); > * props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); > * props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, > Serdes.String().getClass()); > * props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, > Serdes.String().getClass()); > * > * StreamsBuilder builder = new StreamsBuilder(); > * builder.stream("my-input-topic").mapValues(value -> > String.valueOf(value.length())).to("my-output-topic"); > * > * KafkaStreams streams = new KafkaStreams(builder.build(), props); > * streams.start(); > * }{code} > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-8348) Document of kafkaStreams improvement
[ https://issues.apache.org/jira/browse/KAFKA-8348?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Lifei Chen updated KAFKA-8348: -- Reviewer: Matthias J. Sax > Document of kafkaStreams improvement > > > Key: KAFKA-8348 > URL: https://issues.apache.org/jira/browse/KAFKA-8348 > Project: Kafka > Issue Type: Improvement > Components: documentation, streams >Reporter: Lifei Chen >Assignee: Lifei Chen >Priority: Minor > > there is an out of date and error example in kafkaStreams.java for current > version. > * Map is not supported for initial StreamsConfig properties > * `int` does not support `toString` > related code: > {code:java} > // kafkaStreams.java > * > * A simple example might look like this: > * {@code > * Properties props = new Properties(); > * props.put(StreamsConfig.APPLICATION_ID_CONFIG, > "my-stream-processing-application"); > * props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); > * props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, > Serdes.String().getClass()); > * props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, > Serdes.String().getClass()); > * > * StreamsBuilder builder = new StreamsBuilder(); > * builder.stream("my-input-topic").mapValues(value -> > String.valueOf(value.length())).to("my-output-topic"); > * > * KafkaStreams streams = new KafkaStreams(builder.build(), props); > * streams.start(); > * }{code} > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8348) Document of kafkaStreams improvement
[ https://issues.apache.org/jira/browse/KAFKA-8348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836916#comment-16836916 ] ASF GitHub Bot commented on KAFKA-8348: --- hustclf commented on pull request #6707: KAFKA-8348 document optimized URL: https://github.com/apache/kafka/pull/6707 Fix out of date and wrong example in document. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Document of kafkaStreams improvement > > > Key: KAFKA-8348 > URL: https://issues.apache.org/jira/browse/KAFKA-8348 > Project: Kafka > Issue Type: Improvement > Components: documentation, streams >Reporter: Lifei Chen >Assignee: Lifei Chen >Priority: Minor > > there is an out of date and error example in kafkaStreams.java for current > version. > * Map is not supported for initial StreamsConfig properties > * `int` does not support `toString` > related code: > {code:java} > // kafkaStreams.java > * > * A simple example might look like this: > * {@code > * Properties props = new Properties(); > * props.put(StreamsConfig.APPLICATION_ID_CONFIG, > "my-stream-processing-application"); > * props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); > * props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, > Serdes.String().getClass()); > * props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, > Serdes.String().getClass()); > * > * StreamsBuilder builder = new StreamsBuilder(); > * builder.stream("my-input-topic").mapValues(value -> > String.valueOf(value.length())).to("my-output-topic"); > * > * KafkaStreams streams = new KafkaStreams(builder.build(), props); > * streams.start(); > * }{code} > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-8348) Document of kafkaStreams improvement
Lifei Chen created KAFKA-8348:
----------------------------------

             Summary: Document of kafkaStreams improvement
                 Key: KAFKA-8348
                 URL: https://issues.apache.org/jira/browse/KAFKA-8348
             Project: Kafka
          Issue Type: Improvement
          Components: documentation, streams
            Reporter: Lifei Chen
            Assignee: Lifei Chen

There is an out-of-date and erroneous example in KafkaStreams.java for the current version:
* a Map is not supported for the initial StreamsConfig properties
* `int` does not support `toString`

Related code:
{code:java}
// KafkaStreams.java
 *
 * A simple example might look like this:
 * {@code
 * Properties props = new Properties();
 * props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application");
 * props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
 * props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
 * props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
 *
 * StreamsBuilder builder = new StreamsBuilder();
 * builder.stream("my-input-topic").mapValues(value -> String.valueOf(value.length())).to("my-output-topic");
 *
 * KafkaStreams streams = new KafkaStreams(builder.build(), props);
 * streams.start();
 * }
{code}

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
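For reference, the corrected snippet above expands into a self-contained program along these lines (a minimal sketch; the broker address and topic names are placeholders):

{code:java}
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class WordLengthExample {
    public static void main(final String[] args) {
        // Properties (not a Map) is what the KafkaStreams constructor accepts
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        final StreamsBuilder builder = new StreamsBuilder();
        // value.length() is a primitive int, so it is converted with
        // String.valueOf() rather than a (non-existent) toString() on the int
        builder.<String, String>stream("my-input-topic")
               .mapValues(value -> String.valueOf(value.length()))
               .to("my-output-topic");

        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}
{code}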
[jira] [Resolved] (KAFKA-8344) Fix vagrant-up.sh to work with AWS properly
[ https://issues.apache.org/jira/browse/KAFKA-8344?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Gwen Shapira resolved KAFKA-8344. - Resolution: Fixed Fix Version/s: 2.3.0 > Fix vagrant-up.sh to work with AWS properly > --- > > Key: KAFKA-8344 > URL: https://issues.apache.org/jira/browse/KAFKA-8344 > Project: Kafka > Issue Type: Bug >Reporter: Kengo Seki >Assignee: Kengo Seki >Priority: Major > Fix For: 2.3.0 > > > I tried to run {{vagrant/vagrant-up.sh --aws}} with the following > Vagrantfile.local. > {code} > enable_dns = true > enable_hostmanager = false > # EC2 > ec2_access_key = "" > ec2_secret_key = "" > ec2_keypair_name = "keypair" > ec2_keypair_file = "/path/to/keypair/file" > ec2_region = "ap-northeast-1" > ec2_ami = "ami-0905ffddadbfd01b7" > ec2_security_groups = "sg-" > ec2_subnet_id = "subnet-" > {code} > EC2 instances were successfully created, but it failed with the following > error after that. > {code} > $ vagrant/vagrant-up.sh --aws > (snip) > An active machine was found with a different provider. Vagrant > currently allows each machine to be brought up with only a single > provider at a time. A future version will remove this limitation. > Until then, please destroy the existing machine to up with a new > provider. > Machine name: zk1 > Active provider: aws > Requested provider: virtualbox > {code} > It seems that the {{vagrant hostmanager}} command also requires > {{--provider=aws}} option, in addition to {{vagrant up}}. > With that option, it succeeded as follows: > {code} > $ git diff > diff --git a/vagrant/vagrant-up.sh b/vagrant/vagrant-up.sh > index 6a4ef9564..9210a5357 100755 > --- a/vagrant/vagrant-up.sh > +++ b/vagrant/vagrant-up.sh > @@ -220,7 +220,7 @@ function bring_up_aws { > # We still have to bring up zookeeper/broker nodes serially > echo "Bringing up zookeeper/broker machines serially" > vagrant up --provider=aws --no-parallel --no-provision > $zk_broker_machines $debug > -vagrant hostmanager > +vagrant hostmanager --provider=aws > vagrant provision > fi > @@ -231,11 +231,11 @@ function bring_up_aws { > local vagrant_rsync_temp_dir=$(mktemp -d); > TMPDIR=$vagrant_rsync_temp_dir vagrant_batch_command "vagrant up > $debug --provider=aws" "$worker_machines" "$max_parallel" > rm -rf $vagrant_rsync_temp_dir > -vagrant hostmanager > +vagrant hostmanager --provider=aws > fi > else > vagrant up --provider=aws --no-parallel --no-provision $debug > -vagrant hostmanager > +vagrant hostmanager --provider=aws > vagrant provision > fi > $ vagrant/vagrant-up.sh --aws > (snip) > ==> broker3: Running provisioner: shell... > broker3: Running: /tmp/vagrant-shell20190509-25399-8f1wgz.sh > broker3: Killing server > broker3: No kafka server to stop > broker3: Starting server > $ vagrant status > Current machine states: > zk1 running (aws) > broker1 running (aws) > broker2 running (aws) > broker3 running (aws) > This environment represents multiple VMs. The VMs are all listed > above with their current state. For more information about a specific > VM, run `vagrant status NAME`. > $ vagrant ssh broker1 > (snip) > ubuntu@ip-172-16-0-62:~$ /opt/kafka-dev/bin/kafka-topics.sh > --bootstrap-server broker1:9092,broker2:9092,broker3:9092 --create > --partitions 1 --replication-factor 3 --topic sandbox > (snip) > ubuntu@ip-172-16-0-62:~$ /opt/kafka-dev/bin/kafka-topics.sh > --bootstrap-server broker1:9092,broker2:9092,broker3:9092 --list > (snip) > sandbox > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8344) Fix vagrant-up.sh to work with AWS properly
[ https://issues.apache.org/jira/browse/KAFKA-8344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836904#comment-16836904 ] ASF GitHub Bot commented on KAFKA-8344: --- gwenshap commented on pull request #6703: KAFKA-8344. Fix vagrant-up.sh to work with AWS properly URL: https://github.com/apache/kafka/pull/6703 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Fix vagrant-up.sh to work with AWS properly > --- > > Key: KAFKA-8344 > URL: https://issues.apache.org/jira/browse/KAFKA-8344 > Project: Kafka > Issue Type: Bug >Reporter: Kengo Seki >Assignee: Kengo Seki >Priority: Major > Fix For: 2.3.0 > > > I tried to run {{vagrant/vagrant-up.sh --aws}} with the following > Vagrantfile.local. > {code} > enable_dns = true > enable_hostmanager = false > # EC2 > ec2_access_key = "" > ec2_secret_key = "" > ec2_keypair_name = "keypair" > ec2_keypair_file = "/path/to/keypair/file" > ec2_region = "ap-northeast-1" > ec2_ami = "ami-0905ffddadbfd01b7" > ec2_security_groups = "sg-" > ec2_subnet_id = "subnet-" > {code} > EC2 instances were successfully created, but it failed with the following > error after that. > {code} > $ vagrant/vagrant-up.sh --aws > (snip) > An active machine was found with a different provider. Vagrant > currently allows each machine to be brought up with only a single > provider at a time. A future version will remove this limitation. > Until then, please destroy the existing machine to up with a new > provider. > Machine name: zk1 > Active provider: aws > Requested provider: virtualbox > {code} > It seems that the {{vagrant hostmanager}} command also requires > {{--provider=aws}} option, in addition to {{vagrant up}}. > With that option, it succeeded as follows: > {code} > $ git diff > diff --git a/vagrant/vagrant-up.sh b/vagrant/vagrant-up.sh > index 6a4ef9564..9210a5357 100755 > --- a/vagrant/vagrant-up.sh > +++ b/vagrant/vagrant-up.sh > @@ -220,7 +220,7 @@ function bring_up_aws { > # We still have to bring up zookeeper/broker nodes serially > echo "Bringing up zookeeper/broker machines serially" > vagrant up --provider=aws --no-parallel --no-provision > $zk_broker_machines $debug > -vagrant hostmanager > +vagrant hostmanager --provider=aws > vagrant provision > fi > @@ -231,11 +231,11 @@ function bring_up_aws { > local vagrant_rsync_temp_dir=$(mktemp -d); > TMPDIR=$vagrant_rsync_temp_dir vagrant_batch_command "vagrant up > $debug --provider=aws" "$worker_machines" "$max_parallel" > rm -rf $vagrant_rsync_temp_dir > -vagrant hostmanager > +vagrant hostmanager --provider=aws > fi > else > vagrant up --provider=aws --no-parallel --no-provision $debug > -vagrant hostmanager > +vagrant hostmanager --provider=aws > vagrant provision > fi > $ vagrant/vagrant-up.sh --aws > (snip) > ==> broker3: Running provisioner: shell... > broker3: Running: /tmp/vagrant-shell20190509-25399-8f1wgz.sh > broker3: Killing server > broker3: No kafka server to stop > broker3: Starting server > $ vagrant status > Current machine states: > zk1 running (aws) > broker1 running (aws) > broker2 running (aws) > broker3 running (aws) > This environment represents multiple VMs. The VMs are all listed > above with their current state. For more information about a specific > VM, run `vagrant status NAME`. > $ vagrant ssh broker1 > (snip) > ubuntu@ip-172-16-0-62:~$ /opt/kafka-dev/bin/kafka-topics.sh > --bootstrap-server broker1:9092,broker2:9092,broker3:9092 --create > --partitions 1 --replication-factor 3 --topic sandbox > (snip) > ubuntu@ip-172-16-0-62:~$ /opt/kafka-dev/bin/kafka-topics.sh > --bootstrap-server broker1:9092,broker2:9092,broker3:9092 --list > (snip) > sandbox > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8311) Better consumer timeout exception handling
[ https://issues.apache.org/jira/browse/KAFKA-8311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836879#comment-16836879 ] WooYoung commented on KAFKA-8311: - [~mjsax], [~bchen225242] Thanks~ I have checked the draft implementation code. As you mentioned, I'm considering what more needs to be added on top of 'default.api.timeout.ms' > Better consumer timeout exception handling > --- > > Key: KAFKA-8311 > URL: https://issues.apache.org/jira/browse/KAFKA-8311 > Project: Kafka > Issue Type: Improvement > Components: consumer, streams >Reporter: Boyang Chen >Assignee: WooYoung >Priority: Major > Labels: newbie > > When stream application crashed due to underlying consumer commit timeout, we > have seen following gaps: > 1. The current timeout exception doesn't provide meaningful tuning > instructions. We should augment the error message to let user change > `default.api.timeout.ms` in order to tolerate longer reaction time. > 2. Currently we have 3 different types of consumers on KStream: > thread-consumer, global-consumer and restore-consumer. Although we don't plan > to explicitly handle this consumer timeout on stream level, we could wrap it > with more meaningful message either on consumer or stream level to let user > be aware which consumer is having trouble. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
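For readers following the thread, a minimal sketch of where `default.api.timeout.ms` enters the picture; the broker address, group id, topic, and the wording of the rethrown message are illustrative only, showing the kind of tuning hint the ticket asks for:

{code:java}
import java.time.Duration;
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.TimeoutException;

public class TimeoutTuningExample {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringDeserializer");
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                  "org.apache.kafka.common.serialization.StringDeserializer");
        // raise the ceiling for blocking consumer calls (default is 60s)
        props.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 120_000);

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("my-input-topic"));
            consumer.poll(Duration.ofSeconds(1));
            consumer.commitSync();
        } catch (final TimeoutException e) {
            // the kind of actionable message the ticket proposes to add
            throw new RuntimeException(
                "Consumer API timed out; consider increasing default.api.timeout.ms", e);
        }
    }
}
{code}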
[jira] [Commented] (KAFKA-7206) Enable batching in FindCoordinator
[ https://issues.apache.org/jira/browse/KAFKA-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836852#comment-16836852 ] Sagar Rao commented on KAFKA-7206: -- Hi [~shung], [~guozhang] Is it something that I can pick up? > Enable batching in FindCoordinator > -- > > Key: KAFKA-7206 > URL: https://issues.apache.org/jira/browse/KAFKA-7206 > Project: Kafka > Issue Type: Improvement > Components: admin >Reporter: Yishun Guan >Assignee: Yishun Guan >Priority: Critical > Labels: needs-discussion, needs-kip, newbie++ > > To quote [~guozhang] : > "The proposal is that, we extend FindCoordinatorRequest to have multiple > consumer ids: today each FindCoordinatorRequest only contains a single > consumer id, so in our scenario we need to send N request for N consumer > groups still. If we can request for coordinators in a single request, then > the workflow could be simplified to: > # send a single FindCoordinatorRequest to a broker asking for coordinators > of all consumer groups. > 1.a) note that the response may still succeed in finding some coordinators > while error on others, and we need to handle them on that granularity (see > below). > # and then for the collected coordinator, group them by coordinator id and > send one request per coordinator destination. > Note that this change would require the version to be bumped up, to > FIND_COORDINATOR_REQUEST_V3 for such protocol changes, also the RESPONSE > version should be bumped up in order to include multiple coordinators." > A KIP is needed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
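To make the proposed workflow concrete, an illustrative sketch follows. Note that `findCoordinators()` is a stand-in for the batched FindCoordinator lookup that does not exist yet, and the broker assignment inside it is faked; only the group-by-coordinator step uses real JDK APIs:

{code:java}
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class CoordinatorBatchingSketch {
    // Stand-in for the hypothetical batched FindCoordinator lookup:
    // one request returning groupId -> coordinator broker id.
    static Map<String, Integer> findCoordinators(final List<String> groupIds) {
        // illustration only: pretend every group hashes onto one of 3 brokers
        return groupIds.stream().collect(
            Collectors.toMap(g -> g, g -> Math.abs(g.hashCode()) % 3));
    }

    public static void main(final String[] args) {
        final List<String> groups = Arrays.asList("billing", "audit", "metrics", "search");
        // step 1: a single lookup for all consumer groups
        final Map<String, Integer> coordinators = findCoordinators(groups);
        // step 2: group by coordinator so only one request per broker is needed
        final Map<Integer, List<String>> byCoordinator = coordinators.entrySet().stream()
            .collect(Collectors.groupingBy(Map.Entry::getValue,
                     Collectors.mapping(Map.Entry::getKey, Collectors.toList())));
        byCoordinator.forEach((broker, gs) ->
            System.out.println("broker " + broker + " <- " + gs));
    }
}
{code}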
[jira] [Resolved] (KAFKA-8231) Expansion of ConnectClusterState interface
[ https://issues.apache.org/jira/browse/KAFKA-8231?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Randall Hauch resolved KAFKA-8231. -- Resolution: Fixed > Expansion of ConnectClusterState interface > -- > > Key: KAFKA-8231 > URL: https://issues.apache.org/jira/browse/KAFKA-8231 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Chris Egerton >Assignee: Chris Egerton >Priority: Major > Fix For: 2.3.0 > > > This covers [KIP-454: Expansion of the ConnectClusterState > interface|https://cwiki.apache.org/confluence/display/KAFKA/KIP-454%3A+Expansion+of+the+ConnectClusterState+interface] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8230) Add static membership support in librd consumer client
[ https://issues.apache.org/jira/browse/KAFKA-8230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836820#comment-16836820 ] Boyang Chen commented on KAFKA-8230: [~ijuma] woo, missed this point! > Add static membership support in librd consumer client > --- > > Key: KAFKA-8230 > URL: https://issues.apache.org/jira/browse/KAFKA-8230 > Project: Kafka > Issue Type: Improvement >Reporter: Boyang Chen >Priority: Minor > Labels: consumer > > Once the effort in https://issues.apache.org/jira/browse/KAFKA-7018 is done, > one of the low hanging fruit is to add this support for other language Kafka > consumers, such as c consumer in librdKafka. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8231) Expansion of ConnectClusterState interface
[ https://issues.apache.org/jira/browse/KAFKA-8231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836819#comment-16836819 ] ASF GitHub Bot commented on KAFKA-8231: --- rhauch commented on pull request #6584: KAFKA-8231: Expansion of ConnectClusterState interface URL: https://github.com/apache/kafka/pull/6584 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Expansion of ConnectClusterState interface > -- > > Key: KAFKA-8231 > URL: https://issues.apache.org/jira/browse/KAFKA-8231 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Chris Egerton >Assignee: Chris Egerton >Priority: Major > Fix For: 2.3.0 > > > This covers [KIP-454: Expansion of the ConnectClusterState > interface|https://cwiki.apache.org/confluence/display/KAFKA/KIP-454%3A+Expansion+of+the+ConnectClusterState+interface] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-8311) Better consumer timeout exception handling
[ https://issues.apache.org/jira/browse/KAFKA-8311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836813#comment-16836813 ] Boyang Chen edited comment on KAFKA-8311 at 5/10/19 1:09 AM: - [~mjsax] [~clearpal7] Thank you both! If you're wondering about the codepath, I have some draft implementation here: [https://github.com/apache/kafka/pull/6662]. We probably could try to merge this one first, and then discuss whether we should add a new timeout config on top of `default.api.timeout.ms` was (Author: bchen225242): [~mjsax] [~clearpal7] Thank you both! If you wonder the codepath, I have some draft implementation here: [https://github.com/apache/kafka/pull/6662] > Better consumer timeout exception handling > --- > > Key: KAFKA-8311 > URL: https://issues.apache.org/jira/browse/KAFKA-8311 > Project: Kafka > Issue Type: Improvement > Components: consumer, streams >Reporter: Boyang Chen >Assignee: WooYoung >Priority: Major > Labels: newbie > > When stream application crashed due to underlying consumer commit timeout, we > have seen following gaps: > 1. The current timeout exception doesn't provide meaningful tuning > instructions. We should augment the error message to let user change > `default.api.timeout.ms` in order to tolerate longer reaction time. > 2. Currently we have 3 different types of consumers on KStream: > thread-consumer, global-consumer and restore-consumer. Although we don't plan > to explicitly handle this consumer timeout on stream level, we could wrap it > with more meaningful message either on consumer or stream level to let user > be aware which consumer is having trouble. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8311) Better consumer timeout exception handling
[ https://issues.apache.org/jira/browse/KAFKA-8311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836813#comment-16836813 ] Boyang Chen commented on KAFKA-8311: [~mjsax] [~clearpal7] Thank you both! If you're wondering about the codepath, I have some draft implementation here: [https://github.com/apache/kafka/pull/6662] > Better consumer timeout exception handling > --- > > Key: KAFKA-8311 > URL: https://issues.apache.org/jira/browse/KAFKA-8311 > Project: Kafka > Issue Type: Improvement > Components: consumer, streams >Reporter: Boyang Chen >Assignee: WooYoung >Priority: Major > Labels: newbie > > When stream application crashed due to underlying consumer commit timeout, we > have seen following gaps: > 1. The current timeout exception doesn't provide meaningful tuning > instructions. We should augment the error message to let user change > `default.api.timeout.ms` in order to tolerate longer reaction time. > 2. Currently we have 3 different types of consumers on KStream: > thread-consumer, global-consumer and restore-consumer. Although we don't plan > to explicitly handle this consumer timeout on stream level, we could wrap it > with more meaningful message either on consumer or stream level to let user > be aware which consumer is having trouble. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8335) Log cleaner skips Transactional mark and batch record, causing unlimited growth of __consumer_offsets
[ https://issues.apache.org/jira/browse/KAFKA-8335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836811#comment-16836811 ] Jason Gustafson commented on KAFKA-8335: Thanks for the additional information. This is looking like a bug to me. The cleaner will leave around empty batches in order to preserve producer state, but these should eventually be cleaned as long as there are newer entries for the same producerIds. That doesn't appear to be working correctly though. I will investigate further. > Log cleaner skips Transactional mark and batch record, causing unlimited > growth of __consumer_offsets > - > > Key: KAFKA-8335 > URL: https://issues.apache.org/jira/browse/KAFKA-8335 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.2.0 >Reporter: Boquan Tang >Priority: Major > Attachments: seg_april_25.zip, segment.zip > > > My Colleague Weichu already sent out a mail to kafka user mailing list > regarding this issue, but we think it's worth having a ticket tracking it. > We are using Kafka Streams with exactly-once enabled on a Kafka cluster for > a while. > Recently we found that the size of __consumer_offsets partitions grew huge. > Some partition went over 30G. This caused Kafka to take quite long to load > "__consumer_offsets" topic on startup (it loads the topic in order to > become group coordinator). > We dumped the __consumer_offsets segments and found that while normal > offset commits are nicely compacted, transaction records (COMMIT, etc) are > all preserved. Looks like that since these messages don't have a key, the > LogCleaner is keeping them all: > -- > $ bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files > /003484332061.log --key-decoder-class > kafka.serializer.StringDecoder 2>/dev/null | cat -v | head > Dumping 003484332061.log > Starting offset: 3484332061 > offset: 3484332089 position: 549 CreateTime: 1556003706952 isvalid: true > keysize: 4 valuesize: 6 magic: 2 compresscodec: NONE producerId: 1006 > producerEpoch: 2530 sequence: -1 isTransactional: true headerKeys: [] > endTxnMarker: COMMIT coordinatorEpoch: 81 > offset: 3484332090 position: 627 CreateTime: 1556003706952 isvalid: true > keysize: 4 valuesize: 6 magic: 2 compresscodec: NONE producerId: 4005 > producerEpoch: 2520 sequence: -1 isTransactional: true headerKeys: [] > endTxnMarker: COMMIT coordinatorEpoch: 84 > ... > -- > Streams is doing transaction commits per 100ms (commit.interval.ms=100 when > exactly-once) so the __consumer_offsets is growing really fast. > Is this (to keep all transactions) by design, or is that a bug for > LogCleaner? What would be the way to clean up the topic? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (KAFKA-8335) Log cleaner skips Transactional mark and batch record, causing unlimited growth of __consumer_offsets
[ https://issues.apache.org/jira/browse/KAFKA-8335?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson reassigned KAFKA-8335: -- Assignee: Jason Gustafson > Log cleaner skips Transactional mark and batch record, causing unlimited > growth of __consumer_offsets > - > > Key: KAFKA-8335 > URL: https://issues.apache.org/jira/browse/KAFKA-8335 > Project: Kafka > Issue Type: Bug >Affects Versions: 2.2.0 >Reporter: Boquan Tang >Assignee: Jason Gustafson >Priority: Major > Attachments: seg_april_25.zip, segment.zip > > > My Colleague Weichu already sent out a mail to kafka user mailing list > regarding this issue, but we think it's worth having a ticket tracking it. > We are using Kafka Streams with exactly-once enabled on a Kafka cluster for > a while. > Recently we found that the size of __consumer_offsets partitions grew huge. > Some partition went over 30G. This caused Kafka to take quite long to load > "__consumer_offsets" topic on startup (it loads the topic in order to > become group coordinator). > We dumped the __consumer_offsets segments and found that while normal > offset commits are nicely compacted, transaction records (COMMIT, etc) are > all preserved. Looks like that since these messages don't have a key, the > LogCleaner is keeping them all: > -- > $ bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files > /003484332061.log --key-decoder-class > kafka.serializer.StringDecoder 2>/dev/null | cat -v | head > Dumping 003484332061.log > Starting offset: 3484332061 > offset: 3484332089 position: 549 CreateTime: 1556003706952 isvalid: true > keysize: 4 valuesize: 6 magic: 2 compresscodec: NONE producerId: 1006 > producerEpoch: 2530 sequence: -1 isTransactional: true headerKeys: [] > endTxnMarker: COMMIT coordinatorEpoch: 81 > offset: 3484332090 position: 627 CreateTime: 1556003706952 isvalid: true > keysize: 4 valuesize: 6 magic: 2 compresscodec: NONE producerId: 4005 > producerEpoch: 2520 sequence: -1 isTransactional: true headerKeys: [] > endTxnMarker: COMMIT coordinatorEpoch: 84 > ... > -- > Streams is doing transaction commits per 100ms (commit.interval.ms=100 when > exactly-once) so the __consumer_offsets is growing really fast. > Is this (to keep all transactions) by design, or is that a bug for > LogCleaner? What would be the way to clean up the topic? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-8313) KafkaStreams state not being updated properly after shutdown
[ https://issues.apache.org/jira/browse/KAFKA-8313?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Guozhang Wang resolved KAFKA-8313. -- Resolution: Fixed Assignee: Guozhang Wang Fix Version/s: 2.3.0 > KafkaStreams state not being updated properly after shutdown > > > Key: KAFKA-8313 > URL: https://issues.apache.org/jira/browse/KAFKA-8313 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.2.0 > Environment: Single broker running on Ubuntu Linux. >Reporter: Eric >Assignee: Guozhang Wang >Priority: Minor > Fix For: 2.3.0 > > Attachments: kafka-8313-src.tgz, log.txt > > > I am running a KafkaStreams inside a DropWizard server and I am trying to > detect when my stream shuts down (in case a non-recoverable error occurs). I > was hoping I could use KafkaStreams.setStateListener() to be notified when a > state change occurs. When I query the state, KafkaStreams is stuck in the > REBALANCING state even though its threads are all DEAD. > > You can easily reproduce this by doing the following: > # Create a topic (I have one with 5 partitions) > # Create a simple Kafka stream consuming from that topic > # Create a StateListener and register it on that KafkaStreams > # Start the Kafka stream > # Once everything runs, delete the topic using kafka-topics.sh > When deleting the topic, you will see the StreamThreads' state transition > from RUNNING to PARTITION_REVOKED and you will be notified with the > KafkaStreams REBALANCING state. That's all good and expected. Then the > StreamThreads transition to PENDING_SHUTDOWN and eventually to DEAD and the > KafkaStreams state is stuck into the REBALANCING thread. I was expecting to > see a NOT_RUNNING state eventually... am I right? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8313) KafkaStreams state not being updated properly after shutdown
[ https://issues.apache.org/jira/browse/KAFKA-8313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836801#comment-16836801 ] Guozhang Wang commented on KAFKA-8313: -- Hello [~ladoe00] Looking at your description, I feel it is likely related to this issue https://issues.apache.org/jira/browse/KAFKA-8062 which is resolved in 2.2.1 / 2.3.0. Could you try out those versions by just building from the corresponding branch directly and see if it goes away? > KafkaStreams state not being updated properly after shutdown > > > Key: KAFKA-8313 > URL: https://issues.apache.org/jira/browse/KAFKA-8313 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.2.0 > Environment: Single broker running on Ubuntu Linux. >Reporter: Eric >Priority: Minor > Attachments: kafka-8313-src.tgz, log.txt > > > I am running a KafkaStreams inside a DropWizard server and I am trying to > detect when my stream shuts down (in case a non-recoverable error occurs). I > was hoping I could use KafkaStreams.setStateListener() to be notified when a > state change occurs. When I query the state, KafkaStreams is stuck in the > REBALANCING state even though its threads are all DEAD. > > You can easily reproduce this by doing the following: > # Create a topic (I have one with 5 partitions) > # Create a simple Kafka stream consuming from that topic > # Create a StateListener and register it on that KafkaStreams > # Start the Kafka stream > # Once everything runs, delete the topic using kafka-topics.sh > When deleting the topic, you will see the StreamThreads' state transition > from RUNNING to PARTITION_REVOKED and you will be notified with the > KafkaStreams REBALANCING state. That's all good and expected. Then the > StreamThreads transition to PENDING_SHUTDOWN and eventually to DEAD and the > KafkaStreams state is stuck into the REBALANCING thread. I was expecting to > see a NOT_RUNNING state eventually... am I right? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8313) KafkaStreams state not being updated properly after shutdown
[ https://issues.apache.org/jira/browse/KAFKA-8313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836802#comment-16836802 ] Guozhang Wang commented on KAFKA-8313: -- Just a few more words on why the threads are being shut down: during the rebalance which is triggered by the deletion of the topic, the leader would realize that the target source topic does not exist (any more), and hence it will propagate this error code to all the members telling them to shut down, which is normal. The issue here though, as you already described, is that when threads are shutting down gracefully, the state listener was not being notified because of the issue above. I think it should have been fixed by now. I'm resolving this ticket now, but in case you find it is not the case, please feel free to re-open. > KafkaStreams state not being updated properly after shutdown > > > Key: KAFKA-8313 > URL: https://issues.apache.org/jira/browse/KAFKA-8313 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.2.0 > Environment: Single broker running on Ubuntu Linux. >Reporter: Eric >Priority: Minor > Attachments: kafka-8313-src.tgz, log.txt > > > I am running a KafkaStreams inside a DropWizard server and I am trying to > detect when my stream shuts down (in case a non-recoverable error occurs). I > was hoping I could use KafkaStreams.setStateListener() to be notified when a > state change occurs. When I query the state, KafkaStreams is stuck in the > REBALANCING state even though its threads are all DEAD. > > You can easily reproduce this by doing the following: > # Create a topic (I have one with 5 partitions) > # Create a simple Kafka stream consuming from that topic > # Create a StateListener and register it on that KafkaStreams > # Start the Kafka stream > # Once everything runs, delete the topic using kafka-topics.sh > When deleting the topic, you will see the StreamThreads' state transition > from RUNNING to PARTITION_REVOKED and you will be notified with the > KafkaStreams REBALANCING state. That's all good and expected. Then the > StreamThreads transition to PENDING_SHUTDOWN and eventually to DEAD and the > KafkaStreams state is stuck into the REBALANCING thread. I was expecting to > see a NOT_RUNNING state eventually... am I right? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
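A minimal sketch of the listener pattern described in the report, using the public `KafkaStreams.setStateListener()` API (broker address and topics are placeholders); with the fix, the terminal transition observed here should be to NOT_RUNNING or ERROR rather than the instance staying in REBALANCING:

{code:java}
import java.util.Properties;

import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class ShutdownDetectionExample {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "shutdown-detection-demo");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        final StreamsBuilder builder = new StreamsBuilder();
        builder.stream("my-input-topic").to("my-output-topic");

        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
        // The listener fires on every transition; once all stream threads die,
        // the expected terminal state is NOT_RUNNING (or ERROR).
        streams.setStateListener((newState, oldState) -> {
            if (newState == KafkaStreams.State.NOT_RUNNING
                    || newState == KafkaStreams.State.ERROR) {
                System.err.println("Streams app stopped: " + oldState + " -> " + newState);
            }
        });
        streams.start();
    }
}
{code}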
[jira] [Created] (KAFKA-8347) Choose next record to process by timestamp
Sophie Blee-Goldman created KAFKA-8347: -- Summary: Choose next record to process by timestamp Key: KAFKA-8347 URL: https://issues.apache.org/jira/browse/KAFKA-8347 Project: Kafka Issue Type: Improvement Components: streams Reporter: Sophie Blee-Goldman Currently, PartitionGroup determines the next record to process by choosing the partition with the lowest stream time. However, if a partition contains out-of-order data, its stream time may be significantly larger than the timestamp of the next record. The next record should instead be chosen as the record with the lowest timestamp across all partitions, regardless of which partition it comes from or what its partition time is. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
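An illustrative toy model of the proposal — the types below are simplified stand-ins, not the actual PartitionGroup internals. A partition whose stream time has already advanced still wins when its head record carries the lowest timestamp:

{code:java}
import java.util.ArrayDeque;
import java.util.Comparator;
import java.util.Deque;
import java.util.PriorityQueue;

public class HeadRecordOrderingSketch {
    static class Rec {
        final long timestamp;
        final String value;
        Rec(final long timestamp, final String value) {
            this.timestamp = timestamp;
            this.value = value;
        }
    }

    static class Partition {
        final String name;
        final Deque<Rec> queue = new ArrayDeque<>();
        long streamTime; // max timestamp seen so far on this partition
        Partition(final String name) { this.name = name; }
        long headTimestamp() { return queue.peek().timestamp; }
    }

    public static void main(final String[] args) {
        final Partition p0 = new Partition("p0");
        p0.streamTime = 900L;                 // a ts-900 record was already processed
        p0.queue.add(new Rec(100, "late"));   // next record is out of order

        final Partition p1 = new Partition("p1");
        p1.streamTime = 200L;
        p1.queue.add(new Rec(200, "y"));

        // current behavior: choose by lowest stream time  -> p1 (200 < 900)
        // proposed behavior: choose by lowest head-record timestamp -> p0 (100 < 200)
        final PriorityQueue<Partition> byHeadTime =
            new PriorityQueue<>(Comparator.comparingLong(Partition::headTimestamp));
        byHeadTime.add(p0);
        byHeadTime.add(p1);

        final Partition next = byHeadTime.poll();
        System.out.println("process '" + next.queue.poll().value + "' from " + next.name);
        // prints: process 'late' from p0
    }
}
{code}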
[jira] [Created] (KAFKA-8346) Improve replica fetcher behavior in handling partition failures
Aishwarya Gune created KAFKA-8346: - Summary: Improve replica fetcher behavior in handling partition failures Key: KAFKA-8346 URL: https://issues.apache.org/jira/browse/KAFKA-8346 Project: Kafka Issue Type: Improvement Components: core Reporter: Aishwarya Gune The replica fetcher thread terminates when one of the partitions it monitors fails, which leads to under-replicated partitions. The thread's behavior can be improved by dropping that particular partition and continuing to handle the rest of the partitions. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-6789) Add retry logic in AdminClient requests
[ https://issues.apache.org/jira/browse/KAFKA-6789?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Jason Gustafson resolved KAFKA-6789. Resolution: Fixed Fix Version/s: 2.2.2 2.1.2 2.0.2 > Add retry logic in AdminClient requests > --- > > Key: KAFKA-6789 > URL: https://issues.apache.org/jira/browse/KAFKA-6789 > Project: Kafka > Issue Type: Improvement > Components: admin >Reporter: Guozhang Wang >Assignee: Manikumar >Priority: Major > Fix For: 2.0.2, 2.1.2, 2.2.2 > > > In KafkaAdminClient, today we treat all error codes as fatal and set the > exception accordingly in the returned futures. But for some error codes they > can be retried internally, for example, COORDINATOR_LOADING_IN_PROGRESS. We > could consider adding the retry logic internally in the admin client so that > users would not need to retry themselves. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
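For context, this is roughly the caller-side retry loop that the change makes unnecessary (a sketch with a placeholder group id and broker address); retriable errors such as CoordinatorLoadInProgressException extend RetriableException, which is what the loop keys on:

{code:java}
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.errors.RetriableException;

public class AdminRetryExample {
    public static void main(final String[] args) throws Exception {
        final Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            int attempts = 0;
            while (true) {
                try {
                    admin.describeConsumerGroups(Collections.singleton("my-group"))
                         .all().get();
                    break; // success
                } catch (final ExecutionException e) {
                    // e.g. CoordinatorLoadInProgressException is retriable
                    if (e.getCause() instanceof RetriableException && ++attempts < 5) {
                        Thread.sleep(1000L * attempts); // simple linear backoff
                    } else {
                        throw e;
                    }
                }
            }
        }
    }
}
{code}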
[jira] [Commented] (KAFKA-8342) Admin tool to setup Kafka Stream topology (internal) topics
[ https://issues.apache.org/jira/browse/KAFKA-8342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836770#comment-16836770 ] Guozhang Wang commented on KAFKA-8342: -- [~pkleindl] thanks for sharing your use case. I think we indeed need to think more about what a pleasant procedure would be for users of any admin tool that pre-creates topics --- better not to make them manually export the topology themselves. As for schemas, they should be properly registered at runtime by Streams' producer clients, I think. > Admin tool to setup Kafka Stream topology (internal) topics > --- > > Key: KAFKA-8342 > URL: https://issues.apache.org/jira/browse/KAFKA-8342 > Project: Kafka > Issue Type: New Feature >Reporter: Boyang Chen >Priority: Major > Labels: newbie > > We have seen customers who need to deploy their application to production > environment but don't have access to create changelog and repartition topics. > They need to ask admin team to manually create those topics before proceeding > to start the actual stream job. We could add an admin tool to help them go > through the process quicker by providing a command that could > # Read through current stream topology > # Create corresponding topics as needed, even including output topics. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
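As a rough sketch of what such a tool could do once the internal topic names are known — the topic names below are made-up examples following Streams' `<application.id>-...-changelog` / `-repartition` naming convention, and the partition count and replication factor would have to match the topology and cluster:

{code:java}
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import java.util.stream.Collectors;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class PreCreateStreamsTopics {
    public static void main(final String[] args) throws Exception {
        // in a real tool these names would be derived from the topology,
        // not hard-coded
        final List<String> internalTopics = Arrays.asList(
            "my-app-counts-store-changelog",
            "my-app-KSTREAM-AGGREGATE-STATE-STORE-0000000003-repartition");

        final Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            final List<NewTopic> topics = internalTopics.stream()
                .map(name -> new NewTopic(name, 4, (short) 3)) // partitions, RF
                .collect(Collectors.toList());
            admin.createTopics(topics).all().get();
        }
    }
}
{code}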
[jira] [Updated] (KAFKA-8231) Expansion of ConnectClusterState interface
[ https://issues.apache.org/jira/browse/KAFKA-8231?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Chris Egerton updated KAFKA-8231: - Fix Version/s: 2.3.0 > Expansion of ConnectClusterState interface > -- > > Key: KAFKA-8231 > URL: https://issues.apache.org/jira/browse/KAFKA-8231 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Reporter: Chris Egerton >Assignee: Chris Egerton >Priority: Major > Fix For: 2.3.0 > > > This covers [KIP-454: Expansion of the ConnectClusterState > interface|https://cwiki.apache.org/confluence/display/KAFKA/KIP-454%3A+Expansion+of+the+ConnectClusterState+interface] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8324) User constructed RocksObjects leak memory
[ https://issues.apache.org/jira/browse/KAFKA-8324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836755#comment-16836755 ] ASF GitHub Bot commented on KAFKA-8324: --- bbejeck commented on pull request #6697: KAFKA-8324: Add close() method to RocksDBConfigSetter URL: https://github.com/apache/kafka/pull/6697 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > User constructed RocksObjects leak memory > - > > Key: KAFKA-8324 > URL: https://issues.apache.org/jira/browse/KAFKA-8324 > Project: Kafka > Issue Type: Bug >Reporter: Sophie Blee-Goldman >Priority: Major > > Some of the RocksDB options a user can set when extending RocksDBConfigSetter > take Rocks objects as parameters. Many of these – including potentially large > objects like Cache and Filter – inherit from AbstractNativeReference and must > be closed explicitly in order to free the memory of the backing C++ object. > However the user has no way of closing any objects they have created in > RocksDBConfigSetter, and we do not ever close them for them. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
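A sketch of how a RocksDBConfigSetter implementation could use the new close() hook to free the native objects it created; the `close(storeName, options)` signature follows KIP-453 / the PR above, and the cache size is an arbitrary example:

{code:java}
import java.util.Map;

import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.BloomFilter;
import org.rocksdb.Cache;
import org.rocksdb.LRUCache;
import org.rocksdb.Options;

public class BoundedMemoryConfigSetter implements RocksDBConfigSetter {
    // objects backed by C++ memory that must be freed explicitly
    private Cache cache;
    private BloomFilter filter;

    @Override
    public void setConfig(final String storeName, final Options options,
                          final Map<String, Object> configs) {
        cache = new LRUCache(16 * 1024 * 1024L);
        filter = new BloomFilter();
        final BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
        tableConfig.setBlockCache(cache);
        tableConfig.setFilter(filter);
        options.setTableFormatConfig(tableConfig);
    }

    @Override
    public void close(final String storeName, final Options options) {
        // frees the native memory behind the cache and the bloom filter
        cache.close();
        filter.close();
    }
}
{code}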
[jira] [Commented] (KAFKA-6789) Add retry logic in AdminClient requests
[ https://issues.apache.org/jira/browse/KAFKA-6789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836746#comment-16836746 ] ASF GitHub Bot commented on KAFKA-6789: --- hachikuji commented on pull request #5578: KAFKA-6789: Handle COORDINATOR_LOAD_IN_PROGRESS, COORDINATOR_NOT_AVAILABLE retriable errors in AdminClient API URL: https://github.com/apache/kafka/pull/5578 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Add retry logic in AdminClient requests > --- > > Key: KAFKA-6789 > URL: https://issues.apache.org/jira/browse/KAFKA-6789 > Project: Kafka > Issue Type: Improvement > Components: admin >Reporter: Guozhang Wang >Assignee: Manikumar >Priority: Major > > In KafkaAdminClient, today we treat all error codes as fatal and set the > exception accordingly in the returned futures. But for some error codes they > can be retried internally, for example, COORDINATOR_LOADING_IN_PROGRESS. We > could consider adding the retry logic internally in the admin client so that > users would not need to retry themselves. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace
[ https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836735#comment-16836735 ] Andrew edited comment on KAFKA-8315 at 5/9/19 10:04 PM: Thanks [~ableegoldman] and [~vvcephei]. I have today tried to isolate the issue we are facing into a simple demo app. I have two streams of data x and y, x is high frequency (https://github.com/the4thamigo-uk/join-example/blob/master/data/x.dat) and y is low frequency https://github.com/the4thamigo-uk/join-example/blob/master/data/y.dat. Both topics are preloaded into kafka here https://github.com/the4thamigo-uk/join-example/blob/master/create_topics.sh#L3 I set the join window sizes, and the grace here https://github.com/the4thamigo-uk/join-example/blob/master/launch.sh#L9. There is logging as records are read off the streams via a transformer here https://github.com/the4thamigo-uk/join-example/blob/master/src/main/java/the4thamigouk/kafka/streams/joinexample/JoinerStreamProcessor.java#L54. There is logging on the join here https://github.com/the4thamigo-uk/join-example/blob/master/src/main/java/the4thamigouk/kafka/streams/joinexample/JoinerStreamProcessor.java#L93 What I see are joins only from here https://github.com/the4thamigo-uk/join-example/blob/master/example.log#L1553 Also, I notice that all the x records are read first, then all the y records https://github.com/the4thamigo-uk/join-example/blob/master/example.log#L451 and https://github.com/the4thamigo-uk/join-example/blob/master/example.log#L1452 Am I doing something wrong here? was (Author: the4thamigo_uk): Thanks [~ableegoldman] and [~vvcephei]. I have today tried to isolate the issue we are facing into a simple demo app. I have two streams of data x and y, x is high frequency (https://github.com/the4thamigo-uk/join-example/blob/master/data/x.dat) and y is low frequency https://github.com/the4thamigo-uk/join-example/blob/master/data/y.dat. I set the join window sizes, and the grace here https://github.com/the4thamigo-uk/join-example/blob/master/launch.sh#L9. There is logging as records are read off the streams via a transformer here https://github.com/the4thamigo-uk/join-example/blob/master/src/main/java/the4thamigouk/kafka/streams/joinexample/JoinerStreamProcessor.java#L54. There is logging on the join here https://github.com/the4thamigo-uk/join-example/blob/master/src/main/java/the4thamigouk/kafka/streams/joinexample/JoinerStreamProcessor.java#L93 What I see are joins only from here https://github.com/the4thamigo-uk/join-example/blob/master/example.log#L1553 Also, I notice that all the x records are read first, then all the y records https://github.com/the4thamigo-uk/join-example/blob/master/example.log#L451 and https://github.com/the4thamigo-uk/join-example/blob/master/example.log#L1452 Am I doing something wrong here? > Cannot pass Materialized into a join operation - hence cant set retention > period independent of grace > - > > Key: KAFKA-8315 > URL: https://issues.apache.org/jira/browse/KAFKA-8315 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Andrew >Assignee: John Roesler >Priority: Major > Attachments: code.java > > > The documentation says to use `Materialized` not `JoinWindows.until()` > ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]), > but there is no where to pass a `Materialized` instance to the join > operation, only to the group operation is supported it seems. > > Slack conversation here : > [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300] > [Additional] > From what I understand, the retention period should be independent of the > grace period, so I think this is more than a documentation fix (see comments > below) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace
[ https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836735#comment-16836735 ] Andrew edited comment on KAFKA-8315 at 5/9/19 10:03 PM: Thanks [~ableegoldman] and [~vvcephei]. I have today tried to isolate the issue we are facing into a simple demo app. I have two streams of data x and y, x is high frequency (https://github.com/the4thamigo-uk/join-example/blob/master/data/x.dat) and y is low frequency https://github.com/the4thamigo-uk/join-example/blob/master/data/y.dat. I set the join window sizes, and the grace here https://github.com/the4thamigo-uk/join-example/blob/master/launch.sh#L9. There is logging as records are read off the streams via a transformer here https://github.com/the4thamigo-uk/join-example/blob/master/src/main/java/the4thamigouk/kafka/streams/joinexample/JoinerStreamProcessor.java#L54. There is logging on the join here https://github.com/the4thamigo-uk/join-example/blob/master/src/main/java/the4thamigouk/kafka/streams/joinexample/JoinerStreamProcessor.java#L93 What I see are joins only from here https://github.com/the4thamigo-uk/join-example/blob/master/example.log#L1553 Also, I notice that all the x records are read first, then all the y records https://github.com/the4thamigo-uk/join-example/blob/master/example.log#L451 and https://github.com/the4thamigo-uk/join-example/blob/master/example.log#L1452 Am I doing something wrong here? was (Author: the4thamigo_uk): Thanks [~ableegoldman] and [~vvcephei]. I have today tried to isolate the issue we are facing into a simple demo app. I have two streams of data x and y, x is high frequency (https://github.com/the4thamigo-uk/join-example/blob/master/data/x.dat) and y is low frequency https://github.com/the4thamigo-uk/join-example/blob/master/data/y.dat. I set the join window sizes, and the grace here https://github.com/the4thamigo-uk/join-example/blob/master/launch.sh#L9. There is logging as records are read off the streams via a transformer here https://github.com/the4thamigo-uk/join-example/blob/master/src/main/java/the4thamigouk/kafka/streams/joinexample/JoinerStreamProcessor.java#L54. There is logging on the join here https://github.com/the4thamigo-uk/join-example/blob/master/src/main/java/the4thamigouk/kafka/streams/joinexample/JoinerStreamProcessor.java#L93 What I see are joins only from here https://github.com/the4thamigo-uk/join-example/blob/master/example.log#L1553 Am I doing something wrong here? > Cannot pass Materialized into a join operation - hence cant set retention > period independent of grace > - > > Key: KAFKA-8315 > URL: https://issues.apache.org/jira/browse/KAFKA-8315 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Andrew >Assignee: John Roesler >Priority: Major > Attachments: code.java > > > The documentation says to use `Materialized` not `JoinWindows.until()` > ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]), > but there is no where to pass a `Materialized` instance to the join > operation, only to the group operation is supported it seems. > > Slack conversation here : > [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300] > [Additional] > From what I understand, the retention period should be independent of the > grace period, so I think this is more than a documentation fix (see comments > below) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace
[ https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836735#comment-16836735 ] Andrew edited comment on KAFKA-8315 at 5/9/19 9:59 PM: --- Thanks [~ableegoldman] and [~vvcephei]. I have today tried to isolate the issue we are facing into a simple demo app. I have two streams of data x and y, x is high frequency (https://github.com/the4thamigo-uk/join-example/blob/master/data/x.dat) and y is low frequency https://github.com/the4thamigo-uk/join-example/blob/master/data/y.dat. I set the join window sizes, and the grace here https://github.com/the4thamigo-uk/join-example/blob/master/launch.sh#L9. There is logging as records are read off the streams via a transformer here https://github.com/the4thamigo-uk/join-example/blob/master/src/main/java/the4thamigouk/kafka/streams/joinexample/JoinerStreamProcessor.java#L54. There is logging on the join here https://github.com/the4thamigo-uk/join-example/blob/master/src/main/java/the4thamigouk/kafka/streams/joinexample/JoinerStreamProcessor.java#L93 What I see are joins only from here https://github.com/the4thamigo-uk/join-example/blob/master/example.log#L1553 Am I doing something wrong here? was (Author: the4thamigo_uk): Thanks [~ableegoldman] and [~vvcephei]. I have today tried to isolate the issue we are facing into a simple demo app. I have two streams of data x and y, x is high frequency (https://github.com/the4thamigo-uk/join-example/blob/master/data/x.dat) and y is low frequency https://github.com/the4thamigo-uk/join-example/blob/master/data/y.dat. I set the join window sizes, and the grace here https://github.com/the4thamigo-uk/join-example/blob/master/launch.sh. There is logging as records are read off the streams via a transformer here https://github.com/the4thamigo-uk/join-example/blob/master/src/main/java/the4thamigouk/kafka/streams/joinexample/JoinerStreamProcessor.java#L54. There is logging on the join here https://github.com/the4thamigo-uk/join-example/blob/master/src/main/java/the4thamigouk/kafka/streams/joinexample/JoinerStreamProcessor.java#L93 What I see are joins only from here https://github.com/the4thamigo-uk/join-example/blob/master/example.log#L1553 Am I doing something wrong here? > Cannot pass Materialized into a join operation - hence cant set retention > period independent of grace > - > > Key: KAFKA-8315 > URL: https://issues.apache.org/jira/browse/KAFKA-8315 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Andrew >Assignee: John Roesler >Priority: Major > Attachments: code.java > > > The documentation says to use `Materialized` not `JoinWindows.until()` > ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]), > but there is no where to pass a `Materialized` instance to the join > operation, only to the group operation is supported it seems. > > Slack conversation here : > [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300] > [Additional] > From what I understand, the retention period should be independent of the > grace period, so I think this is more than a documentation fix (see comments > below) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace
[ https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836735#comment-16836735 ] Andrew commented on KAFKA-8315: --- Thanks [~ableegoldman] and [~vvcephei]. I have today tried to isolate the issue we are facing into a simple demo app. I have two streams of data x and y, x is high frequency (https://github.com/the4thamigo-uk/join-example/blob/master/data/x.dat) and y is low frequency https://github.com/the4thamigo-uk/join-example/blob/master/data/y.dat. I set the join window sizes, and the grace here https://github.com/the4thamigo-uk/join-example/blob/master/launch.sh. There is logging as records are read off the streams via a transformer here https://github.com/the4thamigo-uk/join-example/blob/master/src/main/java/the4thamigouk/kafka/streams/joinexample/JoinerStreamProcessor.java#L54. There is logging on the join here https://github.com/the4thamigo-uk/join-example/blob/master/src/main/java/the4thamigouk/kafka/streams/joinexample/JoinerStreamProcessor.java#L93 What I see are joins only from here https://github.com/the4thamigo-uk/join-example/blob/master/example.log#L1553 Am I doing something wrong here? > Cannot pass Materialized into a join operation - hence cant set retention > period independent of grace > - > > Key: KAFKA-8315 > URL: https://issues.apache.org/jira/browse/KAFKA-8315 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Andrew >Assignee: John Roesler >Priority: Major > Attachments: code.java > > > The documentation says to use `Materialized` not `JoinWindows.until()` > ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]), > but there is no where to pass a `Materialized` instance to the join > operation, only to the group operation is supported it seems. > > Slack conversation here : > [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300] > [Additional] > From what I understand, the retention period should be independent of the > grace period, so I think this is more than a documentation fix (see comments > below) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace
[ https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836606#comment-16836606 ] Sophie Blee-Goldman commented on KAFKA-8315: [~the4thamigo_uk] The code you're looking for describing the logic of choosing the next record to process is in org.apache.kafka.streams.processor.internals.PartitionGroup, which contains a priority queue "nonEmptyQueuesByTime" that serves up the partition with the lowest timestamp when polled (unless max.idle.ms has passed as checked in StreamTask#isProcessable) > Cannot pass Materialized into a join operation - hence cant set retention > period independent of grace > - > > Key: KAFKA-8315 > URL: https://issues.apache.org/jira/browse/KAFKA-8315 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Andrew >Assignee: John Roesler >Priority: Major > Attachments: code.java > > > The documentation says to use `Materialized` not `JoinWindows.until()` > ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]), > but there is no where to pass a `Materialized` instance to the join > operation, only to the group operation is supported it seems. > > Slack conversation here : > [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300] > [Additional] > From what I understand, the retention period should be independent of the > grace period, so I think this is more than a documentation fix (see comments > below) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
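For reference, the shape of the workaround under discussion: since a KStream-KStream join currently accepts no Materialized, grace is configured on the JoinWindows itself, and the join store's retention then follows as window size plus grace. A minimal sketch with placeholder topics and durations:

{code:java}
import java.time.Duration;

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;

public class GraceConfiguredJoin {
    public static void main(final String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        final KStream<String, String> x = builder.stream("x-topic");
        final KStream<String, String> y = builder.stream("y-topic");

        // grace is set on the window itself rather than via Materialized
        // (which the join operation does not accept); the effective store
        // retention becomes window size + grace
        x.join(y,
               (xv, yv) -> xv + "," + yv,
               JoinWindows.of(Duration.ofMinutes(5)).grace(Duration.ofMinutes(1)))
         .to("joined-topic");

        builder.build();
    }
}
{code}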
[jira] [Commented] (KAFKA-8332) Regression in handling of JoinGroupRequest disallows deterministic protocol selection based on order of preference
[ https://issues.apache.org/jira/browse/KAFKA-8332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836596#comment-16836596 ] ASF GitHub Bot commented on KAFKA-8332: --- cmccabe commented on pull request #6626: KAFKA-8332: Preserve order of protocols when handling JOIN_GROUP requests URL: https://github.com/apache/kafka/pull/6626 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Regression in handling of JoinGroupRequest disallows deterministic protocol > selection based on order of preference > -- > > Key: KAFKA-8332 > URL: https://issues.apache.org/jira/browse/KAFKA-8332 > Project: Kafka > Issue Type: Bug > Components: core >Reporter: Konstantine Karantasis >Assignee: Konstantine Karantasis >Priority: Blocker > Fix For: 2.3.0 > > > When a group of Kafka clients includes more than one embedded protocol in its > {{JoinGroupRequest}} along with its metadata, the group membership protocol > defines that the protocol which is supported by all the members of a group is > selected, and if more than one protocols are supported by all the members the > protocol is selected based on the order of preference as defined in the > {{JoinGroupRequest}}. > A recent change from type {{List}} to type {{Set}} for storing the set of > supported embedded protocols in the {{JoinGroupRequest}} combined with the > old type of handling with implicit types in the scala code, has introduced > non-determinism in the selection of the embedded protocol by the > {{GroupCoordinator}}, even though the underlying type of the Set in use is a > variant of LinkedHashSet (it respects order). > The relevant code is: > {code:java} > // KafkaApis.scala > val protocols = joinGroupRequest.data().protocols().asScala.map(protocol => > (protocol.name, protocol.metadata)).toList > {code} > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
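To make the regression concrete: deterministic selection means taking the first protocol, in the candidate member's order of preference, that every member supports. The sketch below is an illustrative reconstruction of that rule, not the GroupCoordinator's actual code; the Member type is invented and a Java record is used for brevity:
{code:java}
import java.util.List;
import java.util.Optional;

// Hypothetical member type: an ordered list of supported protocol names.
record Member(List<String> supportedProtocols) {}

public class ProtocolSelection {

    // Pick the first protocol, in the candidate's order of preference, that all
    // members support. Iterating a List keeps the choice deterministic; losing
    // that order (e.g. via an unordered Set) is exactly what KAFKA-8332 fixes.
    static Optional<String> selectProtocol(Member candidate, List<Member> members) {
        return candidate.supportedProtocols().stream()
                .filter(p -> members.stream().allMatch(m -> m.supportedProtocols().contains(p)))
                .findFirst();
    }

    public static void main(String[] args) {
        Member a = new Member(List.of("range", "roundrobin"));
        Member b = new Member(List.of("roundrobin", "range"));
        // Prints Optional[range]: both members support both protocols, and
        // "range" comes first in a's preference order.
        System.out.println(selectProtocol(a, List.of(a, b)));
    }
}
{code}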
[jira] [Assigned] (KAFKA-7500) MirrorMaker 2.0 (KIP-382)
[ https://issues.apache.org/jira/browse/KAFKA-7500?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ryanne Dolan reassigned KAFKA-7500: --- Assignee: Colin P. McCabe (was: Ryanne Dolan) Reviewer: Colin P. McCabe [https://github.com/apache/kafka/pull/6295] > MirrorMaker 2.0 (KIP-382) > - > > Key: KAFKA-7500 > URL: https://issues.apache.org/jira/browse/KAFKA-7500 > Project: Kafka > Issue Type: New Feature > Components: KafkaConnect, mirrormaker >Reporter: Ryanne Dolan >Assignee: Colin P. McCabe >Priority: Minor > Fix For: 2.3.0 > > > Implement a drop-in replacement for MirrorMaker leveraging the Connect > framework. > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0] > [https://github.com/apache/kafka/pull/6295] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-7500) MirrorMaker 2.0 (KIP-382)
[ https://issues.apache.org/jira/browse/KAFKA-7500?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ryanne Dolan updated KAFKA-7500: Fix Version/s: 2.3.0 > MirrorMaker 2.0 (KIP-382) > - > > Key: KAFKA-7500 > URL: https://issues.apache.org/jira/browse/KAFKA-7500 > Project: Kafka > Issue Type: New Feature > Components: KafkaConnect, mirrormaker >Reporter: Ryanne Dolan >Assignee: Ryanne Dolan >Priority: Minor > Fix For: 2.3.0 > > > Implement a drop-in replacement for MirrorMaker leveraging the Connect > framework. > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0] > [https://github.com/apache/kafka/pull/6295] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Assigned] (KAFKA-8001) Fetch from future replica stalls when local replica becomes a leader
[ https://issues.apache.org/jira/browse/KAFKA-8001?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Vikas Singh reassigned KAFKA-8001: -- Assignee: Vikas Singh (was: Colin Hicks) > Fetch from future replica stalls when local replica becomes a leader > > > Key: KAFKA-8001 > URL: https://issues.apache.org/jira/browse/KAFKA-8001 > Project: Kafka > Issue Type: Bug > Components: core >Affects Versions: 2.1.0, 2.1.1 >Reporter: Anna Povzner >Assignee: Vikas Singh >Priority: Critical > > With KIP-320, fetch from follower / future replica returns > FENCED_LEADER_EPOCH if current leader epoch in the request is lower than the > leader epoch known to the leader (or local replica in case of future replica > fetching). In case of future replica fetching from the local replica, if > local replica becomes the leader of the partition, the next fetch from future > replica fails with FENCED_LEADER_EPOCH and fetching from future replica is > stopped until the next leader change. > Proposed solution: on local replica leader change, future replica should > "become a follower" again, and go through the truncation phase. Or we could > optimize it, and just update partition state of the future replica to reflect > the updated current leader epoch. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-7500) MirrorMaker 2.0 (KIP-382)
[ https://issues.apache.org/jira/browse/KAFKA-7500?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ryanne Dolan updated KAFKA-7500: Description: Implement a drop-in replacement for MirrorMaker leveraging the Connect framework. [https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0] [https://github.com/apache/kafka/pull/6295] was: Implement a drop-in replacement for MirrorMaker leveraging the Connect framework. https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0 > MirrorMaker 2.0 (KIP-382) > - > > Key: KAFKA-7500 > URL: https://issues.apache.org/jira/browse/KAFKA-7500 > Project: Kafka > Issue Type: New Feature > Components: KafkaConnect, mirrormaker >Reporter: Ryanne Dolan >Assignee: Ryanne Dolan >Priority: Minor > > Implement a drop-in replacement for MirrorMaker leveraging the Connect > framework. > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0] > [https://github.com/apache/kafka/pull/6295] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-8345) Create an Administrative API for Replica Reassignment
Colin P. McCabe created KAFKA-8345: -- Summary: Create an Administrative API for Replica Reassignment Key: KAFKA-8345 URL: https://issues.apache.org/jira/browse/KAFKA-8345 Project: Kafka Issue Type: Improvement Reporter: Colin P. McCabe Assignee: Colin P. McCabe Create an Administrative API for Replica Reassignment, as discussed in KIP-455 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-7500) MirrorMaker 2.0 (KIP-382)
[ https://issues.apache.org/jira/browse/KAFKA-7500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836058#comment-16836058 ] Ryanne Dolan edited comment on KAFKA-7500 at 5/9/19 2:41 PM: - [~cloudfrog], glad you are taking a look. I'm looking forward to hearing about your experience. > Is this expected to work with kafka 1.1.1 clusters? Yes, I believe it will work with 0.11.0.0 or higher, but maybe you can test it to verify :) > will prefix remote topic names ... be configurable? Yes, you can implement your own ReplicationPolicy to define remote topics however you like: {code:java} replication.policy.class = my.SuffixReplicationPolicy {code} Also, MM2 doesn't care how existing source topics are named. If your topics are suffixed with their local DC (a common pattern), you can leave them as-is without breaking anything. By default you'd get topics like "dc1.topic1-dc1", so you might consider implementing a ReplicationPolicy that strips the suffix during replication so you get just "dc1.topic1". Ryanne was (Author: ryannedolan): [~cloudfrog], glad you are taking a look. I'm looking forward to hearing about your experience. > Is this expected to work with kafka 1.1.1 clusters? Yes, I believe it will work with 0.11.0.0 or higher, but maybe you can test it to verify :) > will prefix remote topic names ... be configurable? Yes, you can implement your own ReplicationPolicy to define remote topics however you like: {code:java} replication.policy.class = my.SuffixReplicationPolicy {code} Also, MM2 doesn't care how existing source topics are named. If your topics are prefixed with their local DC (a common pattern), you can leave them as-is without breaking anything. By default you'd get topics like "dc1.topic1-dc1", so you might consider implementing a ReplicationPolicy that strips the suffix during replication so you get just "dc1.topic1". Ryanne > MirrorMaker 2.0 (KIP-382) > - > > Key: KAFKA-7500 > URL: https://issues.apache.org/jira/browse/KAFKA-7500 > Project: Kafka > Issue Type: New Feature > Components: KafkaConnect, mirrormaker >Reporter: Ryanne Dolan >Assignee: Ryanne Dolan >Priority: Minor > > Implement a drop-in replacement for MirrorMaker leveraging the Connect > framework. > https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
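To make the pluggability Ryanne describes concrete, a suffix-based policy might look roughly like the sketch below. The method names and the shape of the interface are assumptions drawn from the KIP-382 discussion, not the final merged API:
{code:java}
// Hypothetical sketch of a suffix-based replication policy. In a real setup
// this class would implement MM2's ReplicationPolicy interface.
public class SuffixReplicationPolicy {

    private final String separator = "-";

    // "topic1" replicated from cluster "dc1" becomes "topic1-dc1"
    // instead of the default "dc1.topic1".
    public String formatRemoteTopic(String sourceClusterAlias, String topic) {
        return topic + separator + sourceClusterAlias;
    }

    // Recover the source cluster alias from a remote topic name, or null
    // if the topic does not look like a remote topic.
    public String topicSource(String topic) {
        final int i = topic.lastIndexOf(separator);
        return i < 0 ? null : topic.substring(i + 1);
    }
}
{code}
With something like this on the classpath, the {{replication.policy.class}} setting shown in the comment above would select it.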
[jira] [Commented] (KAFKA-7849) Warning when adding GlobalKTable
[ https://issues.apache.org/jira/browse/KAFKA-7849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836432#comment-16836432 ] Frederic Tardif commented on KAFKA-7849: --- got the same issue: {code:java} [2019-05-08 09:54:24.212] WARN [uepg-catalog_program-snapshot-connector-7ddde56e-7dff-4a14-aa11-6c8fb22b40ce-StreamThread-2] [Consumer clientId=uepg-catalog_program-snapshot-connector-7ddde56e-7dff-4a14-aa11-6c8fb22b40ce-StreamThread-2-consumer, groupId=uepg-catalog_program-snapshot-connector] The following subscribed topics are not assigned to any members: [livecms.program.snapshot] - (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:?) {code} My topology is a globalTable: {code:java} streamBuilder.globalTable(config.getTopicName(), Materialized.as(config.getStoreName()));{code} {quote}I confirm the consumer lag in the RocksDB checkpoint file did indeed show the expected partition offset. In addition, the warning disappeared the third time I restarted the application (though that may be because the store was already in sync). I would be surprised if fiddling with the stream-thread count configuration were required in that case, so it looks like a self-healing kind of warning. The message, however, does not sound that reassuring... Next time I get this I'll try enabling debug logs to correlate the warning. {quote}
> Warning when adding GlobalKTable > > > Key: KAFKA-7849 > URL: https://issues.apache.org/jira/browse/KAFKA-7849 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.1.0 >Reporter: Dmitry Minkovsky >Priority: Minor > Labels: newbie > > Per > https://lists.apache.org/thread.html/59c119be8a2723c501e0653fa3ed571e8c09be40d5b5170c151528b5@%3Cusers.kafka.apache.org%3E > > When I add a GlobalKTable for topic "message-write-service-user-ids-by-email" > to my topology, I get this warning: > > [2019-01-19 12:18:14,008] WARN > (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:421) > [Consumer > clientId=message-write-service-55f2ca4d-0efc-4344-90d3-955f9f5a65fd-StreamThread-2-consumer, > groupId=message-write-service] The following subscribed topics are not > assigned to any members: [message-write-service-user-ids-by-email] -- This message was sent by Atlassian JIRA (v7.6.3#76005)
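For reference, a self-contained version of the topology from the comment above might look like the following sketch; the topic and store names are taken from the log above, and the surrounding class is invented:
{code:java}
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.Materialized;

public class GlobalTableTopology {
    public static void main(String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();
        // A GlobalKTable is populated by a dedicated global consumer, not by the
        // stream threads' group consumer, which is the context of the warning.
        final GlobalKTable<Object, Object> table =
                builder.globalTable("livecms.program.snapshot",
                        Materialized.as("program-snapshot-store"));
        System.out.println(builder.build().describe());
    }
}
{code}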
[jira] [Updated] (KAFKA-7805) Use --bootstrap-server option for kafka-topics.sh in ducktape tests where applicable
[ https://issues.apache.org/jira/browse/KAFKA-7805?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Viktor Somogyi-Vass updated KAFKA-7805: --- Summary: Use --bootstrap-server option for kafka-topics.sh in ducktape tests where applicable (was: Use --bootstrap-server option in ducktape tests where applicable) > Use --bootstrap-server option for kafka-topics.sh in ducktape tests where > applicable > > > Key: KAFKA-7805 > URL: https://issues.apache.org/jira/browse/KAFKA-7805 > Project: Kafka > Issue Type: Improvement > Components: system tests >Reporter: Viktor Somogyi-Vass >Priority: Major > > KIP-377 introduces the {{--bootstrap-server}} option and deprecates the > {{--zookeeper}} option in {{kafka-topics.sh}}. I'd be nice to use the new > option in the ducktape tests to gain higher test coverage. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-8240) Source.equals() can fail with NPE
[ https://issues.apache.org/jira/browse/KAFKA-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax resolved KAFKA-8240. Resolution: Fixed Fix Version/s: 2.2.2 2.1.2 2.3.0 [~vahid] I marked this as fix version 2.2.2 – if we roll a new RC for the bug-fix release, please update this ticket to 2.2.1. Thanks.
> Source.equals() can fail with NPE > - > > Key: KAFKA-8240 > URL: https://issues.apache.org/jira/browse/KAFKA-8240 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.2.0, 2.1.1 >Reporter: Matthias J. Sax >Assignee: Matthias J. Sax >Priority: Major > Labels: beginner, easy-fix, newbie > Fix For: 2.3.0, 2.1.2, 2.2.2 > > > Reported on a PR: > [https://github.com/apache/kafka/pull/5284/files/1df6208f48b6b72091fea71323d94a16102ffd13#r270607795] > InternalTopologyBuilder#Source.equals() might fail with NPE if > `topicPattern==null`. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
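A minimal sketch of the null-safe comparison such a fix needs, using a stand-in for InternalTopologyBuilder.Source in which topicPattern may legitimately be null; the field names follow the ticket, the rest is illustrative:
{code:java}
import java.util.Objects;
import java.util.regex.Pattern;

class Source {
    final String name;
    final Pattern topicPattern; // null when the source subscribes to fixed topics

    Source(String name, Pattern topicPattern) {
        this.name = name;
        this.topicPattern = topicPattern;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof Source)) return false;
        final Source other = (Source) o;
        // Objects.equals tolerates null on either side; comparing pattern
        // strings avoids Pattern's identity-based equals.
        return Objects.equals(name, other.name)
                && Objects.equals(topicPattern == null ? null : topicPattern.pattern(),
                                  other.topicPattern == null ? null : other.topicPattern.pattern());
    }

    @Override
    public int hashCode() {
        return Objects.hash(name, topicPattern == null ? null : topicPattern.pattern());
    }
}
{code}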
[jira] [Commented] (KAFKA-8240) Source.equals() can fail with NPE
[ https://issues.apache.org/jira/browse/KAFKA-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836397#comment-16836397 ] ASF GitHub Bot commented on KAFKA-8240: --- mjsax commented on pull request #6685: KAFKA-8240: Fix NPE in Source.equals() URL: https://github.com/apache/kafka/pull/6685 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
> Source.equals() can fail with NPE > - > > Key: KAFKA-8240 > URL: https://issues.apache.org/jira/browse/KAFKA-8240 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.2.0, 2.1.1 >Reporter: Matthias J. Sax >Assignee: Matthias J. Sax >Priority: Major > Labels: beginner, easy-fix, newbie > > Reported on a PR: > [https://github.com/apache/kafka/pull/5284/files/1df6208f48b6b72091fea71323d94a16102ffd13#r270607795] > InternalTopologyBuilder#Source.equals() might fail with NPE if > `topicPattern==null`. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8311) Better consumer timeout exception handling
[ https://issues.apache.org/jira/browse/KAFKA-8311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836381#comment-16836381 ] Matthias J. Sax commented on KAFKA-8311: Thanks for your interest [~clearpal7]! I added you to the list of contributors and assigned the ticket to you. You can now also self-assign tickets. As Boyang mentioned, you should also subscribe to the dev mailing list: [https://kafka.apache.org/contact] Additionally, please read the wiki: [https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Code+Changes] (just as a starting point – there are other pages that provide useful information to get started).
> Better consumer timeout exception handling > --- > > Key: KAFKA-8311 > URL: https://issues.apache.org/jira/browse/KAFKA-8311 > Project: Kafka > Issue Type: Improvement > Components: consumer, streams >Reporter: Boyang Chen >Assignee: WooYoung >Priority: Major > Labels: newbie > > When stream application crashed due to underlying consumer commit timeout, we > have seen following gaps: > 1. The current timeout exception doesn't provide meaningful tuning > instructions. We should augment the error message to let user change > `default.api.timeout.ms` in order to tolerate longer reaction time. > 2. Currently we have 3 different types of consumers on KStream: > thread-consumer, global-consumer and restore-consumer. Although we don't plan > to explicitly handle this consumer timeout on stream level, we could wrap it > with more meaningful message either on consumer or stream level to let user > be aware which consumer is having trouble. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
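For reference, the tuning that the improved error message should point users at looks like the sketch below; `default.api.timeout.ms` is the consumer config introduced by KIP-266, and the value here is illustrative:
{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;

public class TimeoutTuning {
    public static void main(String[] args) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        // Raising default.api.timeout.ms gives blocking consumer calls,
        // including commits, more time before a TimeoutException is thrown;
        // 120s here is illustrative (the default is 60s).
        props.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 120000);
    }
}
{code}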
[jira] [Assigned] (KAFKA-8311) Better consumer timeout exception handling
[ https://issues.apache.org/jira/browse/KAFKA-8311?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax reassigned KAFKA-8311: -- Assignee: WooYoung > Better consumer timeout exception handling > --- > > Key: KAFKA-8311 > URL: https://issues.apache.org/jira/browse/KAFKA-8311 > Project: Kafka > Issue Type: Improvement > Components: consumer, streams >Reporter: Boyang Chen >Assignee: WooYoung >Priority: Major > Labels: newbie > > When stream application crashed due to underlying consumer commit timeout, we > have seen following gaps: > 1. The current timeout exception doesn't provide meaningful tuning > instructions. We should augment the error message to let user change > `default.api.timeout.ms` in order to tolerate longer reaction time. > 2. Currently we have 3 different types of consumers on KStream: > thread-consumer, global-consumer and restore-consumer. Although we don't plan > to explicitly handle this consumer timeout on stream level, we could wrap it > with more meaningful message either on consumer or stream level to let user > be aware which consumer is having trouble. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-8332) Regression in handling of JoinGroupRequest disallows deterministic protocol selection based on order of preference
[ https://issues.apache.org/jira/browse/KAFKA-8332?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Ismael Juma updated KAFKA-8332: --- Fix Version/s: 2.3.0 > Regression in handling of JoinGroupRequest disallows deterministic protocol > selection based on order of preference > -- > > Key: KAFKA-8332 > URL: https://issues.apache.org/jira/browse/KAFKA-8332 > Project: Kafka > Issue Type: Bug > Components: core >Reporter: Konstantine Karantasis >Assignee: Konstantine Karantasis >Priority: Blocker > Fix For: 2.3.0 > > > When a group of Kafka clients includes more than one embedded protocol in its > {{JoinGroupRequest}} along with its metadata, the group membership protocol > defines that the protocol which is supported by all the members of a group is > selected, and if more than one protocols are supported by all the members the > protocol is selected based on the order of preference as defined in the > {{JoinGroupRequest}}. > A recent change from type {{List}} to type {{Set}} for storing the set of > supported embedded protocols in the {{JoinGroupRequest}} combined with the > old type of handling with implicit types in the scala code, has introduced > non-determinism in the selection of the embedded protocol by the > {{GroupCoordinator}}, even though the underlying type of the Set in use is a > variant of LinkedHashSet (it respects order). > The relevant code is: > {code:java} > // KafkaApis.scala > val protocols = joinGroupRequest.data().protocols().asScala.map(protocol => > (protocol.name, protocol.metadata)).toList > {code} > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-1194) The kafka broker cannot delete the old log files after the configured time
[ https://issues.apache.org/jira/browse/KAFKA-1194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836374#comment-16836374 ] Raoul de Haard commented on KAFKA-1194: --- Any updates on when this issue is being merged? (see:[https://github.com/apache/kafka/pull/6329#issuecomment-490901117] ) > The kafka broker cannot delete the old log files after the configured time > -- > > Key: KAFKA-1194 > URL: https://issues.apache.org/jira/browse/KAFKA-1194 > Project: Kafka > Issue Type: Bug > Components: log >Affects Versions: 0.10.0.0, 0.11.0.0, 1.0.0 > Environment: window >Reporter: Tao Qin >Priority: Critical > Labels: features, patch, windows > Attachments: KAFKA-1194.patch, RetentionExpiredWindows.txt, > Untitled.jpg, image-2018-09-12-14-25-52-632.png, > image-2018-11-26-10-18-59-381.png, kafka-1194-v1.patch, kafka-1194-v2.patch, > kafka-bombarder.7z, screenshot-1.png > > Original Estimate: 72h > Remaining Estimate: 72h > > We tested it in windows environment, and set the log.retention.hours to 24 > hours. > # The minimum age of a log file to be eligible for deletion > log.retention.hours=24 > After several days, the kafka broker still cannot delete the old log file. > And we get the following exceptions: > [2013-12-19 01:57:38,528] ERROR Uncaught exception in scheduled task > 'kafka-log-retention' (kafka.utils.KafkaScheduler) > kafka.common.KafkaStorageException: Failed to change the log file suffix from > to .deleted for log segment 1516723 > at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:249) > at kafka.log.Log.kafka$log$Log$$asyncDeleteSegment(Log.scala:638) > at kafka.log.Log.kafka$log$Log$$deleteSegment(Log.scala:629) > at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418) > at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418) > at > scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59) > at scala.collection.immutable.List.foreach(List.scala:76) > at kafka.log.Log.deleteOldSegments(Log.scala:418) > at > kafka.log.LogManager.kafka$log$LogManager$$cleanupExpiredSegments(LogManager.scala:284) > at > kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:316) > at > kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:314) > at > scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:743) > at scala.collection.Iterator$class.foreach(Iterator.scala:772) > at > scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:573) > at scala.collection.IterableLike$class.foreach(IterableLike.scala:73) > at > scala.collection.JavaConversions$JListWrapper.foreach(JavaConversions.scala:615) > at > scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:742) > at kafka.log.LogManager.cleanupLogs(LogManager.scala:314) > at > kafka.log.LogManager$$anonfun$startup$1.apply$mcV$sp(LogManager.scala:143) > at kafka.utils.KafkaScheduler$$anon$1.run(KafkaScheduler.scala:100) > at > java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471) > at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178) > at > java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293) > at > java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) > at > 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) > at java.lang.Thread.run(Thread.java:724) > I think this error happens because kafka tries to rename the log file when it > is still opened. So we should close the file first before rename. > The index file uses a special data structure, the MappedByteBuffer. Javadoc > describes it as: > A mapped byte buffer and the file mapping that it represents remain valid > until the buffer itself is garbage-collected. > Fortunately, I find a forceUnmap function in kafka code, and perhaps it can > be used to free the MappedByteBuffer. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
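On Java 8, the "force unmap" trick the description refers to looks roughly like the sketch below: the mapped buffer holds the Windows file lock until it is garbage-collected, so the mapping has to be released eagerly before the rename. This is an illustrative reconstruction, not Kafka's actual helper, and it relies on JDK internals (sun.misc.Cleaner) that changed in Java 9:
{code:java}
import java.lang.reflect.Method;
import java.nio.MappedByteBuffer;

public class ForceUnmap {
    static void unmap(MappedByteBuffer buffer) throws Exception {
        // DirectByteBuffer exposes a public cleaner() method, but the class
        // itself is package-private, hence the reflective access.
        final Method cleanerMethod = buffer.getClass().getMethod("cleaner");
        cleanerMethod.setAccessible(true);
        final Object cleaner = cleanerMethod.invoke(buffer);
        final Method cleanMethod = cleaner.getClass().getMethod("clean");
        cleanMethod.setAccessible(true);
        cleanMethod.invoke(cleaner); // releases the mapping so the file can be renamed
    }
}
{code}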
[jira] [Commented] (KAFKA-8343) streams application crashed due to rocksdb
[ https://issues.apache.org/jira/browse/KAFKA-8343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836347#comment-16836347 ] Matthias J. Sax commented on KAFKA-8343: I don't think that the code is executed in a different order. However, there may still be a race condition...
> streams application crashed due to rocksdb > -- > > Key: KAFKA-8343 > URL: https://issues.apache.org/jira/browse/KAFKA-8343 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.0.0 > Environment: centos 7 jdk8 kafka-streams1.0 >Reporter: gaoshu >Priority: Major > Attachments: fullsizeoutput_6.jpeg > > > my streams application always crashed in few days. The crash log looks like > [https://github.com/facebook/rocksdb/issues/5234|[https://github.com/facebook/rocksdb/issues/5234].] > so I think it may because of RocksDBStore.java closed incorrectly in > multithread. I look through the below code, it means the db.close() should > after openiterators.close(). However, db.close() may be executed before > iterators.close() due to instructions reorder. I hope my guess is correct. > {code:java} > // RocksDBStore.java > @Override > public synchronized void close() { > if (!open) { > return; > } > open = false; > closeOpenIterators(); > options.close(); > wOptions.close(); > fOptions.close(); > db.close(); > options = null; > wOptions = null; > fOptions = null; > db = null; > } > {code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
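One illustrative way to rule such a race out is to make both close() and every iterator access synchronize on the same monitor, so no interleaving (or perceived reordering) can let an iterator touch a closed db. This is a sketch of the pattern under that assumption, not the actual RocksDBStore fix:
{code:java}
public class GuardedStore {
    private boolean open = true;
    private final Object lock = new Object();

    public void close() {
        synchronized (lock) {
            if (!open) return;
            open = false;
            // close open iterators first, then the db, all under the same lock,
            // so no other thread can observe a half-closed store
        }
    }

    public void useIterator(Runnable body) {
        synchronized (lock) {
            if (!open) throw new IllegalStateException("store is closed");
            body.run();
        }
    }
}
{code}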
[jira] [Commented] (KAFKA-7362) enable kafka broker to remove orphan partitions automatically
[ https://issues.apache.org/jira/browse/KAFKA-7362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836320#comment-16836320 ] Ferlin-Sutton Léo commented on KAFKA-7362: -- I can see the implementation is being worked on and discussed. In the meantime, is there a recommended way to manually delete an orphan partition? We were thinking of:
# Stopping the Kafka process on the concerned broker
# `rm -rf`-ing the directory containing the orphaned partition
# Turning the Kafka broker back on
> enable kafka broker to remove orphan partitions automatically > -- > > Key: KAFKA-7362 > URL: https://issues.apache.org/jira/browse/KAFKA-7362 > Project: Kafka > Issue Type: Improvement > Components: core, log >Reporter: xiongqi wu >Assignee: xiongqi wu >Priority: Major > > When partition reassignment removes topic partitions from an offline broker, > those removed partitions become orphan partitions to the broker. When the > offline broker comes back online, it is not able to clean up both data and > folders that belong to orphan partitions. Log manager will scan all dirs > during startup, but the time based retention policy on a topic partition will > not be kicked out until the broker is either a follower or a leader of the > partition. In addition, we do not have logic to delete folders that belong > to orphan partition today. > Open this ticket to provide a mechanism (when enabled) to safely remove > orphan partitions automatically. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace
[ https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836278#comment-16836278 ] Andrew edited comment on KAFKA-8315 at 5/9/19 10:58 AM: [~vvcephei] Thanks again for your help and advice. Ok, so it should work as I had previously hoped then. So maybe our working hypothesis is incorrect? As you suggest, I am currently re-running a test using the following code to determine the lag between the left and right topics. The reason we think it might be due to the right stream getting ahead is that this also helps to explain why we manage to perform some initial joins at the start of the ingestion period for about two months (while the streams are presumed to be in line), then nothing for most of the middle period, then a few days of joins at the end.
As for the retention period, I think I understand that now from all your explanations. The problem here is not retention; it looks like it is something to do with either the specifics of our dataset or the way in which the streams are read. We delved into the code and found the way that the RocksDB code works, and think we understand it now. What I didn't manage to find is where the code is for the logic you describe in your first paragraph ('Streams should choose to ...etc').
{code:java}
private static Topology joinTestStreamStream(final JoinerProperties props) {
    final StreamsBuilder builder = new StreamsBuilder();

    final TransformerSupplier<Object, GenericRecord, KeyValue<Object, GenericRecord>> streamLogger = () ->
        new Transformer<Object, GenericRecord, KeyValue<Object, GenericRecord>>() {
            private ProcessorContext context;

            @Override
            public void init(ProcessorContext context) {
                this.context = context;
            }

            @Override
            public KeyValue<Object, GenericRecord> transform(Object key, GenericRecord value) {
                log.info(String.format("reading : topic=%s, partition=%d, timestamp=%d, offset=%d, key=%s",
                        context.topic(), context.partition(), context.timestamp(), context.offset(), key));
                return new KeyValue<>(key, value);
            }

            @Override
            public void close() {
            }
        };

    final KStream<Object, GenericRecord> leftStream = builder.stream(props.leftTopic().getName()).transform(streamLogger);
    final KStream<Object, GenericRecord> rightStream = builder.stream(props.rightTopic().getName()).transform(streamLogger);

    // setup the join
    final JoinWindows joinWindow = JoinWindows.of(Duration.ZERO).before(Duration.parse("P2D")).grace(Duration.parse("P7D"));
    final KStream<Object, Object> joinStream = leftStream.join(rightStream, (l, r) -> {
        log.info("joining: " + l + ", " + r);
        return null;
    }, joinWindow);

    return builder.build();
}
{code}
was (Author: the4thamigo_uk): Ok, so it should work as I had previously hoped then. So maybe our working hypothesis is incorrect? As you suggest, I am currently re-running a test using the following code to determine the lag between the left and right topics. The reason we think it might be due to the right stream getting ahead is that this also helps to explain why we manage to perform some initial joins at the start of the ingestion period for about two months (while the streams are presumed to be in line), then nothing for most of the middle period, then a few days of joins at the end. As for the retention period, I think I understand that now from all your explanations. The problem here is not retention, it looks like it is something to do with either the specifics of our dataset or the way in which the streams are read. We delved into the code, and found the way that the RocksDB code works and think we understand it now. What I didnt manage to find is where the code is for the logic you describe in your first paragraph ('Streams should choose to ...etc').
> Cannot pass Materialized into a join operation - hence cant set retention > period independent of grace > - > > Key: KAFKA-8315 > URL: https://issues.apache.org/jira/browse/KAFKA-8315 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Andrew >Assignee: John Roesler >Priority: Major > Attachments: code.java > > > The documentation says to use `Materialized` not `JoinWindows.until()` > ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]), > but there is no where to pass a `Materialized` instance to the join > operation, only to the group operation is supported it seems. > > Slack conversation here : > [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300] > [Additional] > From what I understand, the retention period should be independent of the > grace
[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace
[ https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836278#comment-16836278 ] Andrew edited comment on KAFKA-8315 at 5/9/19 10:43 AM: Ok, so it should work as I had previously hoped then. So maybe our working hypothesis is incorrect? As you suggest, I am currently re-running a test using the following code to determine the lag between the left and right topics. The reason we think it might be due to the right stream getting ahead is that this also helps to explain why we manage to perform some initial joins at the start of the ingestion period for about two months (while the streams are presumed to be in line), then nothing for most of the middle period, then a few days of joins at the end. As for the retention period, I think I understand that now from all your explanations. The problem here is not retention, it looks like it is something to do with either the specifics of our dataset or the way in which the streams are read. We delved into the code, and found the way that the RocksDB code works and think we understand it now. What I didnt manage to find is where the code is for the logic you describe in your first paragraph ('Streams should choose to ...etc'). was (Author: the4thamigo_uk): Ok, so it should work as I had previously hoped, so maybe our working hypothesis is incorrect then. As you suggest, I am currently re-running a test using the following code to determine the lag between the left and right topics. The reason we think it might be due to the right stream getting ahead is that this also helps to explain why we manage to perform some initial joins at the start of the ingestion period for about two months (while the streams are presumed to be in line), then nothing for most of the middle period, then a few days of joins at the end. As for the retention period, I think I understand that now from all your explanations. The problem here is not retention, it looks like it is something to do with either the specifics of our dataset or the way in which the streams are read. We delved into the code, and found the way that the RocksDB code works and think we understand it now. What I didnt manage to find is where the code is for the logic you describe in your first paragraph ('Streams should choose to ...etc'). > Cannot pass Materialized into a join operation - hence cant set retention > period independent of grace > - > > Key: KAFKA-8315 > URL: https://issues.apache.org/jira/browse/KAFKA-8315 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Andrew >Assignee: John Roesler >Priority: Major > Attachments: code.java > > > The documentation says to use `Materialized` not `JoinWindows.until()` > ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]), > but there is no where to pass a `Materialized` instance to the join > operation, only to the group operation is supported it seems. > > Slack conversation here : > [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300] > [Additional] > From what I understand, the retention period should be independent of the > grace period, so I think this is more than a documentation fix (see comments > below) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace
[ https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836278#comment-16836278 ] Andrew commented on KAFKA-8315: --- Ok, so it should work as I had previously hoped, so maybe our working hypothesis is incorrect then. As you suggest, I am currently re-running a test using the following code to determine the lag between the left and right topics. The reason we think it might be due to the right stream getting ahead is that this also helps to explain why we manage to perform some initial joins at the start of the ingestion period for about two months (while the streams are presumed to be in line), then nothing for most of the middle period, then a few days of joins at the end. As for the retention period, I think I understand that now from all your explanations. The problem here is not retention, it looks like it is something to do with either the specifics of our dataset or the way in which the streams are read. We delved into the code, and found the way that the RocksDB code works and think we understand it now. What I didnt manage to find is where the code is for the logic you describe in your first paragraph ('Streams should choose to ...etc'). > Cannot pass Materialized into a join operation - hence cant set retention > period independent of grace > - > > Key: KAFKA-8315 > URL: https://issues.apache.org/jira/browse/KAFKA-8315 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Andrew >Assignee: John Roesler >Priority: Major > Attachments: code.java > > > The documentation says to use `Materialized` not `JoinWindows.until()` > ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]), > but there is no where to pass a `Materialized` instance to the join > operation, only to the group operation is supported it seems. > > Slack conversation here : > [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300] > [Additional] > From what I understand, the retention period should be independent of the > grace period, so I think this is more than a documentation fix (see comments > below) -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8342) Admin tool to setup Kafka Stream topology (internal) topics
[ https://issues.apache.org/jira/browse/KAFKA-8342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836232#comment-16836232 ] Patrik Kleindl commented on KAFKA-8342: --- This sounds very interesting; I hope it is ok if I add our use case and some thoughts. Our current goal was to minimize deployment-order dependencies, so that we don't have to think about the order in which applications (mostly streams, but normal producers/consumers too) are deployed and can have all relevant topics created before startup. The internal topics are handled by Kafka Streams itself, but as we are moving to a more restricted approach we will be in the situation described above. Our solution was to add the definition of topics for each application as a configuration artifact and share the dependencies via maven. On startup (or triggered via a command-line call) this configuration is parsed and the topics are set up as needed. We do this for all "named" topics, including input and output topics, and could add the internal topics if needed. If I understand the approach above correctly, it would mean that a user has to export the topology (either the toString() version or something more beautiful) and pass it to the tool for processing. One thing I would like to add to the use case is the interaction with the schema registry: as we are using Avro, the creation of the schemas for the internal topics is a necessary pain too, and I expect it to be restricted in a similar way as topic creation.
> Admin tool to setup Kafka Stream topology (internal) topics > --- > > Key: KAFKA-8342 > URL: https://issues.apache.org/jira/browse/KAFKA-8342 > Project: Kafka > Issue Type: New Feature >Reporter: Boyang Chen >Priority: Major > Labels: newbie > > We have seen customers who need to deploy their application to production > environment but don't have access to create changelog and repartition topics. > They need to ask admin team to manually create those topics before proceeding > to start the actual stream job. We could add an admin tool to help them go > through the process quicker by providing a command that could > # Read through current stream topology > # Create corresponding topics as needed, even including output topics. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
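A hedged sketch of the startup step Patrik describes, using the AdminClient API; the topic names, partition counts, and replication factors stand in for whatever the shared configuration artifact declares:
{code:java}
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class TopicSetup {
    public static void main(String[] args) throws Exception {
        final Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // In practice these would be parsed from the configuration artifact.
            final List<NewTopic> topics = List.of(
                    new NewTopic("my-input-topic", 6, (short) 3),
                    new NewTopic("my-output-topic", 6, (short) 3));
            // Blocks until all topics are created; fails with a
            // TopicExistsException if one of them already exists.
            admin.createTopics(topics).all().get();
        }
    }
}
{code}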
[jira] [Commented] (KAFKA-8146) WARNING: An illegal reflective access operation has occurred
[ https://issues.apache.org/jira/browse/KAFKA-8146?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836193#comment-16836193 ] Adam Antal commented on KAFKA-8146: --- Hi, this error is the same as in HADOOP-10848: it is due to JDK 9+ blocking reflective access to sun.* classes. It can be resolved with a patch on the ZooKeeper side that gets rid of the reflective call.
> WARNING: An illegal reflective access operation has occurred > > > Key: KAFKA-8146 > URL: https://issues.apache.org/jira/browse/KAFKA-8146 > Project: Kafka > Issue Type: Bug > Components: clients, core >Affects Versions: 2.1.1 > Environment: Java 11 > Kafka v2.1.1 >Reporter: Abhi >Priority: Major > > Hi, > I am running Kafka v2.1.1 and see below warnings at the startup of server and > clients. What is the cause of these warnings and how they can be avoided or > fixed? > *Client side:* > WARNING: Illegal reflective access by > org.apache.kafka.common.network.SaslChannelBuilder > (file:/local/kafka/kafka_installation/kafka_2.12-2.1.1/libs/kafka-clients-2.1.1.jar) > to method sun.security.krb5.Config.getInstance() > WARNING: Please consider reporting this to the maintainers of > org.apache.kafka.common.network.SaslChannelBuilder > WARNING: Use --illegal-access=warn to enable warnings of further illegal > reflective access operations > WARNING: All illegal access operations will be denied in a future release > *Server side:* > WARNING: An illegal reflective access operation has occurred > WARNING: Illegal reflective access by > org.apache.zookeeper.server.util.KerberosUtil > (file:/local/kafka/kafka_installation/kafka_2.12-2.1.1/libs/zookeeper-3.4.13.jar) > to method sun.security.krb5.Config.getInstance() > WARNING: Please consider reporting this to the maintainers of > org.apache.zookeeper.server.util.KerberosUtil > WARNING: Use --illegal-access=warn to enable warnings of further illegal > reflective access operations > WARNING: All illegal access operations will be denied in a future release -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-8344) Fix vagrant-up.sh to work with AWS properly
[ https://issues.apache.org/jira/browse/KAFKA-8344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836176#comment-16836176 ] ASF GitHub Bot commented on KAFKA-8344: --- sekikn commented on pull request #6703: KAFKA-8344. Fix vagrant-up.sh to work with AWS properly URL: https://github.com/apache/kafka/pull/6703
Currently, `vagrant/vagrant-up.sh --aws` fails because the `vagrant hostmanager` command in that script lacks the `--provider=aws` option. This PR adds it. I ran `vagrant/vagrant-up.sh --aws` with and without the `--no-parallel` option and confirmed that both worked as expected.
### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
> Fix vagrant-up.sh to work with AWS properly
> ---
>
> Key: KAFKA-8344
> URL: https://issues.apache.org/jira/browse/KAFKA-8344
> Project: Kafka
> Issue Type: Bug
> Reporter: Kengo Seki
> Assignee: Kengo Seki
> Priority: Major
>
> I tried to run {{vagrant/vagrant-up.sh --aws}} with the following Vagrantfile.local.
> {code}
> enable_dns = true
> enable_hostmanager = false
>
> # EC2
> ec2_access_key = ""
> ec2_secret_key = ""
> ec2_keypair_name = "keypair"
> ec2_keypair_file = "/path/to/keypair/file"
> ec2_region = "ap-northeast-1"
> ec2_ami = "ami-0905ffddadbfd01b7"
> ec2_security_groups = "sg-"
> ec2_subnet_id = "subnet-"
> {code}
> EC2 instances were successfully created, but the script failed with the following error after that.
> {code}
> $ vagrant/vagrant-up.sh --aws
> (snip)
> An active machine was found with a different provider. Vagrant
> currently allows each machine to be brought up with only a single
> provider at a time. A future version will remove this limitation.
> Until then, please destroy the existing machine to up with a new
> provider.
>
> Machine name: zk1
> Active provider: aws
> Requested provider: virtualbox
> {code}
> It seems that the {{vagrant hostmanager}} command also requires the {{--provider=aws}} option, in addition to {{vagrant up}}. With that option, it succeeded as follows:
> {code}
> $ git diff
> diff --git a/vagrant/vagrant-up.sh b/vagrant/vagrant-up.sh
> index 6a4ef9564..9210a5357 100755
> --- a/vagrant/vagrant-up.sh
> +++ b/vagrant/vagrant-up.sh
> @@ -220,7 +220,7 @@ function bring_up_aws {
>          # We still have to bring up zookeeper/broker nodes serially
>          echo "Bringing up zookeeper/broker machines serially"
>          vagrant up --provider=aws --no-parallel --no-provision $zk_broker_machines $debug
> -        vagrant hostmanager
> +        vagrant hostmanager --provider=aws
>          vagrant provision
>      fi
> @@ -231,11 +231,11 @@ function bring_up_aws {
>          local vagrant_rsync_temp_dir=$(mktemp -d);
>          TMPDIR=$vagrant_rsync_temp_dir vagrant_batch_command "vagrant up $debug --provider=aws" "$worker_machines" "$max_parallel"
>          rm -rf $vagrant_rsync_temp_dir
> -        vagrant hostmanager
> +        vagrant hostmanager --provider=aws
>      fi
>  else
>      vagrant up --provider=aws --no-parallel --no-provision $debug
> -    vagrant hostmanager
> +    vagrant hostmanager --provider=aws
>      vagrant provision
>  fi
>
> $ vagrant/vagrant-up.sh --aws
> (snip)
> ==> broker3: Running provisioner: shell...
>     broker3: Running: /tmp/vagrant-shell20190509-25399-8f1wgz.sh
>     broker3: Killing server
>     broker3: No kafka server to stop
>     broker3: Starting server
>
> $ vagrant status
> Current machine states:
>
> zk1       running (aws)
> broker1   running (aws)
> broker2   running (aws)
> broker3   running (aws)
>
> This environment represents multiple VMs. The VMs are all listed
> above with their current state. For more information about a specific
> VM, run `vagrant status NAME`.
>
> $ vagrant ssh broker1
> (snip)
> ubuntu@ip-172-16-0-62:~$ /opt/kafka-dev/bin/kafka-topics.sh --bootstrap-server broker1:9092,broker2:9092,broker3:9092 --create --partitions 1 --replication-factor 3 --topic sandbox
> (snip)
> ubuntu@ip-172-16-0-62:~$ /opt/kafka-dev/bin/kafka-topics.sh --bootstrap-server broker1:9092,broker2:9092,broker3:9092 --list
> (snip)
> sandbox
> {code}
-- This message was sent by Atlassian JIRA (v7.6.3#76005)
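The root cause is worth spelling out: {{vagrant hostmanager}}, like {{vagrant up}}, appears to default to the first provider configured in the Vagrantfile (VirtualBox here), so every command in the AWS code path has to name the provider explicitly. A minimal sketch of the corrected sequence from the patch above, using the machine names from the reporter's environment (treat the exact machine list as illustrative):
{code}
# Bring up the ZooKeeper/broker machines on AWS, serially and unprovisioned.
vagrant up --provider=aws --no-parallel --no-provision zk1 broker1 broker2 broker3

# Regenerate /etc/hosts for the AWS machines. Without --provider=aws,
# hostmanager requests the default provider (virtualbox) and aborts with the
# "active machine was found with a different provider" error quoted above.
vagrant hostmanager --provider=aws

# Provision once the hosts files are consistent across machines.
vagrant provision
{code}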
[jira] [Created] (KAFKA-8344) Fix vagrant-up.sh to work with AWS properly
Kengo Seki created KAFKA-8344: ---
Summary: Fix vagrant-up.sh to work with AWS properly
Key: KAFKA-8344
URL: https://issues.apache.org/jira/browse/KAFKA-8344
Project: Kafka
Issue Type: Bug
Reporter: Kengo Seki
Assignee: Kengo Seki
-- This message was sent by Atlassian JIRA (v7.6.3#76005)