[jira] [Commented] (KAFKA-8348) Document of kafkaStreams improvement

2019-05-09 Thread Lifei Chen (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836924#comment-16836924
 ] 

Lifei Chen commented on KAFKA-8348:
---

Hi [~mjsax], I am starting to read and contribute to Kafka Streams; this is a 
small problem I found in the doc while learning the code.

Please review the code when you have time. Thanks a lot :D

> Document of kafkaStreams improvement
> 
>
> Key: KAFKA-8348
> URL: https://issues.apache.org/jira/browse/KAFKA-8348
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation, streams
>Reporter: Lifei Chen
>Assignee: Lifei Chen
>Priority: Minor
>
> There is an out-of-date and incorrect example in KafkaStreams.java for the 
> current version:
>  * a Map is not supported for the initial StreamsConfig properties
>  * `int` does not support `toString`
> related code:
> {code:java}
> // kafkaStreams.java
> * 
> * A simple example might look like this:
> * {@code
> * Properties props = new Properties();
> * props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application");
> * props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> * props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
> * props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
> *
> * StreamsBuilder builder = new StreamsBuilder();
> * builder.stream("my-input-topic").mapValues(value -> String.valueOf(value.length())).to("my-output-topic");
> *
> * KafkaStreams streams = new KafkaStreams(builder.build(), props);
> * streams.start();
> * }{code}
>  
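
For readers following along, the snippet below is a minimal, compilable version of the corrected javadoc example; the topic names and bootstrap server are placeholders. The key point from the bullets above: StreamsConfig is built from a Properties object rather than a plain Map, and `value.length()` is a primitive `int`, so `String.valueOf(...)` is needed instead of a `toString()` call.

{code:java}
import java.util.Properties;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class WordLengthExample {
    public static void main(String[] args) {
        // Configuration goes into Properties, not a plain Map.
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        // value.length() is a primitive int, so String.valueOf(...) is used instead of toString().
        builder.<String, String>stream("my-input-topic")
               .mapValues(value -> String.valueOf(value.length()))
               .to("my-output-topic");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
    }
}
{code}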



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8348) Document of kafkaStreams improvement

2019-05-09 Thread Lifei Chen (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8348?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lifei Chen updated KAFKA-8348:
--
Reviewer: Matthias J. Sax

> Document of kafkaStreams improvement
> 
>
> Key: KAFKA-8348
> URL: https://issues.apache.org/jira/browse/KAFKA-8348
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation, streams
>Reporter: Lifei Chen
>Assignee: Lifei Chen
>Priority: Minor
>
> There is an out-of-date and incorrect example in KafkaStreams.java for the 
> current version:
>  * a Map is not supported for the initial StreamsConfig properties
>  * `int` does not support `toString`
> related code:
> {code:java}
> // kafkaStreams.java
> * 
> * A simple example might look like this:
> * {@code
> * Properties props = new Properties();
> * props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application");
> * props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> * props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
> * props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
> *
> * StreamsBuilder builder = new StreamsBuilder();
> * builder.stream("my-input-topic").mapValues(value -> String.valueOf(value.length())).to("my-output-topic");
> *
> * KafkaStreams streams = new KafkaStreams(builder.build(), props);
> * streams.start();
> * }{code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8348) Document of kafkaStreams improvement

2019-05-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8348?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836916#comment-16836916
 ] 

ASF GitHub Bot commented on KAFKA-8348:
---

hustclf commented on pull request #6707: KAFKA-8348 document optimized
URL: https://github.com/apache/kafka/pull/6707
 
 
   
   Fix the out-of-date and incorrect example in the documentation.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Document of kafkaStreams improvement
> 
>
> Key: KAFKA-8348
> URL: https://issues.apache.org/jira/browse/KAFKA-8348
> Project: Kafka
>  Issue Type: Improvement
>  Components: documentation, streams
>Reporter: Lifei Chen
>Assignee: Lifei Chen
>Priority: Minor
>
> There is an out-of-date and incorrect example in KafkaStreams.java for the 
> current version:
>  * a Map is not supported for the initial StreamsConfig properties
>  * `int` does not support `toString`
> related code:
> {code:java}
> // kafkaStreams.java
> * 
> * A simple example might look like this:
> * {@code
> * Properties props = new Properties();
> * props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application");
> * props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
> * props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
> * props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
> *
> * StreamsBuilder builder = new StreamsBuilder();
> * builder.stream("my-input-topic").mapValues(value -> String.valueOf(value.length())).to("my-output-topic");
> *
> * KafkaStreams streams = new KafkaStreams(builder.build(), props);
> * streams.start();
> * }{code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8348) Document of kafkaStreams improvement

2019-05-09 Thread Lifei Chen (JIRA)
Lifei Chen created KAFKA-8348:
-

 Summary: Document of kafkaStreams improvement
 Key: KAFKA-8348
 URL: https://issues.apache.org/jira/browse/KAFKA-8348
 Project: Kafka
  Issue Type: Improvement
  Components: documentation, streams
Reporter: Lifei Chen
Assignee: Lifei Chen


There is an out-of-date and incorrect example in KafkaStreams.java for the 
current version:
 * a Map is not supported for the initial StreamsConfig properties
 * `int` does not support `toString`

related code:
{code:java}
// kafkaStreams.java

* 
* A simple example might look like this:
* {@code
* Properties props = new Properties();
* props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-stream-processing-application");
* props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
* props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
* props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());
*
* StreamsBuilder builder = new StreamsBuilder();
* builder.stream("my-input-topic").mapValues(value -> String.valueOf(value.length())).to("my-output-topic");
*
* KafkaStreams streams = new KafkaStreams(builder.build(), props);
* streams.start();
* }{code}
 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-8344) Fix vagrant-up.sh to work with AWS properly

2019-05-09 Thread Gwen Shapira (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8344?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Gwen Shapira resolved KAFKA-8344.
-
   Resolution: Fixed
Fix Version/s: 2.3.0

> Fix vagrant-up.sh to work with AWS properly
> ---
>
> Key: KAFKA-8344
> URL: https://issues.apache.org/jira/browse/KAFKA-8344
> Project: Kafka
>  Issue Type: Bug
>Reporter: Kengo Seki
>Assignee: Kengo Seki
>Priority: Major
> Fix For: 2.3.0
>
>
> I tried to run {{vagrant/vagrant-up.sh --aws}} with the following 
> Vagrantfile.local.
> {code}
> enable_dns = true
> enable_hostmanager = false
> # EC2
> ec2_access_key = ""
> ec2_secret_key = ""
> ec2_keypair_name = "keypair"
> ec2_keypair_file = "/path/to/keypair/file"
> ec2_region = "ap-northeast-1"
> ec2_ami = "ami-0905ffddadbfd01b7"
> ec2_security_groups = "sg-"
> ec2_subnet_id = "subnet-"
> {code}
> EC2 instances were successfully created, but it failed with the following 
> error after that.
> {code}
> $ vagrant/vagrant-up.sh --aws
> (snip)
> An active machine was found with a different provider. Vagrant
> currently allows each machine to be brought up with only a single
> provider at a time. A future version will remove this limitation.
> Until then, please destroy the existing machine to up with a new
> provider.
> Machine name: zk1
> Active provider: aws
> Requested provider: virtualbox
> {code}
> It seems that the {{vagrant hostmanager}} command also requires the 
> {{--provider=aws}} option, in addition to {{vagrant up}}.
> With that option, it succeeded as follows:
> {code}
> $ git diff
> diff --git a/vagrant/vagrant-up.sh b/vagrant/vagrant-up.sh
> index 6a4ef9564..9210a5357 100755
> --- a/vagrant/vagrant-up.sh
> +++ b/vagrant/vagrant-up.sh
> @@ -220,7 +220,7 @@ function bring_up_aws {
>  # We still have to bring up zookeeper/broker nodes serially
>  echo "Bringing up zookeeper/broker machines serially"
>  vagrant up --provider=aws --no-parallel --no-provision 
> $zk_broker_machines $debug
> -vagrant hostmanager
> +vagrant hostmanager --provider=aws
>  vagrant provision
>  fi
> @@ -231,11 +231,11 @@ function bring_up_aws {
>  local vagrant_rsync_temp_dir=$(mktemp -d);
>  TMPDIR=$vagrant_rsync_temp_dir vagrant_batch_command "vagrant up 
> $debug --provider=aws" "$worker_machines" "$max_parallel"
>  rm -rf $vagrant_rsync_temp_dir
> -vagrant hostmanager
> +vagrant hostmanager --provider=aws
>  fi
>  else
>  vagrant up --provider=aws --no-parallel --no-provision $debug
> -vagrant hostmanager
> +vagrant hostmanager --provider=aws
>  vagrant provision
>  fi
> $ vagrant/vagrant-up.sh --aws
> (snip)
> ==> broker3: Running provisioner: shell...
> broker3: Running: /tmp/vagrant-shell20190509-25399-8f1wgz.sh
> broker3: Killing server
> broker3: No kafka server to stop
> broker3: Starting server
> $ vagrant status
> Current machine states:
> zk1   running (aws)
> broker1   running (aws)
> broker2   running (aws)
> broker3   running (aws)
> This environment represents multiple VMs. The VMs are all listed
> above with their current state. For more information about a specific
> VM, run `vagrant status NAME`.
> $ vagrant ssh broker1
> (snip)
> ubuntu@ip-172-16-0-62:~$ /opt/kafka-dev/bin/kafka-topics.sh 
> --bootstrap-server broker1:9092,broker2:9092,broker3:9092 --create 
> --partitions 1 --replication-factor 3 --topic sandbox
> (snip)
> ubuntu@ip-172-16-0-62:~$ /opt/kafka-dev/bin/kafka-topics.sh 
> --bootstrap-server broker1:9092,broker2:9092,broker3:9092 --list
> (snip)
> sandbox
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8344) Fix vagrant-up.sh to work with AWS properly

2019-05-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836904#comment-16836904
 ] 

ASF GitHub Bot commented on KAFKA-8344:
---

gwenshap commented on pull request #6703: KAFKA-8344. Fix vagrant-up.sh to work 
with AWS properly
URL: https://github.com/apache/kafka/pull/6703
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Fix vagrant-up.sh to work with AWS properly
> ---
>
> Key: KAFKA-8344
> URL: https://issues.apache.org/jira/browse/KAFKA-8344
> Project: Kafka
>  Issue Type: Bug
>Reporter: Kengo Seki
>Assignee: Kengo Seki
>Priority: Major
> Fix For: 2.3.0
>
>
> I tried to run {{vagrant/vagrant-up.sh --aws}} with the following 
> Vagrantfile.local.
> {code}
> enable_dns = true
> enable_hostmanager = false
> # EC2
> ec2_access_key = ""
> ec2_secret_key = ""
> ec2_keypair_name = "keypair"
> ec2_keypair_file = "/path/to/keypair/file"
> ec2_region = "ap-northeast-1"
> ec2_ami = "ami-0905ffddadbfd01b7"
> ec2_security_groups = "sg-"
> ec2_subnet_id = "subnet-"
> {code}
> EC2 instances were successfully created, but it failed with the following 
> error after that.
> {code}
> $ vagrant/vagrant-up.sh --aws
> (snip)
> An active machine was found with a different provider. Vagrant
> currently allows each machine to be brought up with only a single
> provider at a time. A future version will remove this limitation.
> Until then, please destroy the existing machine to up with a new
> provider.
> Machine name: zk1
> Active provider: aws
> Requested provider: virtualbox
> {code}
> It seems that the {{vagrant hostmanager}} command also requires the 
> {{--provider=aws}} option, in addition to {{vagrant up}}.
> With that option, it succeeded as follows:
> {code}
> $ git diff
> diff --git a/vagrant/vagrant-up.sh b/vagrant/vagrant-up.sh
> index 6a4ef9564..9210a5357 100755
> --- a/vagrant/vagrant-up.sh
> +++ b/vagrant/vagrant-up.sh
> @@ -220,7 +220,7 @@ function bring_up_aws {
>  # We still have to bring up zookeeper/broker nodes serially
>  echo "Bringing up zookeeper/broker machines serially"
>  vagrant up --provider=aws --no-parallel --no-provision 
> $zk_broker_machines $debug
> -vagrant hostmanager
> +vagrant hostmanager --provider=aws
>  vagrant provision
>  fi
> @@ -231,11 +231,11 @@ function bring_up_aws {
>  local vagrant_rsync_temp_dir=$(mktemp -d);
>  TMPDIR=$vagrant_rsync_temp_dir vagrant_batch_command "vagrant up 
> $debug --provider=aws" "$worker_machines" "$max_parallel"
>  rm -rf $vagrant_rsync_temp_dir
> -vagrant hostmanager
> +vagrant hostmanager --provider=aws
>  fi
>  else
>  vagrant up --provider=aws --no-parallel --no-provision $debug
> -vagrant hostmanager
> +vagrant hostmanager --provider=aws
>  vagrant provision
>  fi
> $ vagrant/vagrant-up.sh --aws
> (snip)
> ==> broker3: Running provisioner: shell...
> broker3: Running: /tmp/vagrant-shell20190509-25399-8f1wgz.sh
> broker3: Killing server
> broker3: No kafka server to stop
> broker3: Starting server
> $ vagrant status
> Current machine states:
> zk1   running (aws)
> broker1   running (aws)
> broker2   running (aws)
> broker3   running (aws)
> This environment represents multiple VMs. The VMs are all listed
> above with their current state. For more information about a specific
> VM, run `vagrant status NAME`.
> $ vagrant ssh broker1
> (snip)
> ubuntu@ip-172-16-0-62:~$ /opt/kafka-dev/bin/kafka-topics.sh 
> --bootstrap-server broker1:9092,broker2:9092,broker3:9092 --create 
> --partitions 1 --replication-factor 3 --topic sandbox
> (snip)
> ubuntu@ip-172-16-0-62:~$ /opt/kafka-dev/bin/kafka-topics.sh 
> --bootstrap-server broker1:9092,broker2:9092,broker3:9092 --list
> (snip)
> sandbox
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8311) Better consumer timeout exception handling

2019-05-09 Thread WooYoung (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836879#comment-16836879
 ] 

WooYoung commented on KAFKA-8311:
-

[~mjsax], [~bchen225242] Thanks~

I have checked the draft implementation code.

As you mentioned, I'm considering what more needs to be added on top of 
`default.api.timeout.ms`.

> Better consumer timeout exception handling 
> ---
>
> Key: KAFKA-8311
> URL: https://issues.apache.org/jira/browse/KAFKA-8311
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer, streams
>Reporter: Boyang Chen
>Assignee: WooYoung
>Priority: Major
>  Labels: newbie
>
> When a stream application crashes due to an underlying consumer commit timeout, 
> we have seen the following gaps:
> 1. The current timeout exception doesn't provide meaningful tuning 
> instructions. We should augment the error message to let the user change 
> `default.api.timeout.ms` in order to tolerate a longer reaction time.
> 2. Currently we have 3 different types of consumers on KStream: 
> thread-consumer, global-consumer and restore-consumer. Although we don't plan 
> to explicitly handle this consumer timeout on the stream level, we could wrap 
> it with a more meaningful message on either the consumer or stream level to 
> let the user know which consumer is having trouble.
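
For reference, `default.api.timeout.ms` mentioned above is an ordinary consumer config, so tolerating a longer reaction time only requires raising it where the consumer is configured. A minimal sketch follows; the 120-second value and the other settings are purely illustrative, and in a Streams application the same key can be passed through the StreamsConfig (e.g. via StreamsConfig.consumerPrefix).

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class TimeoutConfigExample {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        // Raise the blocking-API timeout (default 60s) so that slow commits and
        // metadata calls hit TimeoutException less eagerly.
        props.put(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG, 120_000);

        KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
        consumer.close();
    }
}
{code}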



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7206) Enable batching in FindCoordinator

2019-05-09 Thread Sagar Rao (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7206?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836852#comment-16836852
 ] 

Sagar Rao commented on KAFKA-7206:
--

Hi [~shung], [~guozhang] Is it something that I can pick up?

> Enable batching in FindCoordinator
> --
>
> Key: KAFKA-7206
> URL: https://issues.apache.org/jira/browse/KAFKA-7206
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Reporter: Yishun Guan
>Assignee: Yishun Guan
>Priority: Critical
>  Labels: needs-discussion, needs-kip, newbie++
>
> To quote [~guozhang] :
> "The proposal is that, we extend FindCoordinatorRequest to have multiple 
> consumer ids: today each FindCoordinatorRequest only contains a single 
> consumer id, so in our scenario we need to send N request for N consumer 
> groups still. If we can request for coordinators in a single request, then 
> the workflow could be simplified to:
>  # send a single FindCoordinatorRequest to a broker asking for coordinators 
> of all consumer groups.
>  1.a) note that the response may still succeed in finding some coordinators 
> while error on others, and we need to handle them on that granularity (see 
> below).
>  # and then for the collected coordinator, group them by coordinator id and 
> send one request per coordinator destination.
> Note that this change would require the version to be bumped up, to 
> FIND_COORDINATOR_REQUEST_V3 for such protocol changes, also the RESPONSE 
> version should be bumped up in order to include multiple coordinators."
> A KIP is needed.
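
To make the second step above concrete, here is a rough sketch of the client-side grouping once coordinators have been collected; the groupToCoordinator map is hypothetical, since the batched request itself is exactly what the KIP would have to define.

{code:java}
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.kafka.common.Node;

public class CoordinatorBatchingSketch {
    /**
     * Groups consumer group ids by their discovered coordinator node so that
     * one follow-up request can be sent per coordinator destination.
     */
    static Map<Node, List<String>> groupByCoordinator(Map<String, Node> groupToCoordinator) {
        return groupToCoordinator.entrySet().stream()
                .collect(Collectors.groupingBy(
                        Map.Entry::getValue,
                        Collectors.mapping(Map.Entry::getKey, Collectors.toList())));
    }
}
{code}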



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-8231) Expansion of ConnectClusterState interface

2019-05-09 Thread Randall Hauch (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8231?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Randall Hauch resolved KAFKA-8231.
--
Resolution: Fixed

> Expansion of ConnectClusterState interface
> --
>
> Key: KAFKA-8231
> URL: https://issues.apache.org/jira/browse/KAFKA-8231
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 2.3.0
>
>
> This covers [KIP-454: Expansion of the ConnectClusterState 
> interface|https://cwiki.apache.org/confluence/display/KAFKA/KIP-454%3A+Expansion+of+the+ConnectClusterState+interface]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8230) Add static membership support in librd consumer client

2019-05-09 Thread Boyang Chen (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836820#comment-16836820
 ] 

Boyang Chen commented on KAFKA-8230:


[~ijuma] woo, missed this point!

> Add static membership support in librd consumer client 
> ---
>
> Key: KAFKA-8230
> URL: https://issues.apache.org/jira/browse/KAFKA-8230
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Boyang Chen
>Priority: Minor
>  Labels: consumer
>
> Once the effort in https://issues.apache.org/jira/browse/KAFKA-7018 is done, 
> one of the low-hanging fruits is to add this support to Kafka consumers in 
> other languages, such as the C consumer in librdkafka.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8231) Expansion of ConnectClusterState interface

2019-05-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8231?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836819#comment-16836819
 ] 

ASF GitHub Bot commented on KAFKA-8231:
---

rhauch commented on pull request #6584: KAFKA-8231: Expansion of 
ConnectClusterState interface
URL: https://github.com/apache/kafka/pull/6584
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Expansion of ConnectClusterState interface
> --
>
> Key: KAFKA-8231
> URL: https://issues.apache.org/jira/browse/KAFKA-8231
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 2.3.0
>
>
> This covers [KIP-454: Expansion of the ConnectClusterState 
> interface|https://cwiki.apache.org/confluence/display/KAFKA/KIP-454%3A+Expansion+of+the+ConnectClusterState+interface]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-8311) Better consumer timeout exception handling

2019-05-09 Thread Boyang Chen (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836813#comment-16836813
 ] 

Boyang Chen edited comment on KAFKA-8311 at 5/10/19 1:09 AM:
-

[~mjsax] [~clearpal7] Thank you both! If you wonder about the codepath, I have 
a draft implementation here: [https://github.com/apache/kafka/pull/6662]

We could probably try to merge this one first, and then discuss whether we 
should add a new timeout config on top of `default.api.timeout.ms`.


was (Author: bchen225242):
[~mjsax] [~clearpal7] Thank you both! If you wonder about the codepath, I have 
a draft implementation here: [https://github.com/apache/kafka/pull/6662]

> Better consumer timeout exception handling 
> ---
>
> Key: KAFKA-8311
> URL: https://issues.apache.org/jira/browse/KAFKA-8311
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer, streams
>Reporter: Boyang Chen
>Assignee: WooYoung
>Priority: Major
>  Labels: newbie
>
> When a stream application crashes due to an underlying consumer commit timeout, 
> we have seen the following gaps:
> 1. The current timeout exception doesn't provide meaningful tuning 
> instructions. We should augment the error message to let the user change 
> `default.api.timeout.ms` in order to tolerate a longer reaction time.
> 2. Currently we have 3 different types of consumers on KStream: 
> thread-consumer, global-consumer and restore-consumer. Although we don't plan 
> to explicitly handle this consumer timeout on the stream level, we could wrap 
> it with a more meaningful message on either the consumer or stream level to 
> let the user know which consumer is having trouble.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8311) Better consumer timeout exception handling

2019-05-09 Thread Boyang Chen (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836813#comment-16836813
 ] 

Boyang Chen commented on KAFKA-8311:


[~mjsax] [~clearpal7] Thank you both! If you wonder about the codepath, I have 
a draft implementation here: [https://github.com/apache/kafka/pull/6662]

> Better consumer timeout exception handling 
> ---
>
> Key: KAFKA-8311
> URL: https://issues.apache.org/jira/browse/KAFKA-8311
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer, streams
>Reporter: Boyang Chen
>Assignee: WooYoung
>Priority: Major
>  Labels: newbie
>
> When a stream application crashes due to an underlying consumer commit timeout, 
> we have seen the following gaps:
> 1. The current timeout exception doesn't provide meaningful tuning 
> instructions. We should augment the error message to let the user change 
> `default.api.timeout.ms` in order to tolerate a longer reaction time.
> 2. Currently we have 3 different types of consumers on KStream: 
> thread-consumer, global-consumer and restore-consumer. Although we don't plan 
> to explicitly handle this consumer timeout on the stream level, we could wrap 
> it with a more meaningful message on either the consumer or stream level to 
> let the user know which consumer is having trouble.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8335) Log cleaner skips Transactional mark and batch record, causing unlimited growth of __consumer_offsets

2019-05-09 Thread Jason Gustafson (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8335?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836811#comment-16836811
 ] 

Jason Gustafson commented on KAFKA-8335:


Thanks for the additional information. This is looking like a bug to me. The 
cleaner will leave around empty batches in order to preserve producer state, 
but these should eventually be cleaned as long as there are newer entries for 
the same producerIds. That doesn't appear to be working correctly though. I 
will investigate further.

> Log cleaner skips Transactional mark and batch record, causing unlimited 
> growth of __consumer_offsets
> -
>
> Key: KAFKA-8335
> URL: https://issues.apache.org/jira/browse/KAFKA-8335
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.2.0
>Reporter: Boquan Tang
>Priority: Major
> Attachments: seg_april_25.zip, segment.zip
>
>
> My colleague Weichu already sent a mail to the Kafka user mailing list 
> regarding this issue, but we think it's worth having a ticket to track it.
> We have been using Kafka Streams with exactly-once enabled on a Kafka cluster
> for a while.
> Recently we found that the size of the __consumer_offsets partitions grew huge.
> Some partitions went over 30 GB. This caused Kafka to take quite long to load
> the "__consumer_offsets" topic on startup (it loads the topic in order to
> become the group coordinator).
> We dumped the __consumer_offsets segments and found that while normal
> offset commits are nicely compacted, transaction records (COMMIT, etc) are
> all preserved. Looks like that since these messages don't have a key, the
> LogCleaner is keeping them all:
> --
> $ bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files
> /003484332061.log --key-decoder-class
> kafka.serializer.StringDecoder 2>/dev/null | cat -v | head
> Dumping 003484332061.log
> Starting offset: 3484332061
> offset: 3484332089 position: 549 CreateTime: 1556003706952 isvalid: true
> keysize: 4 valuesize: 6 magic: 2 compresscodec: NONE producerId: 1006
> producerEpoch: 2530 sequence: -1 isTransactional: true headerKeys: []
> endTxnMarker: COMMIT coordinatorEpoch: 81
> offset: 3484332090 position: 627 CreateTime: 1556003706952 isvalid: true
> keysize: 4 valuesize: 6 magic: 2 compresscodec: NONE producerId: 4005
> producerEpoch: 2520 sequence: -1 isTransactional: true headerKeys: []
> endTxnMarker: COMMIT coordinatorEpoch: 84
> ...
> --
> Streams is doing transaction commits per 100ms (commit.interval.ms=100 when
> exactly-once) so the __consumer_offsets is growing really fast.
> Is this (to keep all transactions) by design, or is that a bug for
> LogCleaner?  What would be the way to clean up the topic?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-8335) Log cleaner skips Transactional mark and batch record, causing unlimited growth of __consumer_offsets

2019-05-09 Thread Jason Gustafson (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8335?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson reassigned KAFKA-8335:
--

Assignee: Jason Gustafson

> Log cleaner skips Transactional mark and batch record, causing unlimited 
> growth of __consumer_offsets
> -
>
> Key: KAFKA-8335
> URL: https://issues.apache.org/jira/browse/KAFKA-8335
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.2.0
>Reporter: Boquan Tang
>Assignee: Jason Gustafson
>Priority: Major
> Attachments: seg_april_25.zip, segment.zip
>
>
> My colleague Weichu already sent a mail to the Kafka user mailing list 
> regarding this issue, but we think it's worth having a ticket to track it.
> We have been using Kafka Streams with exactly-once enabled on a Kafka cluster
> for a while.
> Recently we found that the size of the __consumer_offsets partitions grew huge.
> Some partitions went over 30 GB. This caused Kafka to take quite long to load
> the "__consumer_offsets" topic on startup (it loads the topic in order to
> become the group coordinator).
> We dumped the __consumer_offsets segments and found that while normal
> offset commits are nicely compacted, transaction records (COMMIT, etc) are
> all preserved. Looks like that since these messages don't have a key, the
> LogCleaner is keeping them all:
> --
> $ bin/kafka-run-class.sh kafka.tools.DumpLogSegments --files
> /003484332061.log --key-decoder-class
> kafka.serializer.StringDecoder 2>/dev/null | cat -v | head
> Dumping 003484332061.log
> Starting offset: 3484332061
> offset: 3484332089 position: 549 CreateTime: 1556003706952 isvalid: true
> keysize: 4 valuesize: 6 magic: 2 compresscodec: NONE producerId: 1006
> producerEpoch: 2530 sequence: -1 isTransactional: true headerKeys: []
> endTxnMarker: COMMIT coordinatorEpoch: 81
> offset: 3484332090 position: 627 CreateTime: 1556003706952 isvalid: true
> keysize: 4 valuesize: 6 magic: 2 compresscodec: NONE producerId: 4005
> producerEpoch: 2520 sequence: -1 isTransactional: true headerKeys: []
> endTxnMarker: COMMIT coordinatorEpoch: 84
> ...
> --
> Streams is doing transaction commits per 100ms (commit.interval.ms=100 when
> exactly-once) so the __consumer_offsets is growing really fast.
> Is this (to keep all transactions) by design, or is that a bug for
> LogCleaner?  What would be the way to clean up the topic?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-8313) KafkaStreams state not being updated properly after shutdown

2019-05-09 Thread Guozhang Wang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8313?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-8313.
--
   Resolution: Fixed
 Assignee: Guozhang Wang
Fix Version/s: 2.3.0

> KafkaStreams state not being updated properly after shutdown
> 
>
> Key: KAFKA-8313
> URL: https://issues.apache.org/jira/browse/KAFKA-8313
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.2.0
> Environment: Single broker running on Ubuntu Linux.
>Reporter: Eric
>Assignee: Guozhang Wang
>Priority: Minor
> Fix For: 2.3.0
>
> Attachments: kafka-8313-src.tgz, log.txt
>
>
> I am running a KafkaStreams inside a DropWizard server and I am trying to 
> detect when my stream shuts down (in case a non-recoverable error occurs).  I 
> was hoping I could use KafkaStreams.setStateListener() to be notified when a 
> state change occurs.  When I query the state, KafkaStreams is stuck in the 
> REBALANCING state even though its threads are all DEAD.
>  
> You can easily reproduce this by doing the following:
>  # Create a topic (I have one with 5 partitions)
>  # Create a simple Kafka stream consuming from that topic
>  # Create a StateListener and register it on that KafkaStreams
>  # Start the Kafka stream
>  # Once everything runs, delete the topic using kafka-topics.sh
> When deleting the topic, you will see the StreamThreads' state transition 
> from RUNNING to PARTITION_REVOKED and you will be notified with the 
> KafkaStreams REBALANCING state.  That's all good and expected.  Then the 
> StreamThreads transition to PENDING_SHUTDOWN and eventually to DEAD and the 
> KafkaStreams state is stuck at REBALANCING.  I was expecting to 
> see a NOT_RUNNING state eventually... am I right?
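
For reference, a minimal sketch of the state-listener approach described above; the logging is a stand-in for whatever shutdown handling the DropWizard server would do, and the expectation in this ticket is that the listener should eventually observe NOT_RUNNING (or ERROR) once all threads are DEAD.

{code:java}
import org.apache.kafka.streams.KafkaStreams;

// Assuming `streams` is the KafkaStreams instance built elsewhere in the server.
streams.setStateListener((newState, oldState) -> {
    System.out.println("KafkaStreams transitioned from " + oldState + " to " + newState);
    if (newState == KafkaStreams.State.NOT_RUNNING || newState == KafkaStreams.State.ERROR) {
        // React to a (possibly non-recoverable) shutdown here.
        System.out.println("Streams instance has stopped");
    }
});
streams.start();
{code}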



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8313) KafkaStreams state not being updated properly after shutdown

2019-05-09 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836801#comment-16836801
 ] 

Guozhang Wang commented on KAFKA-8313:
--

Hello [~ladoe00], looking at your description I feel it is likely related to 
this issue, https://issues.apache.org/jira/browse/KAFKA-8062, which is resolved 
in 2.2.1 / 2.3.0.

Could you try out those versions by just building from the corresponding branch 
directly and see if the issue goes away?

> KafkaStreams state not being updated properly after shutdown
> 
>
> Key: KAFKA-8313
> URL: https://issues.apache.org/jira/browse/KAFKA-8313
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.2.0
> Environment: Single broker running on Ubuntu Linux.
>Reporter: Eric
>Priority: Minor
> Attachments: kafka-8313-src.tgz, log.txt
>
>
> I am running a KafkaStreams inside a DropWizard server and I am trying to 
> detect when my stream shuts down (in case a non-recoverable error occurs).  I 
> was hoping I could use KafkaStreams.setStateListener() to be notified when a 
> state change occurs.  When I query the state, KafkaStreams is stuck in the 
> REBALANCING state even though its threads are all DEAD.
>  
> You can easily reproduce this by doing the following:
>  # Create a topic (I have one with 5 partitions)
>  # Create a simple Kafka stream consuming from that topic
>  # Create a StateListener and register it on that KafkaStreams
>  # Start the Kafka stream
>  # Once everything runs, delete the topic using kafka-topics.sh
> When deleting the topic, you will see the StreamThreads' state transition 
> from RUNNING to PARTITION_REVOKED and you will be notified with the 
> KafkaStreams REBALANCING state.  That's all good and expected.  Then the 
> StreamThreads transition to PENDING_SHUTDOWN and eventually to DEAD and the 
> KafkaStreams state is stuck at REBALANCING.  I was expecting to 
> see a NOT_RUNNING state eventually... am I right?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8313) KafkaStreams state not being updated properly after shutdown

2019-05-09 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8313?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836802#comment-16836802
 ] 

Guozhang Wang commented on KAFKA-8313:
--

Just a few more words on why the threads are being shut down: during the 
rebalance triggered by the deletion of the topic, the leader realizes that the 
target source topic no longer exists, and hence it propagates this error code 
to all the members telling them to shut down, which is normal.

The issue here, though, as you already described, is that when threads are 
shutting down gracefully, the state listener was not being notified because of 
the issue above. I think it should have been fixed by now.

I'm resolving this ticket now, but if you find that is not the case, please 
feel free to re-open.

> KafkaStreams state not being updated properly after shutdown
> 
>
> Key: KAFKA-8313
> URL: https://issues.apache.org/jira/browse/KAFKA-8313
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.2.0
> Environment: Single broker running on Ubuntu Linux.
>Reporter: Eric
>Priority: Minor
> Attachments: kafka-8313-src.tgz, log.txt
>
>
> I am running a KafkaStreams inside a DropWizard server and I am trying to 
> detect when my stream shuts down (in case a non-recoverable error occurs).  I 
> was hoping I could use KafkaStreams.setStateListener() to be notified when a 
> state change occurs.  When I query the state, KafkaStreams is stuck in the 
> REBALANCING state even though its threads are all DEAD.
>  
> You can easily reproduce this by doing the following:
>  # Create a topic (I have one with 5 partitions)
>  # Create a simple Kafka stream consuming from that topic
>  # Create a StateListener and register it on that KafkaStreams
>  # Start the Kafka stream
>  # Once everything runs, delete the topic using kafka-topics.sh
> When deleting the topic, you will see the StreamThreads' state transition 
> from RUNNING to PARTITION_REVOKED and you will be notified with the 
> KafkaStreams REBALANCING state.  That's all good and expected.  Then the 
> StreamThreads transition to PENDING_SHUTDOWN and eventually to DEAD and the 
> KafkaStreams state is stuck at REBALANCING.  I was expecting to 
> see a NOT_RUNNING state eventually... am I right?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8347) Choose next record to process by timestamp

2019-05-09 Thread Sophie Blee-Goldman (JIRA)
Sophie Blee-Goldman created KAFKA-8347:
--

 Summary: Choose next record to process by timestamp
 Key: KAFKA-8347
 URL: https://issues.apache.org/jira/browse/KAFKA-8347
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Sophie Blee-Goldman


Currently, PartitionGroup determines the next record to process by choosing 
the partition with the lowest stream time. However, if a partition contains 
out-of-order data, its stream time may be significantly larger than the 
timestamp of its next record. The next record should instead be chosen as the 
record with the lowest timestamp across all partitions, regardless of which 
partition it comes from or what its partition time is.
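
A minimal sketch of the proposed selection rule, using plain Java collections rather than the real PartitionGroup internals: given the timestamp of the head record buffered for each partition, poll whichever partition's head record carries the smallest timestamp, independent of the partitions' stream times.

{code:java}
import java.util.Map;
import java.util.Optional;
import org.apache.kafka.common.TopicPartition;

public class NextRecordChooserSketch {
    /**
     * Picks the partition to poll next: the one whose buffered head record has
     * the lowest timestamp, regardless of each partition's stream time.
     */
    static Optional<TopicPartition> nextPartition(Map<TopicPartition, Long> headRecordTimestamps) {
        return headRecordTimestamps.entrySet().stream()
                .min(Map.Entry.comparingByValue())
                .map(Map.Entry::getKey);
    }
}
{code}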



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8346) Improve replica fetcher behavior in handling partition failures

2019-05-09 Thread Aishwarya Gune (JIRA)
Aishwarya Gune created KAFKA-8346:
-

 Summary: Improve replica fetcher behavior in handling partition 
failures
 Key: KAFKA-8346
 URL: https://issues.apache.org/jira/browse/KAFKA-8346
 Project: Kafka
  Issue Type: Improvement
  Components: core
Reporter: Aishwarya Gune


The replica fetcher thread terminates when one of the partitions being 
monitored fails. This leads to under-replicated partitions. The thread's 
behavior can be improved by dropping that particular partition and continuing 
to handle the rest of the partitions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-6789) Add retry logic in AdminClient requests

2019-05-09 Thread Jason Gustafson (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6789?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-6789.

   Resolution: Fixed
Fix Version/s: 2.2.2
   2.1.2
   2.0.2

> Add retry logic in AdminClient requests
> ---
>
> Key: KAFKA-6789
> URL: https://issues.apache.org/jira/browse/KAFKA-6789
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Reporter: Guozhang Wang
>Assignee: Manikumar
>Priority: Major
> Fix For: 2.0.2, 2.1.2, 2.2.2
>
>
> In KafkaAdminClient, today we treat all error codes as fatal and set the 
> exception accordingly in the returned futures. But some error codes can be 
> retried internally, for example COORDINATOR_LOAD_IN_PROGRESS. We could 
> consider adding the retry logic internally in the admin client so that 
> users would not need to retry themselves.
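
For context, a rough sketch of the retry loop users currently have to write themselves, which this ticket would move inside the admin client; the group id, the 100 ms back-off and the choice of API are illustrative.

{code:java}
import java.util.Collections;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.DescribeConsumerGroupsResult;
import org.apache.kafka.common.errors.RetriableException;

public class AdminRetrySketch {
    static DescribeConsumerGroupsResult describeWithRetry(AdminClient admin, String groupId)
            throws InterruptedException, ExecutionException {
        while (true) {
            DescribeConsumerGroupsResult result = admin.describeConsumerGroups(Collections.singleton(groupId));
            try {
                // Force completion so retriable errors (e.g. COORDINATOR_LOAD_IN_PROGRESS) surface here.
                result.all().get();
                return result;
            } catch (ExecutionException e) {
                if (e.getCause() instanceof RetriableException) {
                    Thread.sleep(100);  // simple back-off before retrying
                    continue;
                }
                throw e;
            }
        }
    }
}
{code}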



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8342) Admin tool to setup Kafka Stream topology (internal) topics

2019-05-09 Thread Guozhang Wang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836770#comment-16836770
 ] 

Guozhang Wang commented on KAFKA-8342:
--

[~pkleindl] thanks for sharing your use case. I think we indeed need to think 
more about what a pleasant procedure would be for users to pre-create topics 
with any admin tools; it is better not to make them manually export the 
topology themselves. As for schemas, they should be properly registered at 
runtime by Streams' producer clients, I think.

> Admin tool to setup Kafka Stream topology (internal) topics
> ---
>
> Key: KAFKA-8342
> URL: https://issues.apache.org/jira/browse/KAFKA-8342
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Boyang Chen
>Priority: Major
>  Labels: newbie
>
> We have seen customers who need to deploy their application to a production 
> environment but don't have access to create changelog and repartition topics. 
> They need to ask the admin team to manually create those topics before 
> proceeding to start the actual stream job. We could add an admin tool to help 
> them go through the process more quickly by providing a command that could
>  # Read through the current stream topology
>  # Create corresponding topics as needed, even including output topics.
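
A rough sketch of what such a tool could do once the internal topic names are known; the topic names, partition counts and replication factor below are hypothetical, and in practice they would be derived from the application id and Topology#describe().

{code:java}
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;

public class PreCreateStreamTopicsSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // Hypothetical changelog/repartition topic names derived from the topology.
            NewTopic changelog = new NewTopic("my-app-store-changelog", 8, (short) 3);
            NewTopic repartition = new NewTopic("my-app-aggregate-repartition", 8, (short) 3);
            admin.createTopics(Arrays.asList(changelog, repartition)).all().get();
        }
    }
}
{code}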



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8231) Expansion of ConnectClusterState interface

2019-05-09 Thread Chris Egerton (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8231?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton updated KAFKA-8231:
-
Fix Version/s: 2.3.0

> Expansion of ConnectClusterState interface
> --
>
> Key: KAFKA-8231
> URL: https://issues.apache.org/jira/browse/KAFKA-8231
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
> Fix For: 2.3.0
>
>
> This covers [KIP-454: Expansion of the ConnectClusterState 
> interface|https://cwiki.apache.org/confluence/display/KAFKA/KIP-454%3A+Expansion+of+the+ConnectClusterState+interface]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8324) User constructed RocksObjects leak memory

2019-05-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836755#comment-16836755
 ] 

ASF GitHub Bot commented on KAFKA-8324:
---

bbejeck commented on pull request #6697: KAFKA-8324: Add close() method to 
RocksDBConfigSetter
URL: https://github.com/apache/kafka/pull/6697
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> User constructed RocksObjects leak memory
> -
>
> Key: KAFKA-8324
> URL: https://issues.apache.org/jira/browse/KAFKA-8324
> Project: Kafka
>  Issue Type: Bug
>Reporter: Sophie Blee-Goldman
>Priority: Major
>
> Some of the RocksDB options a user can set when extending RocksDBConfigSetter 
> take Rocks objects as parameters. Many of these – including potentially large 
> objects like Cache and Filter – inherit from AbstractNativeReference and must 
> be closed explicitly in order to free the memory of the backing C++ object. 
> However the user has no way of closing any objects they have created in 
> RocksDBConfigSetter, and we do not ever close them for them. 
>  
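
The pull request above adds a close() hook; a sketch of how a config setter could then release the native objects it created is shown below. The cache size and options are illustrative, and the close(storeName, options) signature is the one proposed in that PR.

{code:java}
import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.BlockBasedTableConfig;
import org.rocksdb.Cache;
import org.rocksdb.LRUCache;
import org.rocksdb.Options;

public class BoundedCacheConfigSetter implements RocksDBConfigSetter {
    // Backed by native C++ memory, so it must be closed explicitly.
    private Cache cache;

    @Override
    public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
        cache = new LRUCache(16 * 1024 * 1024L);  // illustrative 16 MB block cache
        final BlockBasedTableConfig tableConfig = new BlockBasedTableConfig();
        tableConfig.setBlockCache(cache);
        options.setTableFormatConfig(tableConfig);
    }

    // The close() hook proposed in the PR, giving users a place to free these objects.
    public void close(final String storeName, final Options options) {
        cache.close();
    }
}
{code}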



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6789) Add retry logic in AdminClient requests

2019-05-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6789?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836746#comment-16836746
 ] 

ASF GitHub Bot commented on KAFKA-6789:
---

hachikuji commented on pull request #5578: KAFKA-6789: Handle 
COORDINATOR_LOAD_IN_PROGRESS, COORDINATOR_NOT_AVAILABLE retriable errors in 
AdminClient API
URL: https://github.com/apache/kafka/pull/5578
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add retry logic in AdminClient requests
> ---
>
> Key: KAFKA-6789
> URL: https://issues.apache.org/jira/browse/KAFKA-6789
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Reporter: Guozhang Wang
>Assignee: Manikumar
>Priority: Major
>
> In KafkaAdminClient, today we treat all error codes as fatal and set the 
> exception accordingly in the returned futures. But some error codes can be 
> retried internally, for example COORDINATOR_LOAD_IN_PROGRESS. We could 
> consider adding the retry logic internally in the admin client so that 
> users would not need to retry themselves.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-09 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836735#comment-16836735
 ] 

Andrew edited comment on KAFKA-8315 at 5/9/19 10:04 PM:


Thanks [~ableegoldman] and [~vvcephei].

I have today tried to isolate the issue we are facing into a simple demo app. I 
have two streams of data x and y, x is high frequency 
(https://github.com/the4thamigo-uk/join-example/blob/master/data/x.dat) and y 
is low frequency 
https://github.com/the4thamigo-uk/join-example/blob/master/data/y.dat.

Both topics are preloaded into kafka here 
https://github.com/the4thamigo-uk/join-example/blob/master/create_topics.sh#L3

I set the join window sizes, and the grace here 
https://github.com/the4thamigo-uk/join-example/blob/master/launch.sh#L9.

There is logging as records are read off the streams via a transformer here 
https://github.com/the4thamigo-uk/join-example/blob/master/src/main/java/the4thamigouk/kafka/streams/joinexample/JoinerStreamProcessor.java#L54.

There is logging on the join here 
https://github.com/the4thamigo-uk/join-example/blob/master/src/main/java/the4thamigouk/kafka/streams/joinexample/JoinerStreamProcessor.java#L93

What I see are joins only from here 
https://github.com/the4thamigo-uk/join-example/blob/master/example.log#L1553

Also, I notice that all the x records are read first, then all the y records 
https://github.com/the4thamigo-uk/join-example/blob/master/example.log#L451 and 
https://github.com/the4thamigo-uk/join-example/blob/master/example.log#L1452

Am I doing something wrong here?


was (Author: the4thamigo_uk):
Thanks [~ableegoldman] and [~vvcephei].

I have today tried to isolate the issue we are facing into a simple demo app. I 
have two streams of data x and y, x is high frequency 
(https://github.com/the4thamigo-uk/join-example/blob/master/data/x.dat) and y 
is low frequency 
https://github.com/the4thamigo-uk/join-example/blob/master/data/y.dat.

I set the join window sizes, and the grace here 
https://github.com/the4thamigo-uk/join-example/blob/master/launch.sh#L9.

There is logging as records are read off the streams via a transformer here 
https://github.com/the4thamigo-uk/join-example/blob/master/src/main/java/the4thamigouk/kafka/streams/joinexample/JoinerStreamProcessor.java#L54.

There is logging on the join here 
https://github.com/the4thamigo-uk/join-example/blob/master/src/main/java/the4thamigouk/kafka/streams/joinexample/JoinerStreamProcessor.java#L93

What I see are joins only from here 
https://github.com/the4thamigo-uk/join-example/blob/master/example.log#L1553

Also, I notice that all the x records are read first, then all the y records 
https://github.com/the4thamigo-uk/join-example/blob/master/example.log#L451 and 
https://github.com/the4thamigo-uk/join-example/blob/master/example.log#L1452

Am I doing something wrong here?

> Cannot pass Materialized into a join operation - hence cant set retention 
> period independent of grace
> -
>
> Key: KAFKA-8315
> URL: https://issues.apache.org/jira/browse/KAFKA-8315
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Andrew
>Assignee: John Roesler
>Priority: Major
> Attachments: code.java
>
>
> The documentation says to use `Materialized` not `JoinWindows.until()` 
> ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]),
>  but there is nowhere to pass a `Materialized` instance to the join 
> operation; only the group operation seems to be supported.
>  
> Slack conversation here : 
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300]
> [Additional]
> From what I understand, the retention period should be independent of the 
> grace period, so I think this is more than a documentation fix (see comments 
> below)
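
For anyone hitting the same limitation, a sketch of where grace can currently be set on a stream-stream join (2.1+ API; topic names, durations, and the assumption of String default serdes are placeholders). The window stores' retention still cannot be configured independently, since join() has no Materialized overload, which is the gap this ticket describes.

{code:java}
import java.time.Duration;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.JoinWindows;
import org.apache.kafka.streams.kstream.KStream;

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> x = builder.stream("x-topic");
KStream<String, String> y = builder.stream("y-topic");

// Grace can be set on the JoinWindows, but there is nowhere to pass a
// Materialized instance, so retention follows window size + grace.
KStream<String, String> joined = x.join(
        y,
        (xv, yv) -> xv + "," + yv,
        JoinWindows.of(Duration.ofMinutes(5)).grace(Duration.ofMinutes(1)));
{code}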



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-09 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836735#comment-16836735
 ] 

Andrew edited comment on KAFKA-8315 at 5/9/19 10:03 PM:


Thanks [~ableegoldman] and [~vvcephei].

I have today tried to isolate the issue we are facing into a simple demo app. I 
have two streams of data x and y, x is high frequency 
(https://github.com/the4thamigo-uk/join-example/blob/master/data/x.dat) and y 
is low frequency 
https://github.com/the4thamigo-uk/join-example/blob/master/data/y.dat.

I set the join window sizes, and the grace here 
https://github.com/the4thamigo-uk/join-example/blob/master/launch.sh#L9.

There is logging as records are read off the streams via a transformer here 
https://github.com/the4thamigo-uk/join-example/blob/master/src/main/java/the4thamigouk/kafka/streams/joinexample/JoinerStreamProcessor.java#L54.

There is logging on the join here 
https://github.com/the4thamigo-uk/join-example/blob/master/src/main/java/the4thamigouk/kafka/streams/joinexample/JoinerStreamProcessor.java#L93

What I see are joins only from here 
https://github.com/the4thamigo-uk/join-example/blob/master/example.log#L1553

Also, I notice that all the x records are read first, then all the y records 
https://github.com/the4thamigo-uk/join-example/blob/master/example.log#L451 and 
https://github.com/the4thamigo-uk/join-example/blob/master/example.log#L1452

Am I doing something wrong here?


was (Author: the4thamigo_uk):
Thanks [~ableegoldman] and [~vvcephei].

I have today tried to isolate the issue we are facing into a simple demo app. I 
have two streams of data x and y, x is high frequency 
(https://github.com/the4thamigo-uk/join-example/blob/master/data/x.dat) and y 
is low frequency 
https://github.com/the4thamigo-uk/join-example/blob/master/data/y.dat.

I set the join window sizes, and the grace here 
https://github.com/the4thamigo-uk/join-example/blob/master/launch.sh#L9.

There is logging as records are read off the streams via a transformer here 
https://github.com/the4thamigo-uk/join-example/blob/master/src/main/java/the4thamigouk/kafka/streams/joinexample/JoinerStreamProcessor.java#L54.

There is logging on the join here 
https://github.com/the4thamigo-uk/join-example/blob/master/src/main/java/the4thamigouk/kafka/streams/joinexample/JoinerStreamProcessor.java#L93

What I see are joins only from here 
https://github.com/the4thamigo-uk/join-example/blob/master/example.log#L1553

Am I doing something wrong here?

> Cannot pass Materialized into a join operation - hence cant set retention 
> period independent of grace
> -
>
> Key: KAFKA-8315
> URL: https://issues.apache.org/jira/browse/KAFKA-8315
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Andrew
>Assignee: John Roesler
>Priority: Major
> Attachments: code.java
>
>
> The documentation says to use `Materialized` not `JoinWindows.until()` 
> ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]),
>  but there is nowhere to pass a `Materialized` instance to the join 
> operation; only the group operation seems to be supported.
>  
> Slack conversation here : 
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300]
> [Additional]
> From what I understand, the retention period should be independent of the 
> grace period, so I think this is more than a documentation fix (see comments 
> below)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-09 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836735#comment-16836735
 ] 

Andrew edited comment on KAFKA-8315 at 5/9/19 9:59 PM:
---

Thanks [~ableegoldman] and [~vvcephei].

I have today tried to isolate the issue we are facing into a simple demo app. I 
have two streams of data x and y, x is high frequency 
(https://github.com/the4thamigo-uk/join-example/blob/master/data/x.dat) and y 
is low frequency 
https://github.com/the4thamigo-uk/join-example/blob/master/data/y.dat.

I set the join window sizes, and the grace here 
https://github.com/the4thamigo-uk/join-example/blob/master/launch.sh#L9.

There is logging as records are read off the streams via a transformer here 
https://github.com/the4thamigo-uk/join-example/blob/master/src/main/java/the4thamigouk/kafka/streams/joinexample/JoinerStreamProcessor.java#L54.

There is logging on the join here 
https://github.com/the4thamigo-uk/join-example/blob/master/src/main/java/the4thamigouk/kafka/streams/joinexample/JoinerStreamProcessor.java#L93

What I see are joins only from here 
https://github.com/the4thamigo-uk/join-example/blob/master/example.log#L1553

Am I doing something wrong here?


was (Author: the4thamigo_uk):
Thanks [~ableegoldman] and [~vvcephei].

I have today tried to isolate the issue we are facing into a simple demo app. I 
have two streams of data x and y, x is high frequency 
(https://github.com/the4thamigo-uk/join-example/blob/master/data/x.dat) and y 
is low frequency 
https://github.com/the4thamigo-uk/join-example/blob/master/data/y.dat.

I set the join window sizes, and the grace here 
https://github.com/the4thamigo-uk/join-example/blob/master/launch.sh.

There is logging as records are read off the streams via a transformer here 
https://github.com/the4thamigo-uk/join-example/blob/master/src/main/java/the4thamigouk/kafka/streams/joinexample/JoinerStreamProcessor.java#L54.

There is logging on the join here 
https://github.com/the4thamigo-uk/join-example/blob/master/src/main/java/the4thamigouk/kafka/streams/joinexample/JoinerStreamProcessor.java#L93

What I see are joins only from here 
https://github.com/the4thamigo-uk/join-example/blob/master/example.log#L1553

Am I doing something wrong here?

> Cannot pass Materialized into a join operation - hence cant set retention 
> period independent of grace
> -
>
> Key: KAFKA-8315
> URL: https://issues.apache.org/jira/browse/KAFKA-8315
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Andrew
>Assignee: John Roesler
>Priority: Major
> Attachments: code.java
>
>
> The documentation says to use `Materialized` not `JoinWindows.until()` 
> ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]),
>  but there is nowhere to pass a `Materialized` instance to the join 
> operation; only the group operation seems to be supported.
>  
> Slack conversation here : 
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300]
> [Additional]
> From what I understand, the retention period should be independent of the 
> grace period, so I think this is more than a documentation fix (see comments 
> below)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-09 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836735#comment-16836735
 ] 

Andrew commented on KAFKA-8315:
---

Thanks [~ableegoldman] and [~vvcephei].

I have today tried to isolate the issue we are facing into a simple demo app. I 
have two streams of data x and y, x is high frequency 
(https://github.com/the4thamigo-uk/join-example/blob/master/data/x.dat) and y 
is low frequency 
https://github.com/the4thamigo-uk/join-example/blob/master/data/y.dat.

I set the join window sizes, and the grace here 
https://github.com/the4thamigo-uk/join-example/blob/master/launch.sh.

There is logging as records are read off the streams via a transformer here 
https://github.com/the4thamigo-uk/join-example/blob/master/src/main/java/the4thamigouk/kafka/streams/joinexample/JoinerStreamProcessor.java#L54.

There is logging on the join here 
https://github.com/the4thamigo-uk/join-example/blob/master/src/main/java/the4thamigouk/kafka/streams/joinexample/JoinerStreamProcessor.java#L93

What I see is that joins only occur from this point onwards: 
https://github.com/the4thamigo-uk/join-example/blob/master/example.log#L1553

Am I doing something wrong here?

> Cannot pass Materialized into a join operation - hence cant set retention 
> period independent of grace
> -
>
> Key: KAFKA-8315
> URL: https://issues.apache.org/jira/browse/KAFKA-8315
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Andrew
>Assignee: John Roesler
>Priority: Major
> Attachments: code.java
>
>
> The documentation says to use `Materialized` not `JoinWindows.until()` 
> ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]),
>  but there is nowhere to pass a `Materialized` instance to the join 
> operation; it seems this is only supported for the group operation.
>  
> Slack conversation here : 
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300]
> [Additional]
> From what I understand, the retention period should be independent of the 
> grace period, so I think this is more than a documentation fix (see comments 
> below)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-09 Thread Sophie Blee-Goldman (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836606#comment-16836606
 ] 

Sophie Blee-Goldman commented on KAFKA-8315:


[~the4thamigo_uk] The code you're looking for, describing the logic of choosing 
the next record to process, is in 
org.apache.kafka.streams.processor.internals.PartitionGroup, which contains a 
priority queue "nonEmptyQueuesByTime" that serves up the partition with the 
lowest timestamp when polled (unless max.idle.ms has passed, as checked in 
StreamTask#isProcessable).
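
For readers following the pointer above, here is a rough, illustrative sketch of that
"lowest head-record timestamp first" selection; the class and field names are simplified
stand-ins, not the actual PartitionGroup implementation:

{code:java}
import java.util.Comparator;
import java.util.PriorityQueue;

// Simplified stand-in for a partition's buffered record queue; only the
// timestamp of the head record matters for the ordering decision.
class RecordQueueSketch {
    final String partition;
    final long headRecordTimestamp;

    RecordQueueSketch(final String partition, final long headRecordTimestamp) {
        this.partition = partition;
        this.headRecordTimestamp = headRecordTimestamp;
    }
}

class PartitionGroupSketch {
    // Queues ordered by the timestamp of their next record, mirroring the
    // idea behind "nonEmptyQueuesByTime".
    private final PriorityQueue<RecordQueueSketch> nonEmptyQueuesByTime =
            new PriorityQueue<>(
                    Comparator.comparingLong((RecordQueueSketch q) -> q.headRecordTimestamp));

    void addNonEmptyQueue(final RecordQueueSketch queue) {
        nonEmptyQueuesByTime.add(queue);
    }

    // The partition whose head record has the lowest timestamp is processed next.
    RecordQueueSketch nextQueue() {
        return nonEmptyQueuesByTime.poll();
    }
}
{code}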

> Cannot pass Materialized into a join operation - hence cant set retention 
> period independent of grace
> -
>
> Key: KAFKA-8315
> URL: https://issues.apache.org/jira/browse/KAFKA-8315
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Andrew
>Assignee: John Roesler
>Priority: Major
> Attachments: code.java
>
>
> The documentation says to use `Materialized` not `JoinWindows.until()` 
> ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]),
>  but there is nowhere to pass a `Materialized` instance to the join 
> operation; it seems this is only supported for the group operation.
>  
> Slack conversation here : 
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300]
> [Additional]
> From what I understand, the retention period should be independent of the 
> grace period, so I think this is more than a documentation fix (see comments 
> below)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8332) Regression in handling of JoinGroupRequest disallows deterministic protocol selection based on order of preference

2019-05-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8332?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836596#comment-16836596
 ] 

ASF GitHub Bot commented on KAFKA-8332:
---

cmccabe commented on pull request #6626: KAFKA-8332: Preserve order of 
protocols when handling JOIN_GROUP requests
URL: https://github.com/apache/kafka/pull/6626
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Regression in handling of JoinGroupRequest disallows deterministic protocol 
> selection based on order of preference
> --
>
> Key: KAFKA-8332
> URL: https://issues.apache.org/jira/browse/KAFKA-8332
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Konstantine Karantasis
>Assignee: Konstantine Karantasis
>Priority: Blocker
> Fix For: 2.3.0
>
>
> When a group of Kafka clients includes more than one embedded protocol in its 
> {{JoinGroupRequest}} along with its metadata, the group membership protocol 
> defines that the protocol which is supported by all the members of the group is 
> selected, and if more than one protocol is supported by all the members, the 
> protocol is selected based on the order of preference as defined in the 
> {{JoinGroupRequest}}. 
> A recent change from type {{List}} to type {{Set}} for storing the set of 
> supported embedded protocols in the {{JoinGroupRequest}}, combined with the 
> old handling via implicit types in the Scala code, has introduced 
> non-determinism in the selection of the embedded protocol by the 
> {{GroupCoordinator}}, even though the underlying type of the Set in use is a 
> variant of LinkedHashSet (it respects order). 
> The relevant code is: 
> {code:java}
> // KafkaApis.scala
> val protocols = joinGroupRequest.data().protocols().asScala.map(protocol =>
>   (protocol.name, protocol.metadata)).toList
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-7500) MirrorMaker 2.0 (KIP-382)

2019-05-09 Thread Ryanne Dolan (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7500?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ryanne Dolan reassigned KAFKA-7500:
---

Assignee: Colin P. McCabe  (was: Ryanne Dolan)
Reviewer: Colin P. McCabe

[https://github.com/apache/kafka/pull/6295]

> MirrorMaker 2.0 (KIP-382)
> -
>
> Key: KAFKA-7500
> URL: https://issues.apache.org/jira/browse/KAFKA-7500
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect, mirrormaker
>Reporter: Ryanne Dolan
>Assignee: Colin P. McCabe
>Priority: Minor
> Fix For: 2.3.0
>
>
> Implement a drop-in replacement for MirrorMaker leveraging the Connect 
> framework.
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0]
> [https://github.com/apache/kafka/pull/6295]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7500) MirrorMaker 2.0 (KIP-382)

2019-05-09 Thread Ryanne Dolan (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7500?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ryanne Dolan updated KAFKA-7500:

Fix Version/s: 2.3.0

> MirrorMaker 2.0 (KIP-382)
> -
>
> Key: KAFKA-7500
> URL: https://issues.apache.org/jira/browse/KAFKA-7500
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect, mirrormaker
>Reporter: Ryanne Dolan
>Assignee: Ryanne Dolan
>Priority: Minor
> Fix For: 2.3.0
>
>
> Implement a drop-in replacement for MirrorMaker leveraging the Connect 
> framework.
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0]
> [https://github.com/apache/kafka/pull/6295]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-8001) Fetch from future replica stalls when local replica becomes a leader

2019-05-09 Thread Vikas Singh (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8001?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vikas Singh reassigned KAFKA-8001:
--

Assignee: Vikas Singh  (was: Colin Hicks)

> Fetch from future replica stalls when local replica becomes a leader
> 
>
> Key: KAFKA-8001
> URL: https://issues.apache.org/jira/browse/KAFKA-8001
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.0, 2.1.1
>Reporter: Anna Povzner
>Assignee: Vikas Singh
>Priority: Critical
>
> With KIP-320, fetch from follower / future replica returns 
> FENCED_LEADER_EPOCH if current leader epoch in the request is lower than the 
> leader epoch known to the leader (or local replica in case of future replica 
> fetching). In case of future replica fetching from the local replica, if 
> local replica becomes the leader of the partition, the next fetch from future 
> replica fails with FENCED_LEADER_EPOCH and fetching from future replica is 
> stopped until the next leader change. 
> Proposed solution: on local replica leader change, future replica should 
> "become a follower" again, and go through the truncation phase. Or we could 
> optimize it, and just update partition state of the future replica to reflect 
> the updated current leader epoch. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7500) MirrorMaker 2.0 (KIP-382)

2019-05-09 Thread Ryanne Dolan (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7500?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ryanne Dolan updated KAFKA-7500:

Description: 
Implement a drop-in replacement for MirrorMaker leveraging the Connect 
framework.

[https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0]

[https://github.com/apache/kafka/pull/6295]

  was:
Implement a drop-in replacement for MirrorMaker leveraging the Connect 
framework.

https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0


> MirrorMaker 2.0 (KIP-382)
> -
>
> Key: KAFKA-7500
> URL: https://issues.apache.org/jira/browse/KAFKA-7500
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect, mirrormaker
>Reporter: Ryanne Dolan
>Assignee: Ryanne Dolan
>Priority: Minor
>
> Implement a drop-in replacement for MirrorMaker leveraging the Connect 
> framework.
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0]
> [https://github.com/apache/kafka/pull/6295]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-8345) Create an Administrative API for Replica Reassignment

2019-05-09 Thread Colin P. McCabe (JIRA)
Colin P. McCabe created KAFKA-8345:
--

 Summary: Create an Administrative API for Replica Reassignment
 Key: KAFKA-8345
 URL: https://issues.apache.org/jira/browse/KAFKA-8345
 Project: Kafka
  Issue Type: Improvement
Reporter: Colin P. McCabe
Assignee: Colin P. McCabe


Create an Administrative API for Replica Reassignment, as discussed in KIP-455



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7500) MirrorMaker 2.0 (KIP-382)

2019-05-09 Thread Ryanne Dolan (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7500?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836058#comment-16836058
 ] 

Ryanne Dolan edited comment on KAFKA-7500 at 5/9/19 2:41 PM:
-

[~cloudfrog], glad you are taking a look. I'm looking forward to hearing about 
your experience.

> Is this expected to work with kafka 1.1.1 clusters?

Yes, I believe it will work with 0.11.0.0 or higher, but maybe you can test it 
to verify :)

> will prefix remote topic names ... be configurable? 

Yes, you can implement your own ReplicationPolicy to define remote topics 
however you like:

 
{code:java}
replication.policy.class = my.SuffixReplicationPolicy
{code}
Also, MM2 doesn't care how existing source topics are named. If your topics are 
suffixed with their local DC (a common pattern), you can leave them as-is 
without breaking anything. By default you'd get topics like "dc1.topic1-dc1", 
so you might consider implementing a ReplicationPolicy that strips the suffix 
during replication so you get just "dc1.topic1".

Ryanne
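
As an illustration of the kind of custom policy described above, a minimal sketch that
strips a trailing "-dc1"-style suffix when building the remote topic name. The class and
method names are assumptions in the spirit of KIP-382's ReplicationPolicy, not the exact
interface from the MM2 pull request:

{code:java}
// Hypothetical sketch only: method names follow the spirit of KIP-382's
// ReplicationPolicy but are not the exact interface from the MM2 PR.
public class SuffixStrippingReplicationPolicy {

    private static final String SEPARATOR = ".";

    // e.g. sourceClusterAlias = "dc1", topic = "topic1-dc1"  ->  "dc1.topic1"
    public String formatRemoteTopic(final String sourceClusterAlias, final String topic) {
        final String suffix = "-" + sourceClusterAlias;
        final String baseName = topic.endsWith(suffix)
                ? topic.substring(0, topic.length() - suffix.length())
                : topic;
        return sourceClusterAlias + SEPARATOR + baseName;
    }
}
{code}

Such a class would then be wired in via replication.policy.class, as in the config
snippet above.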

 


was (Author: ryannedolan):
[~cloudfrog], glad you are taking a look. I'm looking forward to hearing about 
your experience.

> Is this expected to work with kafka 1.1.1 clusters?

Yes, I believe it will work with 0.11.0.0 or higher, but maybe you can test it 
to verify :)

> will prefix remote topic names ... be configurable? 

Yes, you can implement your own ReplicationPolicy to define remote topics 
however you like:

 
{code:java}
replication.policy.class = my.SuffixReplicationPolicy
{code}
Also, MM2 doesn't care how existing source topics are named. If your topics are 
prefixed with their local DC (a common pattern), you can leave them as-is 
without breaking anything. By default you'd get topics like "dc1.topic1-dc1", 
so you might consider implementing a ReplicationPolicy that strips the suffix 
during replication so you get just "dc1.topic1".

Ryanne

 

> MirrorMaker 2.0 (KIP-382)
> -
>
> Key: KAFKA-7500
> URL: https://issues.apache.org/jira/browse/KAFKA-7500
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect, mirrormaker
>Reporter: Ryanne Dolan
>Assignee: Ryanne Dolan
>Priority: Minor
>
> Implement a drop-in replacement for MirrorMaker leveraging the Connect 
> framework.
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-382%3A+MirrorMaker+2.0



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7849) Warning when adding GlobalKTable

2019-05-09 Thread Frederic Tardif (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836432#comment-16836432
 ] 

Frederic Tardif commented on KAFKA-7849:


got the same issue:
{code:java}
[2019-05-08 09:54:24.212]  WARN 
[uepg-catalog_program-snapshot-connector-7ddde56e-7dff-4a14-aa11-6c8fb22b40ce-StreamThread-2]
 [Consumer 
clientId=uepg-catalog_program-snapshot-connector-7ddde56e-7dff-4a14-aa11-6c8fb22b40ce-StreamThread-2-consumer,
 groupId=uepg-catalog_program-snapshot-connector] The following subscribed 
topics are not assigned to any members: [livecms.program.snapshot]  - 
(org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:?) {code}
 

My topology is a globalTable:
{code:java}
streamBuilder.globalTable(config.getTopicName(), 
Materialized.as(config.getStoreName()));{code}
 
{quote}I confirm that the consumer lag in the rocksdb checkpoint file did indeed show 
the expected partition offset. In addition, the warning disappeared the third 
time I restarted it (though that may just be because the store was already in sync). I 
would be surprised if, in that case, fiddling with the stream thread count 
configuration were required. So it looks like a self-healing kind of warning. The 
message, however, does not sound that reassuring... Next time I get this I'll 
try enabling debug logs to correlate the warning.
{quote}
 

 

> Warning when adding GlobalKTable
> 
>
> Key: KAFKA-7849
> URL: https://issues.apache.org/jira/browse/KAFKA-7849
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.0
>Reporter: Dmitry Minkovsky
>Priority: Minor
>  Labels: newbie
>
> Per 
> https://lists.apache.org/thread.html/59c119be8a2723c501e0653fa3ed571e8c09be40d5b5170c151528b5@%3Cusers.kafka.apache.org%3E
>  
> When I add a GlobalKTable for topic "message-write-service-user-ids-by-email" 
> to my topology, I get this warning:
>  
> [2019-01-19 12:18:14,008] WARN 
> (org.apache.kafka.clients.consumer.internals.ConsumerCoordinator:421) 
> [Consumer 
> clientId=message-write-service-55f2ca4d-0efc-4344-90d3-955f9f5a65fd-StreamThread-2-consumer,
>  groupId=message-write-service] The following subscribed topics are not 
> assigned to any members: [message-write-service-user-ids-by-email] 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7805) Use --bootstrap-server option for kafka-topics.sh in ducktape tests where applicable

2019-05-09 Thread Viktor Somogyi-Vass (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7805?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Viktor Somogyi-Vass updated KAFKA-7805:
---
Summary: Use --bootstrap-server option for kafka-topics.sh in ducktape 
tests where applicable  (was: Use --bootstrap-server option in ducktape tests 
where applicable)

> Use --bootstrap-server option for kafka-topics.sh in ducktape tests where 
> applicable
> 
>
> Key: KAFKA-7805
> URL: https://issues.apache.org/jira/browse/KAFKA-7805
> Project: Kafka
>  Issue Type: Improvement
>  Components: system tests
>Reporter: Viktor Somogyi-Vass
>Priority: Major
>
> KIP-377 introduces the {{--bootstrap-server}} option and deprecates the 
> {{--zookeeper}} option in {{kafka-topics.sh}}. I'd be nice to use the new 
> option in the ducktape tests to gain higher test coverage.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-8240) Source.equals() can fail with NPE

2019-05-09 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-8240.

   Resolution: Fixed
Fix Version/s: 2.2.2
   2.1.2
   2.3.0

[~vahid] I marked this as fix version 2.2.2 – if we roll a new RC for the bug-fix 
release, please update this ticket to 2.2.1. Thanks.
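
For context on the quoted report below: java.util.regex.Pattern does not override
equals(), and the field can be null, so a null-safe comparison along these lines avoids
the NPE. This is only a sketch; the field names are illustrative, not the actual
InternalTopologyBuilder.Source code:

{code:java}
import java.util.Objects;
import java.util.Set;
import java.util.regex.Pattern;

// Sketch of a null-safe equals for a Source-like class where topicPattern
// may be null. Field names are illustrative.
class SourceSketch {
    final String name;
    final Set<String> topics;      // null when a pattern subscription is used
    final Pattern topicPattern;    // null when a fixed topic list is used

    SourceSketch(final String name, final Set<String> topics, final Pattern topicPattern) {
        this.name = name;
        this.topics = topics;
        this.topicPattern = topicPattern;
    }

    @Override
    public boolean equals(final Object o) {
        if (this == o) return true;
        if (!(o instanceof SourceSketch)) return false;
        final SourceSketch other = (SourceSketch) o;
        // Objects.equals tolerates nulls on either side, unlike calling
        // topicPattern.equals(...) directly; Pattern is compared by its string form.
        return Objects.equals(name, other.name)
                && Objects.equals(topics, other.topics)
                && Objects.equals(
                        topicPattern == null ? null : topicPattern.pattern(),
                        other.topicPattern == null ? null : other.topicPattern.pattern());
    }

    @Override
    public int hashCode() {
        return Objects.hash(name, topics,
                topicPattern == null ? null : topicPattern.pattern());
    }
}
{code}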

> Source.equals() can fail with NPE
> -
>
> Key: KAFKA-8240
> URL: https://issues.apache.org/jira/browse/KAFKA-8240
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.2.0, 2.1.1
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Major
>  Labels: beginner, easy-fix, newbie
> Fix For: 2.3.0, 2.1.2, 2.2.2
>
>
> Reported on an PR: 
> [https://github.com/apache/kafka/pull/5284/files/1df6208f48b6b72091fea71323d94a16102ffd13#r270607795]
> InternalTopologyBuilder#Source.equals() might fail with NPE if 
> `topicPattern==null`.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8240) Source.equals() can fail with NPE

2019-05-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8240?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836397#comment-16836397
 ] 

ASF GitHub Bot commented on KAFKA-8240:
---

mjsax commented on pull request #6685: KAFKA-8240: Fix NPE in Source.equals()
URL: https://github.com/apache/kafka/pull/6685
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Source.equals() can fail with NPE
> -
>
> Key: KAFKA-8240
> URL: https://issues.apache.org/jira/browse/KAFKA-8240
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.2.0, 2.1.1
>Reporter: Matthias J. Sax
>Assignee: Matthias J. Sax
>Priority: Major
>  Labels: beginner, easy-fix, newbie
>
> Reported on an PR: 
> [https://github.com/apache/kafka/pull/5284/files/1df6208f48b6b72091fea71323d94a16102ffd13#r270607795]
> InternalTopologyBuilder#Source.equals() might fail with NPE if 
> `topicPattern==null`.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8311) Better consumer timeout exception handling

2019-05-09 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8311?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836381#comment-16836381
 ] 

Matthias J. Sax commented on KAFKA-8311:


Thanks for your interest [~clearpal7]! I added you to the list of contributors 
and assigned the ticket to you. You can now also self-assign tickets.

As Boyang mentioned, you should also subscribe to the dev-mailing list: 
[https://kafka.apache.org/contact]

Additionally, please read the wiki: 
[https://cwiki.apache.org/confluence/display/KAFKA/Contributing+Code+Changes] 
(just as a starting point – there are other pages that provide useful 
information to get started).

> Better consumer timeout exception handling 
> ---
>
> Key: KAFKA-8311
> URL: https://issues.apache.org/jira/browse/KAFKA-8311
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer, streams
>Reporter: Boyang Chen
>Assignee: WooYoung
>Priority: Major
>  Labels: newbie
>
> When a streams application crashes due to an underlying consumer commit timeout, we 
> have seen the following gaps:
> 1. The current timeout exception doesn't provide meaningful tuning 
> instructions. We should augment the error message to tell the user to change 
> `default.api.timeout.ms` in order to tolerate a longer reaction time.
> 2. Currently we have 3 different types of consumers in KStream: 
> thread-consumer, global-consumer and restore-consumer. Although we don't plan 
> to explicitly handle this consumer timeout on the stream level, we could wrap it 
> with a more meaningful message on either the consumer or the stream level to let 
> the user know which consumer is having trouble.
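
On point 1 of the quoted description above, the knob already exists at the consumer
level; a hedged example of raising it for a Streams application (the application id,
bootstrap servers and the 120s value are placeholders, not recommendations):

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class TimeoutConfigExample {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // Raise the default API timeout used for blocking consumer calls,
        // scoped to the consumers created by Kafka Streams.
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.DEFAULT_API_TIMEOUT_MS_CONFIG), 120000);
        System.out.println(props);
    }
}
{code}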



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-8311) Better consumer timeout exception handling

2019-05-09 Thread Matthias J. Sax (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8311?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-8311:
--

Assignee: WooYoung

> Better consumer timeout exception handling 
> ---
>
> Key: KAFKA-8311
> URL: https://issues.apache.org/jira/browse/KAFKA-8311
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer, streams
>Reporter: Boyang Chen
>Assignee: WooYoung
>Priority: Major
>  Labels: newbie
>
> When a streams application crashes due to an underlying consumer commit timeout, we 
> have seen the following gaps:
> 1. The current timeout exception doesn't provide meaningful tuning 
> instructions. We should augment the error message to tell the user to change 
> `default.api.timeout.ms` in order to tolerate a longer reaction time.
> 2. Currently we have 3 different types of consumers in KStream: 
> thread-consumer, global-consumer and restore-consumer. Although we don't plan 
> to explicitly handle this consumer timeout on the stream level, we could wrap it 
> with a more meaningful message on either the consumer or the stream level to let 
> the user know which consumer is having trouble.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8332) Regression in handling of JoinGroupRequest disallows deterministic protocol selection based on order of preference

2019-05-09 Thread Ismael Juma (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8332?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-8332:
---
Fix Version/s: 2.3.0

> Regression in handling of JoinGroupRequest disallows deterministic protocol 
> selection based on order of preference
> --
>
> Key: KAFKA-8332
> URL: https://issues.apache.org/jira/browse/KAFKA-8332
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Konstantine Karantasis
>Assignee: Konstantine Karantasis
>Priority: Blocker
> Fix For: 2.3.0
>
>
> When a group of Kafka clients includes more than one embedded protocol in its 
> {{JoinGroupRequest}} along with its metadata, the group membership protocol 
> defines that the protocol which is supported by all the members of the group is 
> selected, and if more than one protocol is supported by all the members, the 
> protocol is selected based on the order of preference as defined in the 
> {{JoinGroupRequest}}. 
> A recent change from type {{List}} to type {{Set}} for storing the set of 
> supported embedded protocols in the {{JoinGroupRequest}}, combined with the 
> old handling via implicit types in the Scala code, has introduced 
> non-determinism in the selection of the embedded protocol by the 
> {{GroupCoordinator}}, even though the underlying type of the Set in use is a 
> variant of LinkedHashSet (it respects order). 
> The relevant code is: 
> {code:java}
> // KafkaApis.scala
> val protocols = joinGroupRequest.data().protocols().asScala.map(protocol =>
>   (protocol.name, protocol.metadata)).toList
> {code}
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-1194) The kafka broker cannot delete the old log files after the configured time

2019-05-09 Thread Raoul de Haard (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-1194?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836374#comment-16836374
 ] 

Raoul de Haard commented on KAFKA-1194:
---

Any update on when the fix for this issue will be merged? 
(see: [https://github.com/apache/kafka/pull/6329#issuecomment-490901117])

> The kafka broker cannot delete the old log files after the configured time
> --
>
> Key: KAFKA-1194
> URL: https://issues.apache.org/jira/browse/KAFKA-1194
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 0.10.0.0, 0.11.0.0, 1.0.0
> Environment: window
>Reporter: Tao Qin
>Priority: Critical
>  Labels: features, patch, windows
> Attachments: KAFKA-1194.patch, RetentionExpiredWindows.txt, 
> Untitled.jpg, image-2018-09-12-14-25-52-632.png, 
> image-2018-11-26-10-18-59-381.png, kafka-1194-v1.patch, kafka-1194-v2.patch, 
> kafka-bombarder.7z, screenshot-1.png
>
>   Original Estimate: 72h
>  Remaining Estimate: 72h
>
> We tested it in a Windows environment and set log.retention.hours to 24 
> hours.
> # The minimum age of a log file to be eligible for deletion
> log.retention.hours=24
> After several days, the kafka broker still cannot delete the old log file. 
> And we get the following exceptions:
> [2013-12-19 01:57:38,528] ERROR Uncaught exception in scheduled task 
> 'kafka-log-retention' (kafka.utils.KafkaScheduler)
> kafka.common.KafkaStorageException: Failed to change the log file suffix from 
>  to .deleted for log segment 1516723
>  at kafka.log.LogSegment.changeFileSuffixes(LogSegment.scala:249)
>  at kafka.log.Log.kafka$log$Log$$asyncDeleteSegment(Log.scala:638)
>  at kafka.log.Log.kafka$log$Log$$deleteSegment(Log.scala:629)
>  at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
>  at kafka.log.Log$$anonfun$deleteOldSegments$1.apply(Log.scala:418)
>  at 
> scala.collection.LinearSeqOptimized$class.foreach(LinearSeqOptimized.scala:59)
>  at scala.collection.immutable.List.foreach(List.scala:76)
>  at kafka.log.Log.deleteOldSegments(Log.scala:418)
>  at 
> kafka.log.LogManager.kafka$log$LogManager$$cleanupExpiredSegments(LogManager.scala:284)
>  at 
> kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:316)
>  at 
> kafka.log.LogManager$$anonfun$cleanupLogs$3.apply(LogManager.scala:314)
>  at 
> scala.collection.TraversableLike$WithFilter$$anonfun$foreach$1.apply(TraversableLike.scala:743)
>  at scala.collection.Iterator$class.foreach(Iterator.scala:772)
>  at 
> scala.collection.JavaConversions$JIteratorWrapper.foreach(JavaConversions.scala:573)
>  at scala.collection.IterableLike$class.foreach(IterableLike.scala:73)
>  at 
> scala.collection.JavaConversions$JListWrapper.foreach(JavaConversions.scala:615)
>  at 
> scala.collection.TraversableLike$WithFilter.foreach(TraversableLike.scala:742)
>  at kafka.log.LogManager.cleanupLogs(LogManager.scala:314)
>  at 
> kafka.log.LogManager$$anonfun$startup$1.apply$mcV$sp(LogManager.scala:143)
>  at kafka.utils.KafkaScheduler$$anon$1.run(KafkaScheduler.scala:100)
>  at 
> java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:471)
>  at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:304)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:178)
>  at 
> java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:293)
>  at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>  at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>  at java.lang.Thread.run(Thread.java:724)
> I think this error happens because kafka tries to rename the log file while it 
> is still open. So we should close the file before renaming it.
> The index file uses a special data structure, the MappedByteBuffer. The Javadoc 
> describes it as:
> A mapped byte buffer and the file mapping that it represents remain valid 
> until the buffer itself is garbage-collected.
> Fortunately, I found a forceUnmap function in the kafka code, and perhaps it can 
> be used to free the MappedByteBuffer.
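
On the forceUnmap point in the quoted description above: explicitly unmapping the buffer
before renaming is the usual workaround on Windows. A hedged sketch of what such an
unmap can look like on pre-Java-9 JVMs; it relies on JDK-internal classes and is not
necessarily what Kafka's own helper does:

{code:java}
import java.nio.MappedByteBuffer;

// Hedged sketch: force-unmap a MappedByteBuffer so the underlying file can be
// renamed or deleted on Windows. Relies on sun.misc internals (pre-Java 9 only).
final class MappedBufferUnmapper {
    static void unmap(final MappedByteBuffer buffer) {
        try {
            final sun.misc.Cleaner cleaner = ((sun.nio.ch.DirectBuffer) buffer).cleaner();
            if (cleaner != null) {
                // Releases the mapping immediately instead of waiting for the
                // buffer to be garbage-collected.
                cleaner.clean();
            }
        } catch (Throwable t) {
            // Best effort: fall back to GC-driven unmapping if internals change.
        }
    }
}
{code}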



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8343) streams application crashed due to rocksdb

2019-05-09 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8343?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836347#comment-16836347
 ] 

Matthias J. Sax commented on KAFKA-8343:


I don't think that the code is executed in a different order. However, there may 
still be a race condition...
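
To make that concrete, a simplified sketch (not the actual RocksDBStore code) of how an
unsynchronized reader can still race with the synchronized close() shown in the quoted
report below:

{code:java}
// Simplified sketch of the suspected race: close() is synchronized, so its own
// statements are not visibly reordered, but a reader that does not hold the
// same lock can interleave with it and observe a half-closed store.
class StoreSketch {
    private boolean open = true;
    private Object db = new Object(); // stands in for the RocksDB handle

    public synchronized void close() {
        if (!open) {
            return;
        }
        open = false;
        db = null; // close iterators, options and the db handle here
    }

    // Called from another thread WITHOUT holding the same lock:
    public Object get() {
        if (open) {        // may still read 'true' while close() is running...
            return db;     // ...and then read 'null' once close() has finished
        }
        throw new IllegalStateException("store is closed");
    }
}
{code}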

> streams application crashed due to rocksdb
> --
>
> Key: KAFKA-8343
> URL: https://issues.apache.org/jira/browse/KAFKA-8343
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 1.0.0
> Environment: centos 7 jdk8 kafka-streams1.0
>Reporter: gaoshu
>Priority: Major
> Attachments: fullsizeoutput_6.jpeg
>
>
> my streams application always crashes within a few days. The crash log looks like 
> [https://github.com/facebook/rocksdb/issues/5234], so I think it may be because 
> RocksDBStore.java is closed incorrectly when used from multiple threads. Looking 
> through the code below, db.close() should run after closeOpenIterators(). However, 
> db.close() may be executed before the iterators are closed due to instruction 
> reordering. I hope my guess is correct.
> {code:java}
> // RocksDBStore.java
> @Override
> public synchronized void close() {
> if (!open) {
> return;
> }
> open = false;
> closeOpenIterators();
> options.close();
> wOptions.close();
> fOptions.close();
> db.close();
> options = null;
> wOptions = null;
> fOptions = null;
> db = null;
> }
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7362) enable kafka broker to remove orphan partitions automatically

2019-05-09 Thread JIRA


[ 
https://issues.apache.org/jira/browse/KAFKA-7362?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836320#comment-16836320
 ] 

Ferlin-Sutton Léo commented on KAFKA-7362:
--

I can see the implementation is being worked on and discussed.

In the meantime, is there a recommended way to manually delete an orphan 
partition? We were thinking of:
 # Stopping the Kafka process on the concerned broker
 # `rm -rf` the directory containing the orphaned partition
 # Turning the kafka broker back on

> enable kafka broker to remove orphan partitions automatically 
> --
>
> Key: KAFKA-7362
> URL: https://issues.apache.org/jira/browse/KAFKA-7362
> Project: Kafka
>  Issue Type: Improvement
>  Components: core, log
>Reporter: xiongqi wu
>Assignee: xiongqi wu
>Priority: Major
>
> When partition reassignment removes topic partitions from an offline broker, 
> those removed partitions become orphan partitions on that broker. When the 
> offline broker comes back online, it is not able to clean up either the data or 
> the folders that belong to orphan partitions. The log manager scans all dirs 
> during startup, but the time-based retention policy on a topic partition does 
> not kick in until the broker is either a follower or a leader of the 
> partition. In addition, we do not have logic today to delete folders that belong 
> to orphan partitions. 
> Opening this ticket to provide a mechanism (when enabled) to safely remove 
> orphan partitions automatically.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-09 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836278#comment-16836278
 ] 

Andrew edited comment on KAFKA-8315 at 5/9/19 10:58 AM:


[~vvcephei] Thanks again for your help and advice. 

Ok, so it should work as I had previously hoped then. So maybe our working 
hypothesis is incorrect? As you suggest, I am currently re-running a test using 
the following code to determine the lag between the left and right topics.

The reason we think it might be due to the right stream getting ahead is that 
this also helps to explain why we manage to perform some initial joins at the 
start of the ingestion period for about two months (while the streams are 
presumed to be in line), then nothing for most of the middle period, then a few 
days of joins at the end.

As for the retention period, I think I understand that now from all your 
explanations. The problem here is not retention, it looks like it is something 
to do with either the specifics of our dataset or the way in which the streams 
are read.

We delved into the code, and found the way that the RocksDB code works and 
think we understand it now. What I didn't manage to find is where the code is 
for the logic you describe in your first paragraph ('Streams should choose to 
...etc').

{code:java}
    private static Topology joinTestStreamStream(final JoinerProperties props) {

        final StreamsBuilder builder = new StreamsBuilder();

        final TransformerSupplier streamLogger = () ->
                new Transformer<Object, GenericRecord, KeyValue<Object, GenericRecord>>() {
            private ProcessorContext context;

            @Override
            public void init(ProcessorContext context) {
                this.context = context;
            }

            @Override
            public KeyValue<Object, GenericRecord> transform(Object key, GenericRecord value) {
                log.info(String.format("reading : topic=%s, partition=%d, timestamp=%d, offset=%d, key=%s",
                        context.topic(), context.partition(), context.timestamp(), context.offset(), key));
                return new KeyValue<>(key, value);
            }

            @Override
            public void close() {
            }
        };

        final KStream leftStream = builder.stream(props.leftTopic().getName()).transform(streamLogger);
        final KStream rightStream = builder.stream(props.rightTopic().getName()).transform(streamLogger);

        // setup the join
        final JoinWindows joinWindow =
                JoinWindows.of(Duration.ZERO).before(Duration.parse("P2D")).grace(Duration.parse("P7D"));

        final KStream joinStream = leftStream.join(rightStream,
                (l, r) -> {
                    log.info("joining: " + l + ", " + r);
                    return null;
                }, joinWindow);

        return builder.build();
    }
{code}
 


was (Author: the4thamigo_uk):
Ok, so it should work as I had previously hoped then. So maybe our working 
hypothesis is incorrect? As you suggest, I am currently re-running a test using 
the following code to determine the lag between the left and right topics.

The reason we think it might be due to the right stream getting ahead is that 
this also helps to explain why we manage to perform some initial joins at the 
start of the ingestion period for about two months (while the streams are 
presumed to be in line), then nothing for most of the middle period, then a few 
days of joins at the end.

As for the retention period, I think I understand that now from all your 
explanations. The problem here is not retention, it looks like it is something 
to do with either the specifics of our dataset or the way in which the streams 
are read.

We delved into the code, and found the way that the RocksDB code works and 
think we understand it now. What I didn't manage to find is where the code is 
for the logic you describe in your first paragraph ('Streams should choose to 
...etc').

{code:java}
    private static Topology joinTestStreamStream(final JoinerProperties props) {

        final StreamsBuilder builder = new StreamsBuilder();

        final TransformerSupplier streamLogger = () ->
                new Transformer<Object, GenericRecord, KeyValue<Object, GenericRecord>>() {
            private ProcessorContext context;

            @Override
            public void init(ProcessorContext context) {
                this.context = context;
            }

            @Override
            public KeyValue<Object, GenericRecord> transform(Object key, GenericRecord value) {
                log.info(String.format("reading : topic=%s, partition=%d, timestamp=%d, offset=%d, key=%s",
                        context.topic(), context.partition(), context.timestamp(), context.offset(), key));
                return new KeyValue<>(key, value);
            }

            @Override
            public void close() {
            }
        };

        final KStream leftStream = builder.stream(props.leftTopic().getName()).transform(streamLogger);
        final KStream rightStream = 

[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-09 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836278#comment-16836278
 ] 

Andrew edited comment on KAFKA-8315 at 5/9/19 10:57 AM:


Ok, so it should work as I had previously hoped then. So maybe our working 
hypothesis is incorrect? As you suggest, I am currently re-running a test using 
the following code to determine the lag between the left and right topics.

The reason we think it might be due to the right stream getting ahead is that 
this also helps to explain why we manage to perform some initial joins at the 
start of the ingestion period for about two months (while the streams are 
presumed to be in line), then nothing for most of the middle period, then a few 
days of joins at the end.

As for the retention period, I think I understand that now from all your 
explanations. The problem here is not retention, it looks like it is something 
to do with either the specifics of our dataset or the way in which the streams 
are read.

We delved into the code, and found the way that the RocksDB code works and 
think we understand it now. What I didn't manage to find is where the code is 
for the logic you describe in your first paragraph ('Streams should choose to 
...etc').

{code:java}
    private static Topology joinTestStreamStream(final JoinerProperties props) {

        final StreamsBuilder builder = new StreamsBuilder();

        final TransformerSupplier streamLogger = () ->
                new Transformer<Object, GenericRecord, KeyValue<Object, GenericRecord>>() {
            private ProcessorContext context;

            @Override
            public void init(ProcessorContext context) {
                this.context = context;
            }

            @Override
            public KeyValue<Object, GenericRecord> transform(Object key, GenericRecord value) {
                log.info(String.format("reading : topic=%s, partition=%d, timestamp=%d, offset=%d, key=%s",
                        context.topic(), context.partition(), context.timestamp(), context.offset(), key));
                return new KeyValue<>(key, value);
            }

            @Override
            public void close() {
            }
        };

        final KStream leftStream = builder.stream(props.leftTopic().getName()).transform(streamLogger);
        final KStream rightStream = builder.stream(props.rightTopic().getName()).transform(streamLogger);

        // setup the join
        final JoinWindows joinWindow =
                JoinWindows.of(Duration.ZERO).before(Duration.parse("P2D")).grace(Duration.parse("P7D"));

        final KStream joinStream = leftStream.join(rightStream,
                (l, r) -> {
                    log.info("joining: " + l + ", " + r);
                    return null;
                }, joinWindow);

        return builder.build();
    }
{code}
 


was (Author: the4thamigo_uk):
Ok, so it should work as I had previously hoped then. So maybe our working 
hypothesis is incorrect? As you suggest, I am currently re-running a test using 
the following code to determine the lag between the left and right topics.

The reason we think it might be due to the right stream getting ahead is that 
this also helps to explain why we manage to perform some initial joins at the 
start of the ingestion period for about two months (while the streams are 
presumed to be in line), then nothing for most of the middle period, then a few 
days of joins at the end.

As for the retention period, I think I understand that now from all your 
explanations. The problem here is not retention, it looks like it is something 
to do with either the specifics of our dataset or the way in which the streams 
are read.

We delved into the code, and found the way that the RocksDB code works and 
think we understand it now. What I didn't manage to find is where the code is 
for the logic you describe in your first paragraph ('Streams should choose to 
...etc').

{code:java}
    private static Topology joinTestStreamStream(final JoinerProperties props) {

        final StreamsBuilder builder = new StreamsBuilder();

        final TransformerSupplier streamLogger = () ->
                new Transformer<Object, GenericRecord, KeyValue<Object, GenericRecord>>() {
            private ProcessorContext context;

            @Override
            public void init(ProcessorContext context) {
                this.context = context;
            }

            @Override
            public KeyValue<Object, GenericRecord> transform(Object key, GenericRecord value) {
                log.info(String.format("reading : topic=%s, partition=%d, timestamp=%d, offset=%d, key=%s",
                        context.topic(), context.partition(), context.timestamp(), context.offset(), key));
                return new KeyValue<>(key, value);
            }

            @Override
            public void close() {
            }
        };

        final KStream leftStream = builder.stream(props.leftTopic().getName()).transform(streamLogger);
        final KStream rightStream = builder.stream(props.rightTopic().getName()).tra

[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-09 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836278#comment-16836278
 ] 

Andrew edited comment on KAFKA-8315 at 5/9/19 10:57 AM:


Ok, so it should work as I had previously hoped then. So maybe our working 
hypothesis is incorrect? As you suggest, I am currently re-running a test using 
the following code to determine the lag between the left and right topics.

The reason we think it might be due to the right stream getting ahead is that 
this also helps to explain why we manage to perform some initial joins at the 
start of the ingestion period for about two months (while the streams are 
presumed to be in line), then nothing for most of the middle period, then a few 
days of joins at the end.

As for the retention period, I think I understand that now from all your 
explanations. The problem here is not retention, it looks like it is something 
to do with either the specifics of our dataset or the way in which the streams 
are read.

We delved into the code, and found the way that the RocksDB code works and 
think we understand it now. What I didn't manage to find is where the code is 
for the logic you describe in your first paragraph ('Streams should choose to 
...etc').

{code:java}
    private static Topology joinTestStreamStream(final JoinerProperties props) {

        final StreamsBuilder builder = new StreamsBuilder();

        final TransformerSupplier streamLogger = () ->
                new Transformer<Object, GenericRecord, KeyValue<Object, GenericRecord>>() {
            private ProcessorContext context;

            @Override
            public void init(ProcessorContext context) {
                this.context = context;
            }

            @Override
            public KeyValue<Object, GenericRecord> transform(Object key, GenericRecord value) {
                log.info(String.format("reading : topic=%s, partition=%d, timestamp=%d, offset=%d, key=%s",
                        context.topic(), context.partition(), context.timestamp(), context.offset(), key));
                return new KeyValue<>(key, value);
            }

            @Override
            public void close() {
            }
        };

        final KStream leftStream = builder.stream(props.leftTopic().getName()).transform(streamLogger);
        final KStream rightStream = builder.stream(props.rightTopic().getName()).transform(streamLogger);

        // setup the join
        final JoinWindows joinWindow =
                JoinWindows.of(Duration.ZERO).before(Duration.parse("P2D")).grace(Duration.parse("P7D"));

        final KStream joinStream = leftStream.join(rightStream,
                (l, r) -> {
                    log.info("joining: " + l + ", " + r);
                    return null;
                }, joinWindow);

        return builder.build();
    }
{code}
 


was (Author: the4thamigo_uk):
Ok, so it should work as I had previously hoped then. So maybe our working 
hypothesis is incorrect? As you suggest, I am currently re-running a test using 
the following code to determine the lag between the left and right topics.

The reason we think it might be due to the right stream getting ahead is that 
this also helps to explain why we manage to perform some initial joins at the 
start of the ingestion period for about two months (while the streams are 
presumed to be in line), then nothing for most of the middle period, then a few 
days of joins at the end.

As for the retention period, I think I understand that now from all your 
explanations. The problem here is not retention, it looks like it is something 
to do with either the specifics of our dataset or the way in which the streams 
are read.

We delved into the code, and found the way that the RocksDB code works and 
think we understand it now. What I didn't manage to find is where the code is 
for the logic you describe in your first paragraph ('Streams should choose to 
...etc').

{code:java}
    private static Topology joinTestStreamStream(final JoinerProperties props) {

        final StreamsBuilder builder = new StreamsBuilder();

        final TransformerSupplier streamLogger = () ->
                new Transformer<Object, GenericRecord, KeyValue<Object, GenericRecord>>() {
            private ProcessorContext context;

            @Override
            public void init(ProcessorContext context) {
                this.context = context;
            }

            @Override
            public KeyValue<Object, GenericRecord> transform(Object key, GenericRecord value) {
                log.info(String.format("reading : topic=%s, partition=%d, timestamp=%d, offset=%d, key=%s",
                        context.topic(), context.partition(), context.timestamp(), context.offset(), key));
                return new KeyValue<>(key, value);
            }

            @Override
            public void close() {
            }
        };

        final KStream leftStream = builder.stream(props.leftTopic().getName()).transform(streamLogger);
        final KStream rightStream = builder.stream(props.rightTopic().getName()).transfo

[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-09 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836278#comment-16836278
 ] 

Andrew edited comment on KAFKA-8315 at 5/9/19 10:56 AM:


Ok, so it should work as I had previously hoped then. So maybe our working 
hypothesis is incorrect? As you suggest, I am currently re-running a test using 
the following code to determine the lag between the left and right topics.

The reason we think it might be due to the right stream getting ahead is that 
this also helps to explain why we manage to perform some initial joins at the 
start of the ingestion period for about two months (while the streams are 
presumed to be in line), then nothing for most of the middle period, then a few 
days of joins at the end.

As for the retention period, I think I understand that now from all your 
explanations. The problem here is not retention, it looks like it is something 
to do with either the specifics of our dataset or the way in which the streams 
are read.

We delved into the code, and found the way that the RocksDB code works and 
think we understand it now. What I didn't manage to find is where the code is 
for the logic you describe in your first paragraph ('Streams should choose to 
...etc').

{code:java}
    private static Topology joinTestStreamStream(final JoinerProperties props) {

        final StreamsBuilder builder = new StreamsBuilder();

        final TransformerSupplier streamLogger = () ->
                new Transformer<Object, GenericRecord, KeyValue<Object, GenericRecord>>() {
            private ProcessorContext context;

            @Override
            public void init(ProcessorContext context) {
                this.context = context;
            }

            @Override
            public KeyValue<Object, GenericRecord> transform(Object key, GenericRecord value) {
                log.info(String.format("reading : topic=%s, partition=%d, timestamp=%d, offset=%d, key=%s",
                        context.topic(), context.partition(), context.timestamp(), context.offset(), key));
                return new KeyValue<>(key, value);
            }

            @Override
            public void close() {
            }
        };

        final KStream leftStream = builder.stream(props.leftTopic().getName()).transform(streamLogger);
        final KStream rightStream = builder.stream(props.rightTopic().getName()).transform(streamLogger);

        // setup the join
        final JoinWindows joinWindow =
                JoinWindows.of(Duration.ZERO).before(Duration.parse("P2D")).grace(Duration.parse("P7D"));

        final KStream joinStream = leftStream.join(rightStream,
                (l, r) -> {
                    log.info("joining: " + l + ", " + r);
                    return null;
                }, joinWindow);

        return builder.build();
    }
{code}


was (Author: the4thamigo_uk):
Ok, so it should work as I had previously hoped then. So maybe our working 
hypothesis is incorrect? As you suggest, I am currently re-running a test using 
the following code to determine the lag between the left and right topics.

The reason we think it might be due to the right stream getting ahead is that 
this also helps to explain why we manage to perform some initial joins at the 
start of the ingestion period for about two months (while the streams are 
presumed to be in line), then nothing for most of the middle period, then a few 
days of joins at the end.

As for the retention period, I think I understand that now from all your 
explanations. The problem here is not retention, it looks like it is something 
to do with either the specifics of our dataset or the way in which the streams 
are read.

We delved into the code, and found the way that the RocksDB code works and 
think we understand it now. What I didn't manage to find is where the code is 
for the logic you describe in your first paragraph ('Streams should choose to 
...etc').

 

 

> Cannot pass Materialized into a join operation - hence cant set retention 
> period independent of grace
> -
>
> Key: KAFKA-8315
> URL: https://issues.apache.org/jira/browse/KAFKA-8315
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Andrew
>Assignee: John Roesler
>Priority: Major
> Attachments: code.java
>
>
> The documentation says to use `Materialized` not `JoinWindows.until()` 
> ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]),
>  but there is nowhere to pass a `Materialized` instance to the join 
> operation; it seems this is only supported for the group operation.
>  
> Slack conversation here : 
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300]
> [Additional]
> From what I understand, the retention period should be independent of the 
> grace 

[jira] [Comment Edited] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-09 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836278#comment-16836278
 ] 

Andrew edited comment on KAFKA-8315 at 5/9/19 10:43 AM:


Ok, so it should work as I had previously hoped then. So maybe our working 
hypothesis is incorrect? As you suggest, I am currently re-running a test using 
the following code to determine the lag between the left and right topics.

The reason we think it might be due to the right stream getting ahead is that 
this also helps to explain why we manage to perform some initial joins at the 
start of the ingestion period for about two months (while the streams are 
presumed to be in line), then nothing for most of the middle period, then a few 
days of joins at the end.

As for the retention period, I think I understand that now from all your 
explanations. The problem here is not retention, it looks like it is something 
to do with either the specifics of our dataset or the way in which the streams 
are read.

We delved into the code, and found the way that the RocksDB code works and 
think we understand it now. What I didn't manage to find is where the code is 
for the logic you describe in your first paragraph ('Streams should choose to 
...etc').

 

 


was (Author: the4thamigo_uk):
Ok, so it should work as I had previously hoped, so maybe our working 
hypothesis is incorrect then. As you suggest, I am currently re-running a test 
using the following code to determine the lag between the left and right topics.

The reason we think it might be due to the right stream getting ahead is that 
this also helps to explain why we manage to perform some initial joins at the 
start of the ingestion period for about two months (while the streams are 
presumed to be in line), then nothing for most of the middle period, then a few 
days of joins at the end.

As for the retention period, I think I understand that now from all your 
explanations. The problem here is not retention, it looks like it is something 
to do with either the specifics of our dataset or the way in which the streams 
are read.

We delved into the code, and found the way that the RocksDB code works and 
think we understand it now. What I didn't manage to find is where the code is 
for the logic you describe in your first paragraph ('Streams should choose to 
...etc').

 

 

> Cannot pass Materialized into a join operation - hence cant set retention 
> period independent of grace
> -
>
> Key: KAFKA-8315
> URL: https://issues.apache.org/jira/browse/KAFKA-8315
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Andrew
>Assignee: John Roesler
>Priority: Major
> Attachments: code.java
>
>
> The documentation says to use `Materialized` not `JoinWindows.until()` 
> ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]),
>  but there is nowhere to pass a `Materialized` instance to the join 
> operation; it seems this is only supported for the group operation.
>  
> Slack conversation here : 
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300]
> [Additional]
> From what I understand, the retention period should be independent of the 
> grace period, so I think this is more than a documentation fix (see comments 
> below)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8315) Cannot pass Materialized into a join operation - hence cant set retention period independent of grace

2019-05-09 Thread Andrew (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8315?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836278#comment-16836278
 ] 

Andrew commented on KAFKA-8315:
---

Ok, so it should work as I had previously hoped, so maybe our working 
hypothesis is incorrect then. As you suggest, I am currently re-running a test 
using the following code to determine the lag between the left and right topics.

The reason we think it might be due to the right stream getting ahead is that 
this also helps to explain why we manage to perform some initial joins at the 
start of the ingestion period for about two months (while the streams are 
presumed to be in line), then nothing for most of the middle period, then a few 
days of joins at the end.

As for the retention period, I think I understand that now from all your 
explanations. The problem here is not retention, it looks like it is something 
to do with either the specifics of our dataset or the way in which the streams 
are read.

We delved into the code, and found the way that the RocksDB code works and 
think we understand it now. What I didn't manage to find is where the code is 
for the logic you describe in your first paragraph ('Streams should choose to 
...etc').

 

 

> Cannot pass Materialized into a join operation - hence cant set retention 
> period independent of grace
> -
>
> Key: KAFKA-8315
> URL: https://issues.apache.org/jira/browse/KAFKA-8315
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Andrew
>Assignee: John Roesler
>Priority: Major
> Attachments: code.java
>
>
> The documentation says to use `Materialized` not `JoinWindows.until()` 
> ([https://kafka.apache.org/22/javadoc/org/apache/kafka/streams/kstream/JoinWindows.html#until-long-]),
>  but there is nowhere to pass a `Materialized` instance to the join 
> operation; it seems this is only supported for the group operation.
>  
> Slack conversation here : 
> [https://confluentcommunity.slack.com/archives/C48AHTCUQ/p1556799561287300]
> [Additional]
> From what I understand, the retention period should be independent of the 
> grace period, so I think this is more than a documentation fix (see comments 
> below)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8342) Admin tool to setup Kafka Stream topology (internal) topics

2019-05-09 Thread Patrik Kleindl (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8342?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836232#comment-16836232
 ] 

Patrik Kleindl commented on KAFKA-8342:
---

This sounds very interesting; I hope it is OK if I add our use case and some 
thoughts:

Our current goal was to minimize deployment-order dependencies, so we don't have 
to worry about when an application (mostly Streams, but normal 
producers/consumers too) is deployed and whether all relevant topics have been 
created before startup.

The internal topics are handled by Kafka Streams itself, but as we are moving 
to a more restricted approach we will be in the situation described above.

Our solution was to add the definition of topics for each application as a 
configuration artifact and share the dependencies via Maven.

On startup (or when triggered via a command-line call), this configuration is 
parsed and the topics are set up as needed.

We do this for all "named" topics, including input and output topics, and could 
add the internal topics if needed.
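
A minimal sketch of what such a startup hook can look like, using the plain 
AdminClient; the broker address, topic names, partition counts, and configs 
below are made up for illustration, not our real definitions:
{code:java}
// Minimal sketch of the startup hook, not our real code: the broker address,
// topic names, partition counts, and configs are made up for illustration.
// Topics that already exist are treated as a no-op.
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.errors.TopicExistsException;

public class TopicSetup {

    public static void main(String[] args) throws Exception {
        // In our setup this list would be parsed from the configuration artifact
        // that is shared via Maven.
        List<NewTopic> declared = Arrays.asList(
            new NewTopic("orders-input", 6, (short) 3),
            new NewTopic("orders-output", 6, (short) 3)
                .configs(Collections.singletonMap("cleanup.policy", "compact")));

        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

        try (AdminClient admin = AdminClient.create(props)) {
            for (NewTopic topic : declared) {
                try {
                    admin.createTopics(Collections.singleton(topic)).all().get();
                } catch (ExecutionException e) {
                    if (!(e.getCause() instanceof TopicExistsException)) {
                        throw e; // anything other than "already exists" is a real problem
                    }
                }
            }
        }
    }
}
{code}
Creating the topics one by one makes it easy to treat "topic already exists" as 
a no-op while still failing fast on real errors.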

If I understand the approach above correctly, this would mean that a user would 
have to export the topology (either the toString() version or something more 
polished) and pass it to the tool for processing.

One thing I would like to add to the use case is the interaction with the 
schema registry: as we are using Avro, creating the schemas for the internal 
topics is a necessary pain too, and I expect it to be restricted in a similar 
way as topic creation.

> Admin tool to setup Kafka Stream topology (internal) topics
> ---
>
> Key: KAFKA-8342
> URL: https://issues.apache.org/jira/browse/KAFKA-8342
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Boyang Chen
>Priority: Major
>  Labels: newbie
>
> We have seen customers who need to deploy their application to a production 
> environment but don't have access to create changelog and repartition topics. 
> They need to ask the admin team to manually create those topics before 
> proceeding to start the actual streams job. We could add an admin tool to help 
> them go through the process more quickly by providing a command that could
>  # Read through the current stream topology
>  # Create corresponding topics as needed, even including output topics.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8146) WARNING: An illegal reflective access operation has occurred

2019-05-09 Thread Adam Antal (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8146?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836193#comment-16836193
 ] 

Adam Antal commented on KAFKA-8146:
---

Hi,

This error is the same as in HADOOP-10848; it is due to JDK 9+ blocking access 
to sun.* classes.

It can be resolved with a patch on the ZooKeeper side that gets rid of the 
reflective call.

> WARNING: An illegal reflective access operation has occurred
> 
>
> Key: KAFKA-8146
> URL: https://issues.apache.org/jira/browse/KAFKA-8146
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, core
>Affects Versions: 2.1.1
> Environment: Java 11
> Kafka v2.1.1
>Reporter: Abhi
>Priority: Major
>
> Hi,
> I am running Kafka v2.1.1 and see the warnings below at startup of the server 
> and clients. What is the cause of these warnings, and how can they be avoided 
> or fixed?
> *Client side:*
> WARNING: Illegal reflective access by 
> org.apache.kafka.common.network.SaslChannelBuilder 
> (file:/local/kafka/kafka_installation/kafka_2.12-2.1.1/libs/kafka-clients-2.1.1.jar)
>  to method sun.security.krb5.Config.getInstance()
> WARNING: Please consider reporting this to the maintainers of 
> org.apache.kafka.common.network.SaslChannelBuilder
> WARNING: Use --illegal-access=warn to enable warnings of further illegal 
> reflective access operations
> WARNING: All illegal access operations will be denied in a future release
> *Server side:*
> WARNING: An illegal reflective access operation has occurred
> WARNING: Illegal reflective access by 
> org.apache.zookeeper.server.util.KerberosUtil 
> (file:/local/kafka/kafka_installation/kafka_2.12-2.1.1/libs/zookeeper-3.4.13.jar)
>  to method sun.security.krb5.Config.getInstance()
> WARNING: Please consider reporting this to the maintainers of 
> org.apache.zookeeper.server.util.KerberosUtil
> WARNING: Use --illegal-access=warn to enable warnings of further illegal 
> reflective access operations
> WARNING: All illegal access operations will be denied in a future release



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8344) Fix vagrant-up.sh to work with AWS properly

2019-05-09 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8344?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16836176#comment-16836176
 ] 

ASF GitHub Bot commented on KAFKA-8344:
---

sekikn commented on pull request #6703: KAFKA-8344. Fix vagrant-up.sh to work 
with AWS properly
URL: https://github.com/apache/kafka/pull/6703
 
 
   For now, `vagrant/vagrant-up.sh --aws` fails because
   the `vagrant hostmanager` command in that script lacks
   the `--provider=aws` option. This PR adds it.
   
   I ran `vagrant/vagrant-up.sh --aws` with and without
   the `--no-parallel` option and confirmed both worked
   as expected.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Fix vagrant-up.sh to work with AWS properly
> ---
>
> Key: KAFKA-8344
> URL: https://issues.apache.org/jira/browse/KAFKA-8344
> Project: Kafka
>  Issue Type: Bug
>Reporter: Kengo Seki
>Assignee: Kengo Seki
>Priority: Major
>
> I tried to run {{vagrant/vagrant-up.sh --aws}} with the following 
> Vagrantfile.local.
> {code}
> enable_dns = true
> enable_hostmanager = false
> # EC2
> ec2_access_key = ""
> ec2_secret_key = ""
> ec2_keypair_name = "keypair"
> ec2_keypair_file = "/path/to/keypair/file"
> ec2_region = "ap-northeast-1"
> ec2_ami = "ami-0905ffddadbfd01b7"
> ec2_security_groups = "sg-"
> ec2_subnet_id = "subnet-"
> {code}
> EC2 instances were successfully created, but it failed with the following 
> error after that.
> {code}
> $ vagrant/vagrant-up.sh --aws
> (snip)
> An active machine was found with a different provider. Vagrant
> currently allows each machine to be brought up with only a single
> provider at a time. A future version will remove this limitation.
> Until then, please destroy the existing machine to up with a new
> provider.
> Machine name: zk1
> Active provider: aws
> Requested provider: virtualbox
> {code}
> It seems that the {{vagrant hostmanager}} command also requires the 
> {{--provider=aws}} option, in addition to {{vagrant up}}.
> With that option, it succeeded as follows:
> {code}
> $ git diff
> diff --git a/vagrant/vagrant-up.sh b/vagrant/vagrant-up.sh
> index 6a4ef9564..9210a5357 100755
> --- a/vagrant/vagrant-up.sh
> +++ b/vagrant/vagrant-up.sh
> @@ -220,7 +220,7 @@ function bring_up_aws {
>  # We still have to bring up zookeeper/broker nodes serially
>  echo "Bringing up zookeeper/broker machines serially"
>  vagrant up --provider=aws --no-parallel --no-provision 
> $zk_broker_machines $debug
> -vagrant hostmanager
> +vagrant hostmanager --provider=aws
>  vagrant provision
>  fi
> @@ -231,11 +231,11 @@ function bring_up_aws {
>  local vagrant_rsync_temp_dir=$(mktemp -d);
>  TMPDIR=$vagrant_rsync_temp_dir vagrant_batch_command "vagrant up 
> $debug --provider=aws" "$worker_machines" "$max_parallel"
>  rm -rf $vagrant_rsync_temp_dir
> -vagrant hostmanager
> +vagrant hostmanager --provider=aws
>  fi
>  else
>  vagrant up --provider=aws --no-parallel --no-provision $debug
> -vagrant hostmanager
> +vagrant hostmanager --provider=aws
>  vagrant provision
>  fi
> $ vagrant/vagrant-up.sh --aws
> (snip)
> ==> broker3: Running provisioner: shell...
> broker3: Running: /tmp/vagrant-shell20190509-25399-8f1wgz.sh
> broker3: Killing server
> broker3: No kafka server to stop
> broker3: Starting server
> $ vagrant status
> Current machine states:
> zk1   running (aws)
> broker1   running (aws)
> broker2   running (aws)
> broker3   running (aws)
> This environment represents multiple VMs. The VMs are all listed
> above with their current state. For more information about a specific
> VM, run `vagrant status NAME`.
> $ vagrant ssh broker1
> (snip)
> ubuntu@ip-172-16-0-62:~$ /opt/kafka-dev/bin/kafka-topics.sh 
> --bootstrap-server broker1:9092,broker2:9092,broker3:9092 --create 
> --partitions 1 --replication-factor 3 --topic sandbox
> (snip)
> ubuntu@ip-172-16-0-62:~$ /opt/kafka-dev/bin/kafka-topics.sh 
> --bootstrap-server broker1:9092,broker2:9092,broker3:9092 --list
> (snip)
> sandbox
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)

[jira] [Created] (KAFKA-8344) Fix vagrant-up.sh to work with AWS properly

2019-05-09 Thread Kengo Seki (JIRA)
Kengo Seki created KAFKA-8344:
-

 Summary: Fix vagrant-up.sh to work with AWS properly
 Key: KAFKA-8344
 URL: https://issues.apache.org/jira/browse/KAFKA-8344
 Project: Kafka
  Issue Type: Bug
Reporter: Kengo Seki
Assignee: Kengo Seki


I tried to run {{vagrant/vagrant-up.sh --aws}} with the following 
Vagrantfile.local.

{code}
enable_dns = true
enable_hostmanager = false

# EC2
ec2_access_key = ""
ec2_secret_key = ""
ec2_keypair_name = "keypair"
ec2_keypair_file = "/path/to/keypair/file"
ec2_region = "ap-northeast-1"
ec2_ami = "ami-0905ffddadbfd01b7"
ec2_security_groups = "sg-"
ec2_subnet_id = "subnet-"
{code}

EC2 instances were successfully created, but it failed with the following error 
after that.

{code}
$ vagrant/vagrant-up.sh --aws

(snip)

An active machine was found with a different provider. Vagrant
currently allows each machine to be brought up with only a single
provider at a time. A future version will remove this limitation.
Until then, please destroy the existing machine to up with a new
provider.

Machine name: zk1
Active provider: aws
Requested provider: virtualbox
{code}

It seems that the {{vagrant hostmanager}} command also requires the 
{{--provider=aws}} option, in addition to {{vagrant up}}.
With that option, it succeeded as follows:

{code}
$ git diff
diff --git a/vagrant/vagrant-up.sh b/vagrant/vagrant-up.sh
index 6a4ef9564..9210a5357 100755
--- a/vagrant/vagrant-up.sh
+++ b/vagrant/vagrant-up.sh
@@ -220,7 +220,7 @@ function bring_up_aws {
 # We still have to bring up zookeeper/broker nodes serially
 echo "Bringing up zookeeper/broker machines serially"
 vagrant up --provider=aws --no-parallel --no-provision 
$zk_broker_machines $debug
-vagrant hostmanager
+vagrant hostmanager --provider=aws
 vagrant provision
 fi

@@ -231,11 +231,11 @@ function bring_up_aws {
 local vagrant_rsync_temp_dir=$(mktemp -d);
 TMPDIR=$vagrant_rsync_temp_dir vagrant_batch_command "vagrant up 
$debug --provider=aws" "$worker_machines" "$max_parallel"
 rm -rf $vagrant_rsync_temp_dir
-vagrant hostmanager
+vagrant hostmanager --provider=aws
 fi
 else
 vagrant up --provider=aws --no-parallel --no-provision $debug
-vagrant hostmanager
+vagrant hostmanager --provider=aws
 vagrant provision
 fi

$ vagrant/vagrant-up.sh --aws

(snip)

==> broker3: Running provisioner: shell...
broker3: Running: /tmp/vagrant-shell20190509-25399-8f1wgz.sh
broker3: Killing server
broker3: No kafka server to stop
broker3: Starting server
$ vagrant status
Current machine states:

zk1   running (aws)
broker1   running (aws)
broker2   running (aws)
broker3   running (aws)

This environment represents multiple VMs. The VMs are all listed
above with their current state. For more information about a specific
VM, run `vagrant status NAME`.
$ vagrant ssh broker1

(snip)

ubuntu@ip-172-16-0-62:~$ /opt/kafka-dev/bin/kafka-topics.sh --bootstrap-server 
broker1:9092,broker2:9092,broker3:9092 --create --partitions 1 
--replication-factor 3 --topic sandbox

(snip)

ubuntu@ip-172-16-0-62:~$ /opt/kafka-dev/bin/kafka-topics.sh --bootstrap-server 
broker1:9092,broker2:9092,broker3:9092 --list

(snip)

sandbox
{code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)