[jira] [Commented] (KAFKA-7175) Make version checking logic more flexible in streams_upgrade_test.py

2018-07-20 Thread Ted Yu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7175?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16551526#comment-16551526
 ] 

Ted Yu commented on KAFKA-7175:
---

Haven't found a way for Python to directly reference a constant defined in a 
Java class.

Maybe the first step is to define a base version, e.g. version 3 in the following:
{code}
leader_monitor.wait_until("Received a future (version probing) subscription (version: 4). Sending empty assignment back (with supported version 3).",
{code}
Then the other versions can be expressed as the base version plus some fixed 
increment(s).

This way, when there is a version bump in Java, we only need to change the base 
version in Python.
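
A minimal sketch of the idea against the system test (BASE_VERSION and the derived 
message are illustrative names, not the actual test code; `wait_until` with 
`timeout_sec`/`err_msg` is the ducktape call the test already uses):
{code}
# Hypothetical sketch: derive every expected version from one base constant,
# so a version bump on the Java side needs only one change here.
BASE_VERSION = 3                     # keep in sync with the Java constant
FUTURE_VERSION = BASE_VERSION + 1    # the "future (version probing)" version

expected = ("Received a future (version probing) subscription (version: %d). "
            "Sending empty assignment back (with supported version %d)."
            % (FUTURE_VERSION, BASE_VERSION))
leader_monitor.wait_until(expected, timeout_sec=60,
                          err_msg="Could not detect 'version probing' attempt at leader.")
{code}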

> Make version checking logic more flexible in streams_upgrade_test.py
> 
>
> Key: KAFKA-7175
> URL: https://issues.apache.org/jira/browse/KAFKA-7175
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ted Yu
>Priority: Major
>
> During debugging of the system test failure for KAFKA-5037, it was re-discovered 
> that the version numbers inside version-probing-related messages are hard-coded 
> in streams_upgrade_test.py. This is inflexible.
> We should correlate the latest version from the Java class with the expected 
> version numbers.
> Matthias made the following suggestion:
> We should also make this more generic and test upgrades from 3 -> 4, 3 -> 5, 
> and 4 -> 5. The current code only goes from the latest version to the future 
> version.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5682) Consumer should include partition in exceptions raised during record parsing/validation

2018-07-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5682?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16551483#comment-16551483
 ] 

ASF GitHub Bot commented on KAFKA-5682:
---

stanislavkozlovski opened a new pull request #5410: KAFKA-5682: Include 
partitions in exceptions raised during consumer record 
deserialization/validation
URL: https://github.com/apache/kafka/pull/5410
 
 
   KIP: 
[KIP-334](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=87297793)
   Mailing Thread: 
[thread](http://mail-archives.apache.org/mod_mbox/kafka-dev/201807.mbox/%3CCANZZNGx2jq9Xi9nxYQ6tW%2B_orkF3enOmGN13Jqgh9k5DLJLgwA%40mail.gmail.com%3E)
   
   * Introduce a new exception, `RecordDeserializationException`, which extends 
`SerializationException` for backwards compatibility. Throw it where appropriate 
with the partition and offset attached.
   * Introduce a new exception, `CorruptRecordException`, which extends 
`KafkaException` for backwards compatibility. Throw it where appropriate with 
the partition and offset attached.
   * Make `InvalidRecordException` extend `ApiException`. It previously extended 
`CorruptRecordException` and was thrown only for corrupt-record scenarios (it is 
not in a public package).
   * Add some tests for `KafkaConsumer#poll()`
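
   A sketch of the consumer-side handling this enables (assuming the new 
exception exposes `topicPartition()` and `offset()` accessors as proposed):
{code:java}
import java.time.Duration;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.errors.RecordDeserializationException;

// Sketch: skip past a record that fails deserialization instead of shutting down.
static void pollSkippingBadRecords(KafkaConsumer<String, String> consumer) {
    while (true) {
        try {
            ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
            for (ConsumerRecord<String, String> r : records) {
                // process(r) stands in for application code
            }
        } catch (RecordDeserializationException e) {
            // The exception carries the failed record's location, so the
            // application can seek one past it and keep consuming.
            consumer.seek(e.topicPartition(), e.offset() + 1);
        }
    }
}
{code}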
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Consumer should include partition in exceptions raised during record 
> parsing/validation
> ---
>
> Key: KAFKA-5682
> URL: https://issues.apache.org/jira/browse/KAFKA-5682
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Jason Gustafson
>Assignee: Stanislav Kozlovski
>Priority: Minor
>  Labels: needs-kip
> Fix For: 2.1.0
>
>
> When we encounter an exception when validating a fetched record or when 
> deserializing it, we raise it to the user and keep the consumer's current 
> position at the offset of the failed record. The expectation is that the user 
> will either propagate the exception and shutdown or seek past the failed 
> record. However, in the latter case, there is no way for the user to know 
> which topic partition had the failed record. We should consider exposing an 
> exception type to expose this information which users can catch. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-6946) Keep the session id for incremental fetch when fetch responses are throttled

2018-07-20 Thread Jon Lee (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6946?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jon Lee reassigned KAFKA-6946:
--

Assignee: Jon Lee

> Keep the session id for incremental fetch when fetch responses are throttled 
> -
>
> Key: KAFKA-6946
> URL: https://issues.apache.org/jira/browse/KAFKA-6946
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 2.0.0
>Reporter: Jon Lee
>Assignee: Jon Lee
>Priority: Major
>
> The current patch for KAFKA-6028 (KIP-219) sends a FetchResponse with 
> INVALID_SESSION_ID if the response needs to be throttled due to quota 
> violation. If it is for incremental fetch, this will make the client reset 
> its session and send a full fetch request next time. This is not a 
> correctness issue, but it may affect performance when fetches are throttled.
> In case of incremental fetch, a throttled response should use the same 
> session id as before so that the next unthrottled response can be in the same 
> session. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-6944) Add system tests testing the new throttling behavior using older clients/brokers

2018-07-20 Thread Jon Lee (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6944?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jon Lee reassigned KAFKA-6944:
--

Assignee: Jon Lee

> Add system tests testing the new throttling behavior using older 
> clients/brokers
> 
>
> Key: KAFKA-6944
> URL: https://issues.apache.org/jira/browse/KAFKA-6944
> Project: Kafka
>  Issue Type: Test
>  Components: system tests
>Affects Versions: 2.0.0
>Reporter: Jon Lee
>Assignee: Jon Lee
>Priority: Major
>
> KAFKA-6028 (KIP-219) changes the throttling behavior on quota violation as 
> follows:
>  * the broker will send out a response with throttle time to the client 
> immediately and mute the channel
>  * upon receiving a response with a non-zero throttle time, the client will 
> also block sending further requests to the broker until the throttle time is 
> over.
> The current system tests assume that both clients and brokers are of the same 
> version. We'll need an additional set of quota tests that test throttling 
> behavior between older clients and newer brokers and between newer clients 
> and older brokers. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-6028) Improve the quota throttle communication.

2018-07-20 Thread Jon Lee (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6028?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jon Lee reassigned KAFKA-6028:
--

Assignee: Jon Lee  (was: Jiangjie Qin)

> Improve the quota throttle communication.
> -
>
> Key: KAFKA-6028
> URL: https://issues.apache.org/jira/browse/KAFKA-6028
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, core
>Affects Versions: 1.0.0
>Reporter: Jiangjie Qin
>Assignee: Jon Lee
>Priority: Major
> Fix For: 2.0.0
>
>
> Currently, if a client is throttled due to quota violation, the broker will 
> only send back a response to the client after the throttle time has passed. 
> In this case, the clients don't know how long the response will be throttled 
> and might hit the request timeout before the response is returned. As a 
> result, the clients will retry sending the request, which results in an even 
> longer throttle time.
> The above scenario can happen when a large group of clients is sending 
> records to the brokers. We saw this when a MapReduce job pushed data to the 
> Kafka cluster.
> To improve this, the broker can return the response with the throttle time 
> immediately after processing the request. After that, the broker will mute 
> the channel for this client. A correct client implementation should back off 
> for that long before sending the next request. If the client ignores the 
> throttle time and sends the next request immediately, the channel will stay 
> muted and the request won't be processed until the throttle time has passed.
> A KIP will follow with more details.
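
For illustration, the client-side half of this contract might look like the 
following sketch (the names `response`, `time`, and `nextSendTimeMs` are 
illustrative, not actual client internals):
{code:java}
// Honor the broker's throttle hint instead of retrying into a muted channel.
long throttleMs = response.throttleTimeMs(); // throttle time carried in the response
if (throttleMs > 0) {
    // Do not send to this broker until the throttle elapses; the broker has
    // muted the channel anyway, so earlier sends would only queue up.
    nextSendTimeMs = time.milliseconds() + throttleMs;
}
{code}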



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7126) Reduce number of rebalance period for large consumer groups after a topic is created

2018-07-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16551345#comment-16551345
 ] 

ASF GitHub Bot commented on KAFKA-7126:
---

jonlee2 opened a new pull request #5408: KAFKA-7126: Reduce number of rebalance 
period for large consumer group after a topic is created
URL: https://github.com/apache/kafka/pull/5408
 
 
   This patch forces a metadata update for consumers with pattern subscription at 
the beginning of a rebalance. This prevents such consumers from detecting 
subscription changes (e.g., new topic creation) independently and triggering 
multiple unnecessary rebalances. KAFKA-7126 contains detailed scenarios and 
rationale.

   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Reduce number of rebalance period for large consumer groups after a topic is 
> created
> 
>
> Key: KAFKA-7126
> URL: https://issues.apache.org/jira/browse/KAFKA-7126
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Jon Lee
>Priority: Major
> Attachments: 1.diff
>
>
> For a group of 200 MirrorMaker consumers with pattern-based topic 
> subscription, a single topic creation caused 50 rebalances for each of these 
> consumers over a 5-minute period. This caused the MM to significantly lag 
> behind during this 5-minute period, and the clusters may be considerably 
> out-of-sync during this period.
> Ideally we would like to trigger only 1 rebalance in the MM group after a 
> topic is created. And conceptually it should be doable.
>  
> Here is the explanation of this repeated consumer rebalance, based on the 
> consumer rebalance logic in the latest Kafka code:
> 1) A topic with 10 partitions is created in the cluster, and it matches the 
> subscription pattern of the MM consumers.
> 2) The leader of the MM consumer group detects the new topic after a metadata 
> refresh and triggers a rebalance.
> 3) At time T0, the first rebalance finishes. 10 consumers are each assigned 1 
> partition of this topic. The other 190 consumers are not assigned any 
> partition of this topic. At this moment, the newly created topic will appear 
> in `ConsumerCoordinator.subscriptions.subscription` for those consumers who 
> are assigned a partition of this topic or who have refreshed metadata before 
> time T0.
> 4) In the common case, half of the consumers have refreshed metadata before 
> the leader of the consumer group refreshed metadata. Thus around 100 + 10 = 
> 110 consumers have the newly created topic in 
> `ConsumerCoordinator.subscriptions.subscription`. The other 90 consumers do 
> not have this topic in `ConsumerCoordinator.subscriptions.subscription`.
> 5) For those 90 consumers, if any consumer refreshes metadata, it will add 
> this topic to `ConsumerCoordinator.subscriptions.subscription`, which causes 
> `ConsumerCoordinator.rejoinNeededOrPending()` to return true and triggers 
> another rebalance. If a few consumers refresh metadata at almost the same 
> time, they will jointly trigger one rebalance. Otherwise, they each trigger a 
> separate rebalance.
> 6) The default metadata.max.age.ms is 5 minutes. Thus in the worst case, 
> which is probably also the average case if the number of consumers in the 
> group is large, the last consumer will refresh its metadata 5 minutes after 
> T0, and rebalances will recur throughout this 5-minute interval.
>  
>  
>  
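
For reference, the pattern subscription described above looks like the following 
sketch (the regex and listener are placeholders); the key point is that each 
consumer recomputes its matched topic set from its own metadata refresh rather 
than learning it from the group leader:
{code:java}
import java.util.Collection;
import java.util.regex.Pattern;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

// MirrorMaker-style pattern subscription: new matching topics are detected
// independently by every consumer, staggered across metadata.max.age.ms.
static void subscribeWhitelist(KafkaConsumer<byte[], byte[]> consumer) {
    consumer.subscribe(Pattern.compile(".*"), new ConsumerRebalanceListener() {
        @Override public void onPartitionsRevoked(Collection<TopicPartition> partitions) { }
        @Override public void onPartitionsAssigned(Collection<TopicPartition> partitions) { }
    });
}
{code}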



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-3244) TopicCommand doesn't allow to add partitions with specific assignment

2018-07-20 Thread Ray Chiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3244?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ray Chiang updated KAFKA-3244:
--
Component/s: admin

> TopicCommand doesn't allow to add partitions with specific assignment
> -
>
> Key: KAFKA-3244
> URL: https://issues.apache.org/jira/browse/KAFKA-3244
> Project: Kafka
>  Issue Type: Bug
>  Components: admin
>Reporter: Gwen Shapira
>Priority: Major
>
> The combination of "alter", "partitions", and "replica-assignment" is marked 
> as invalid here:
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/TopicCommand.scala#L337
> Although the alter method actually allows using both parameters:
> https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/admin/TopicCommand.scala#L144
> I think the command line limitation can be safely removed.
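
For reference, a sketch of the invocation this would allow (topic name and 
assignment are hypothetical; in --replica-assignment, partitions are 
comma-separated and replica broker ids colon-separated):
{code}
# Grow my-topic to 4 partitions while choosing replica placement explicitly --
# the flag combination the command-line parser currently rejects.
bin/kafka-topics.sh --zookeeper localhost:2181 --alter --topic my-topic \
  --partitions 4 --replica-assignment 0:1,1:2,2:0,3:1
{code}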



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-5748) Fix console producer to set timestamp and partition

2018-07-20 Thread Ray Chiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5748?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ray Chiang updated KAFKA-5748:
--
Component/s: producer 

> Fix console producer to set timestamp and partition
> ---
>
> Key: KAFKA-5748
> URL: https://issues.apache.org/jira/browse/KAFKA-5748
> Project: Kafka
>  Issue Type: Improvement
>  Components: producer 
>Reporter: Ran Ma
>Priority: Major
>
> https://github.com/apache/kafka/pull/3689



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6918) Kafka server fails to start with IBM JAVA

2018-07-20 Thread Ray Chiang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6918?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16551270#comment-16551270
 ] 

Ray Chiang commented on KAFKA-6918:
---

Using Ubuntu 16.04 and this version of IBM Java:
  
{code:java}
java version "1.8.0_171"
Java(TM) SE Runtime Environment (build 8.0.5.17 - pxa6480sr5fp17-20180627_01(SR5 FP17))
IBM J9 VM (build 2.9, JRE 1.8.0 Linux amd64-64-Bit Compressed References 20180626_390413 (JIT enabled, AOT enabled)
OpenJ9 - 5cdc604
OMR - a24bc01
IBM - 21870d6)
JCL - 20180619_01 based on Oracle jdk8u171-b11{code}
I'm seeing the above error in Kafka 1.0.2 and 1.1.0, but the broker starts up 
without that error in 1.1.1.

This matches what I see in Git: the fix is in both the 1.1 branch and the 
1.1.1-rc1 tag, but not in the 1.1.0 tag:
{code:java}
commit 091f96ffb0905251481444cfc14435c67c4e3608
Author: Ismael Juma 
Date: Fri May 25 07:00:56 2018 -0700

MINOR: Use reflection for signal handler and do not enable it for IBM JDK (#5047){code}

I think this should be good to close.

> Kafka server fails to start with IBM JAVA
> -
>
> Key: KAFKA-6918
> URL: https://issues.apache.org/jira/browse/KAFKA-6918
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.0.0
>Reporter: Nayana Thorat
>Priority: Critical
>
> Kafka server start fails with below error:
> bin/kafka-server-start.sh -daemon config/server.properties
> ERROR:
> (kafka.server.KafkaConfig)
>  FATAL  (kafka.Kafka$)
> java.lang.IllegalArgumentException: Signal already used by VM: INT
>     at 
> com.ibm.misc.SignalDispatcher.registerSignal(SignalDispatcher.java:127)
>     at sun.misc.Signal.handle(Signal.java:184)
>     at kafka.Kafka$.registerHandler$1(Kafka.scala:67)
>     at kafka.Kafka$.registerLoggingSignalHandler(Kafka.scala:74)
>     at kafka.Kafka$.main(Kafka.scala:85)
>     at kafka.Kafka.main(Kafka.scala)
>  
> Tried with binaries as well as Apache Kafka (v1.0.0) built from source.
> Installed the IBM SDK on Ubuntu 16.04. 
> IBM java link:
> wget 
> http://public.dhe.ibm.com/ibmdl/export/pub/systems/cloud/runtimes/java/8.0.5.10/linux/x86_64/ibm-java-sdk-8.0-5.10-x86_64-archive.bin
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6843) Document issue with Zookeeper DNS name resolutions changing

2018-07-20 Thread Ray Chiang (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6843?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ray Chiang updated KAFKA-6843:
--
Component/s: documentation

> Document issue with Zookeeper DNS name resolutions changing
> ---
>
> Key: KAFKA-6843
> URL: https://issues.apache.org/jira/browse/KAFKA-6843
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation
>Reporter: David Glasser
>Priority: Major
>
> We run Kafka and Zookeeper in Google Kubernetes Engine. We recently had 
> serious problems when GKE replaced our cluster (cycling both Zookeeper and 
> Kafka in parallel). Kafka (1.0) brokers lost the ability to talk to Zookeeper 
> and eventually failed their controlled shutdown, leading to slow startup 
> times for the new broker and outages for our system.
> We eventually tracked this down to the fact that (at least in our 
> environment) the default JVM DNS caching behavior is to cache results 
> forever.  We rely on DNS to connect to Zookeeper, and the DNS resolution 
> changes when the Zookeeper pods are replaced.
> The fix is straightforward: setting the property networkaddress.cache.ttl or 
> sun.net.inetaddr.ttl to make the caching non-infinite (or use a "security 
> manager"). See 
> [https://docs.oracle.com/javase/8/docs/technotes/guides/net/properties.html] 
> for details.
> I think this gotcha should be documented. Probably at 
> [https://kafka.apache.org/11/documentation/#java] ? I'm happy to submit a PR 
> if people agree this is the right place.  (I suppose somehow fixing this in 
> code would be nice too.)
> By the way, if you search the Apache issue tracker for 
> [networkaddress.cache.ttl|https://issues.apache.org/jira/browse/JAMES-774?jql=text%20~%20%22%5C%22networkaddress.cache.ttl%5C%22%22],
>  you'll learn that this is a common issue faced by many Apache Java projects.
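
A minimal sketch of the programmatic form of the fix described above (the 
60-second TTL is an arbitrary example; the same property can be set in 
$JAVA_HOME/jre/lib/security/java.security, or sun.net.inetaddr.ttl can be passed 
as a system property):
{code:java}
import java.security.Security;

public class DnsTtlConfig {
    public static void main(String[] args) {
        // Cap the JVM's positive DNS cache so Zookeeper re-resolution picks up
        // replaced pods; must run before the first name lookup.
        Security.setProperty("networkaddress.cache.ttl", "60");
        // ... start the Zookeeper/Kafka clients after this point ...
    }
}
{code}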



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-7126) Reduce number of rebalance period for large consumer groups after a topic is created

2018-07-20 Thread Jon Lee (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7126?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jon Lee reassigned KAFKA-7126:
--

Assignee: Jon Lee  (was: Dong Lin)

> Reduce number of rebalance period for large consumer groups after a topic is 
> created
> 
>
> Key: KAFKA-7126
> URL: https://issues.apache.org/jira/browse/KAFKA-7126
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Jon Lee
>Priority: Major
> Attachments: 1.diff
>
>
> For a group of 200 MirrorMaker consumers with pattern-based topic 
> subscription, a single topic creation caused 50 rebalances for each of these 
> consumers over a 5-minute period. This caused the MM to significantly lag 
> behind during this 5-minute period, and the clusters may be considerably 
> out-of-sync during this period.
> Ideally we would like to trigger only 1 rebalance in the MM group after a 
> topic is created. And conceptually it should be doable.
>  
> Here is the explanation of this repeated consumer rebalance, based on the 
> consumer rebalance logic in the latest Kafka code:
> 1) A topic with 10 partitions is created in the cluster, and it matches the 
> subscription pattern of the MM consumers.
> 2) The leader of the MM consumer group detects the new topic after a metadata 
> refresh and triggers a rebalance.
> 3) At time T0, the first rebalance finishes. 10 consumers are each assigned 1 
> partition of this topic. The other 190 consumers are not assigned any 
> partition of this topic. At this moment, the newly created topic will appear 
> in `ConsumerCoordinator.subscriptions.subscription` for those consumers who 
> are assigned a partition of this topic or who have refreshed metadata before 
> time T0.
> 4) In the common case, half of the consumers have refreshed metadata before 
> the leader of the consumer group refreshed metadata. Thus around 100 + 10 = 
> 110 consumers have the newly created topic in 
> `ConsumerCoordinator.subscriptions.subscription`. The other 90 consumers do 
> not have this topic in `ConsumerCoordinator.subscriptions.subscription`.
> 5) For those 90 consumers, if any consumer refreshes metadata, it will add 
> this topic to `ConsumerCoordinator.subscriptions.subscription`, which causes 
> `ConsumerCoordinator.rejoinNeededOrPending()` to return true and triggers 
> another rebalance. If a few consumers refresh metadata at almost the same 
> time, they will jointly trigger one rebalance. Otherwise, they each trigger a 
> separate rebalance.
> 6) The default metadata.max.age.ms is 5 minutes. Thus in the worst case, 
> which is probably also the average case if the number of consumers in the 
> group is large, the last consumer will refresh its metadata 5 minutes after 
> T0, and rebalances will recur throughout this 5-minute interval.
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-7177) Update 2.0 documentation to reflect changed quota behaviors by KIP-219

2018-07-20 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7177?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin reassigned KAFKA-7177:
---

Assignee: Jon Lee

> Update 2.0 documentation to reflect changed quota behaviors by KIP-219 
> ---
>
> Key: KAFKA-7177
> URL: https://issues.apache.org/jira/browse/KAFKA-7177
> Project: Kafka
>  Issue Type: Task
>  Components: documentation
>Reporter: Jon Lee
>Assignee: Jon Lee
>Priority: Major
>
> KIP-219 changed the way quota violation is communicated between clients and 
> brokers. Documentation should be updated accordingly. 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7126) Reduce number of rebalance period for large consumer groups after a topic is created

2018-07-20 Thread Dong Lin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7126?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16551239#comment-16551239
 ] 

Dong Lin commented on KAFKA-7126:
-

The simple patch in the attachment mostly fixes the issue: there is only one 
rebalance in a group of 100 consumers after a topic is created. The patch 
implements the second solution proposed above.

Since the patch is simple and mostly fixes the problem, it may not be necessary 
to implement the first solution described above. It will probably help many 
Kafka users.

> Reduce number of rebalance period for large consumer groups after a topic is 
> created
> 
>
> Key: KAFKA-7126
> URL: https://issues.apache.org/jira/browse/KAFKA-7126
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Dong Lin
>Priority: Major
> Attachments: 1.diff
>
>
> For a group of 200 MirrorMaker consumers with pattern-based topic 
> subscription, a single topic creation caused 50 rebalances for each of these 
> consumers over a 5-minute period. This caused the MM to significantly lag 
> behind during this 5-minute period, and the clusters may be considerably 
> out-of-sync during this period.
> Ideally we would like to trigger only 1 rebalance in the MM group after a 
> topic is created. And conceptually it should be doable.
>  
> Here is the explanation of this repeated consumer rebalance, based on the 
> consumer rebalance logic in the latest Kafka code:
> 1) A topic with 10 partitions is created in the cluster, and it matches the 
> subscription pattern of the MM consumers.
> 2) The leader of the MM consumer group detects the new topic after a metadata 
> refresh and triggers a rebalance.
> 3) At time T0, the first rebalance finishes. 10 consumers are each assigned 1 
> partition of this topic. The other 190 consumers are not assigned any 
> partition of this topic. At this moment, the newly created topic will appear 
> in `ConsumerCoordinator.subscriptions.subscription` for those consumers who 
> are assigned a partition of this topic or who have refreshed metadata before 
> time T0.
> 4) In the common case, half of the consumers have refreshed metadata before 
> the leader of the consumer group refreshed metadata. Thus around 100 + 10 = 
> 110 consumers have the newly created topic in 
> `ConsumerCoordinator.subscriptions.subscription`. The other 90 consumers do 
> not have this topic in `ConsumerCoordinator.subscriptions.subscription`.
> 5) For those 90 consumers, if any consumer refreshes metadata, it will add 
> this topic to `ConsumerCoordinator.subscriptions.subscription`, which causes 
> `ConsumerCoordinator.rejoinNeededOrPending()` to return true and triggers 
> another rebalance. If a few consumers refresh metadata at almost the same 
> time, they will jointly trigger one rebalance. Otherwise, they each trigger a 
> separate rebalance.
> 6) The default metadata.max.age.ms is 5 minutes. Thus in the worst case, 
> which is probably also the average case if the number of consumers in the 
> group is large, the last consumer will refresh its metadata 5 minutes after 
> T0, and rebalances will recur throughout this 5-minute interval.
>  
>  
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7185) getMatchingAcls throws StringIndexOutOfBoundsException for empty resource name

2018-07-20 Thread Rajini Sivaram (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7185?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram resolved KAFKA-7185.
---
   Resolution: Fixed
 Reviewer: Ismael Juma
Fix Version/s: 2.0.0

> getMatchingAcls throws StringIndexOutOfBoundsException for empty resource name
> --
>
> Key: KAFKA-7185
> URL: https://issues.apache.org/jira/browse/KAFKA-7185
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.0.0
>Reporter: Dhruvil Shah
>Assignee: Dhruvil Shah
>Priority: Blocker
> Fix For: 2.0.0
>
>
> KIP-290 introduced a way to match ACLs based on prefix. Certain resource 
> names, such as the group id, can be empty strings. When an empty string is 
> passed into `getMatchingAcls`, it throws a 
> `StringIndexOutOfBoundsException` because of the following logic:
> {noformat}
> val prefixed = aclCache.range(
>  Resource(resourceType, resourceName, PatternType.PREFIXED),
>  Resource(resourceType, resourceName.substring(0, Math.min(1, 
> resourceName.length)), PatternType.PREFIXED)
>  )
>  .filterKeys(resource => resourceName.startsWith(resource.name))
>  .flatMap { case (resource, versionedAcls) => versionedAcls.acls }
>  .toSet{noformat}
> This is a regression introduced in 2.0.
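
A minimal illustration of the before/after behavior on an empty resource name 
(`substring(0, 1)` was the pre-fix code path in SimpleAclAuthorizer; `take(1)` 
is the fix):
{code:scala}
"".substring(0, 1) // old code path: throws StringIndexOutOfBoundsException
"".take(1)         // fixed code path: returns "" (take clamps to the length)
{code}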



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7185) getMatchingAcls throws StringIndexOutOfBoundsException for empty resource name

2018-07-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7185?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16551172#comment-16551172
 ] 

ASF GitHub Bot commented on KAFKA-7185:
---

rajinisivaram closed pull request #5400: KAFKA-7185: Allow empty resource name 
when matching ACLs
URL: https://github.com/apache/kafka/pull/5400
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala 
b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
index 55352584c26..e77656d748c 100644
--- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
@@ -238,7 +238,7 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
 
   val prefixed = aclCache.range(
 Resource(resourceType, resourceName, PatternType.PREFIXED),
-Resource(resourceType, resourceName.substring(0, 1), 
PatternType.PREFIXED)
+Resource(resourceType, resourceName.take(1), PatternType.PREFIXED)
   )
 .filterKeys(resource => resourceName.startsWith(resource.name))
 .flatMap { case (resource, versionedAcls) => versionedAcls.acls }
diff --git 
a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala 
b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
index 5b65a7f2586..5461413871b 100644
--- a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
@@ -92,6 +92,19 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
 simpleAclAuthorizer.authorize(session, Read, Resource(Topic, "something", 
PREFIXED))
   }
 
+  @Test
+  def testAuthorizeWithEmptyResourceName(): Unit = {
+assertFalse(simpleAclAuthorizer.authorize(session, Read, Resource(Group, 
"", LITERAL)))
+simpleAclAuthorizer.addAcls(Set[Acl](allowReadAcl), Resource(Group, 
WildCardResource, LITERAL))
+assertTrue(simpleAclAuthorizer.authorize(session, Read, Resource(Group, 
"", LITERAL)))
+  }
+
+  // Authorizing the empty resource is not supported because we create a znode 
with the resource name.
+  @Test(expected = classOf[IllegalArgumentException])
+  def testEmptyAclThrowsException(): Unit = {
+simpleAclAuthorizer.addAcls(Set[Acl](allowReadAcl), Resource(Group, "", 
LITERAL))
+  }
+
   @Test
   def testTopicAcl() {
 val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> getMatchingAcls throws StringIndexOutOfBoundsException for empty resource name
> --
>
> Key: KAFKA-7185
> URL: https://issues.apache.org/jira/browse/KAFKA-7185
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.0.0
>Reporter: Dhruvil Shah
>Assignee: Dhruvil Shah
>Priority: Blocker
>
> KIP-290 introduced a way to match ACLs based on prefix. Certain resource 
> names, such as the group id, can be empty strings. When an empty string is 
> passed into `getMatchingAcls`, it throws a 
> `StringIndexOutOfBoundsException` because of the following logic:
> {noformat}
> val prefixed = aclCache.range(
>  Resource(resourceType, resourceName, PatternType.PREFIXED),
>  Resource(resourceType, resourceName.substring(0, Math.min(1, 
> resourceName.length)), PatternType.PREFIXED)
>  )
>  .filterKeys(resource => resourceName.startsWith(resource.name))
>  .flatMap { case (resource, versionedAcls) => versionedAcls.acls }
>  .toSet{noformat}
> This is a regression introduced in 2.0.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7190) Under low traffic conditions purging repartition topics cause WARN statements about UNKNOWN_PRODUCER_ID

2018-07-20 Thread Bill Bejeck (JIRA)
Bill Bejeck created KAFKA-7190:
--

 Summary: Under low traffic conditions purging repartition topics 
cause WARN statements about  UNKNOWN_PRODUCER_ID 
 Key: KAFKA-7190
 URL: https://issues.apache.org/jira/browse/KAFKA-7190
 Project: Kafka
  Issue Type: Improvement
  Components: core, streams
Affects Versions: 1.1.1, 1.1.0
Reporter: Bill Bejeck


When a streams application has little traffic, it is possible that consumer 
purging would delete even the last message sent by a producer (i.e., all the 
messages sent by this producer have been consumed and committed), and as a 
result the broker would delete that producer's ID. The next time this producer 
tries to send, it will get the UNKNOWN_PRODUCER_ID error code, but in this case 
the error is retriable: the producer just gets a new producer ID and retries, 
and this time it will succeed.

Possible fixes could be on the broker side, i.e., delaying the deletion of 
producer IDs for a more extended period, or on the streams side, developing a 
more conservative approach to deleting offsets from repartition topics.

 

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6520) When a Kafka Stream can't communicate with the server, it's Status stays RUNNING

2018-07-20 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16551089#comment-16551089
 ] 

Matthias J. Sax commented on KAFKA-6520:


As mentioned, try the `WordCountExample` that is showcased in the quickstart.

> When a Kafka Stream can't communicate with the server, it's Status stays 
> RUNNING
> 
>
> Key: KAFKA-6520
> URL: https://issues.apache.org/jira/browse/KAFKA-6520
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Michael Kohout
>Assignee: Milind Jain
>Priority: Major
>  Labels: newbie, user-experience
>
> When you execute the following scenario the application is always in RUNNING 
> state
>   
>  1)start kafka
>  2)start app, app connects to kafka and starts processing
>  3)kill kafka(stop docker container)
>  4)the application doesn't give any indication that it's no longer 
> connected(Stream State is still RUNNING, and the uncaught exception handler 
> isn't invoked)
>   
>   
>  It would be useful if the Stream State had a DISCONNECTED status.
>   
> See 
> [this|https://groups.google.com/forum/#!topic/confluent-platform/nQh2ohgdrIQ] 
> for a discussion from the google user forum.  
> [This|https://issues.apache.org/jira/browse/KAFKA-4564] is a link to a 
> related issue.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5037) Infinite loop if all input topics are unknown at startup

2018-07-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16551071#comment-16551071
 ] 

ASF GitHub Bot commented on KAFKA-5037:
---

mjsax closed pull request #2815: WIP DO NOT MERGE -- KAFKA-5037: Infinite loop 
if all input topics are unknown at startup
URL: https://github.com/apache/kafka/pull/2815
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
index 004926fd8c9..53007e378d3 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamPartitionAssignor.java
@@ -333,11 +333,12 @@ public Subscription subscription(Set topics) {
 } else {
 numPartitionsCandidate = 
metadata.partitionCountForTopic(sourceTopicName);
 if (numPartitionsCandidate == null) {
-
repartitionTopicMetadata.get(topicName).numPartitions = NOT_AVAILABLE;
+numPartitionsCandidate = 
NOT_AVAILABLE;
 }
 }
 
-if (numPartitionsCandidate != null && 
numPartitionsCandidate > numPartitions) {
+// assign NOT_AVAILABLE to avoid infinite 
loop and to propagate this information downstream
+if (numPartitionsCandidate == 
NOT_AVAILABLE || numPartitionsCandidate > numPartitions) {
 numPartitions = numPartitionsCandidate;
 }
 }
@@ -345,15 +346,21 @@ public Subscription subscription(Set topics) {
 }
 // if we still have not find the right number of 
partitions,
 // another iteration is needed
-if (numPartitions == UNKNOWN)
+if (numPartitions == UNKNOWN) {
 numPartitionsNeeded = true;
-else
+} else {
 
repartitionTopicMetadata.get(topicName).numPartitions = numPartitions;
+}
 }
 }
 }
 } while (numPartitionsNeeded);
 
+// ensure the co-partitioning of topics within the group so that they 
have the same number of partitions,
+// and enforce the number of partitions for those repartition topics 
to be the same if they
+// are co-partitioned as well.
+ensureCopartitioning(streamThread.builder.copartitionGroups(), 
repartitionTopicMetadata, metadata);
+
 // augment the metadata with the newly computed number of partitions 
for all the
 // repartition source topics
 Map allRepartitionTopicPartitions = new 
HashMap<>();
@@ -367,11 +374,6 @@ public Subscription subscription(Set topics) {
 }
 }
 
-// ensure the co-partitioning topics within the group have the same 
number of partitions,
-// and enforce the number of partitions for those repartition topics 
to be the same if they
-// are co-partitioned as well.
-ensureCopartitioning(streamThread.builder.copartitionGroups(), 
repartitionTopicMetadata, metadata);
-
 // make sure the repartition source topics exist with the right number 
of partitions,
 // create these topics if necessary
 prepareTopic(repartitionTopicMetadata);
@@ -737,7 +739,7 @@ void validate(final Set copartitionGroup,
 final Integer partitions = 
metadata.partitionCountForTopic(topic);
 
 if (partitions == null) {
-throw new 
TopologyBuilderException(String.format("stream-thread [%s] Topic not found: 
%s", threadName, topic));
+continue;
 }
 
 if (numPartitions == UNKNOWN) {
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/processor/internals/CopartitionedTopicsValidatorTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/CopartitionedTopicsValidatorTest.java
index 77001ce72f1..587ecc6006b 100644
--- 

[jira] [Commented] (KAFKA-7154) Apache kafka getting shut down everytime it is started

2018-07-20 Thread Ray Chiang (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7154?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16551062#comment-16551062
 ] 

Ray Chiang commented on KAFKA-7154:
---

A lot could depend on how you intend to make Kafka available.  The standard way 
on CentOS to handle startup scripts is documented in this file:

{code}
/usr/share/doc/initscripts-/sysvinitfiles
{code}

Beyond that, you can use the standard practice of directing stdout/stderr to 
/dev/null for your shell.  This would look something like:

{code}
nohup bin/kafka-server-start.sh config/server.properties > /dev/null 2>&1 &
{code}

In the future, I suggest asking these questions in the us...@kafka.apache.org 
mailing list rather than creating a JIRA.

> Apache kafka getting shut down everytime it is started
> --
>
> Key: KAFKA-7154
> URL: https://issues.apache.org/jira/browse/KAFKA-7154
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.1.0
>Reporter: Pushkar Kumar
>Priority: Major
> Attachments: server.txt
>
>
> Hi Team,
> We performed below steps to setup Apache Kafka on a Linux CentOS box:-
> 1.Setup jdk-8u172-linux-x64
> 2.Setup zookeeper-3.5.4-beta and its running fine.
> 3.Setup kafka_2.12-1.1.0(Binary file download from Apache Kafka platform)
> Whenever I am trying to start the Kafka Service, it shows below behavior:-
> INFO Terminating process due to signal SIGHUP (kafka.Kafka$)
>  INFO [KafkaServer id=0] shutting down (kafka.server.KafkaServer)



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5039) Logging in BlockingChannel and SyncProducer connect

2018-07-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5039?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16551055#comment-16551055
 ] 

ASF GitHub Bot commented on KAFKA-5039:
---

ijuma closed pull request #2820: KAFKA-5039: Logging in BlockingChannel and 
SyncProducer connect
URL: https://github.com/apache/kafka/pull/2820
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/network/BlockingChannel.scala 
b/core/src/main/scala/kafka/network/BlockingChannel.scala
index 0f10577f811..06e39dcd555 100644
--- a/core/src/main/scala/kafka/network/BlockingChannel.scala
+++ b/core/src/main/scala/kafka/network/BlockingChannel.scala
@@ -82,7 +82,10 @@ class BlockingChannel( val host: String,
  connectTimeoutMs))
 
   } catch {
-case _: Throwable => disconnect()
+case e: Throwable => {
+  debug("Error trying to connect, will disconnect.", e)
+  disconnect()
+}
   }
 }
   }
diff --git a/core/src/main/scala/kafka/producer/SyncProducer.scala 
b/core/src/main/scala/kafka/producer/SyncProducer.scala
index f02648f53e4..e82ed44993e 100644
--- a/core/src/main/scala/kafka/producer/SyncProducer.scala
+++ b/core/src/main/scala/kafka/producer/SyncProducer.scala
@@ -149,7 +149,9 @@ class SyncProducer(val config: SyncProducerConfig) extends 
Logging {
 if (!blockingChannel.isConnected && !shutdown) {
   try {
 blockingChannel.connect()
-info("Connected to " + formatAddress(config.host, config.port) + " for 
producing")
+if (blockingChannel.isConnected) {
+  info("Connected to " + formatAddress(config.host, config.port) + " 
for producing")
+}
   } catch {
 case e: Exception => {
   disconnect()


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Logging in BlockingChannel and SyncProducer connect
> ---
>
> Key: KAFKA-5039
> URL: https://issues.apache.org/jira/browse/KAFKA-5039
> Project: Kafka
>  Issue Type: Bug
>Reporter: Arun Mahadevan
>Assignee: Arun Mahadevan
>Priority: Minor
>
> When an exception is thrown in BlockingChannel::connect, the connection is 
> disconnected but the actual exception is not logged. This later manifests as 
> ClosedChannelException when trying to send. Also the SyncProducer wrongfully 
> logs "Connected to host:port for producing" even in case of exceptions.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7109) KafkaConsumer should close its incremental fetch sessions on close

2018-07-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7109?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16551045#comment-16551045
 ] 

ASF GitHub Bot commented on KAFKA-7109:
---

stanislavkozlovski opened a new pull request #5407: KAFKA-7109: Close cached 
fetch sessions in the broker on consumer close
URL: https://github.com/apache/kafka/pull/5407
 
 
   Previously, the consumer's incremental fetch sessions would time out once 
the consumer was gone.
   


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> KafkaConsumer should close its incremental fetch sessions on close
> --
>
> Key: KAFKA-7109
> URL: https://issues.apache.org/jira/browse/KAFKA-7109
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Colin P. McCabe
>Assignee: Colin P. McCabe
>Priority: Minor
>
> KafkaConsumer should close its incremental fetch sessions on close.  
> Currently, the sessions are not closed, but simply time out once the consumer 
> is gone.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-5037) Infinite loop if all input topics are unknown at startup

2018-07-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5037?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16551030#comment-16551030
 ] 

ASF GitHub Bot commented on KAFKA-5037:
---

guozhangwang closed pull request #5399: KAFKA-5037 Follow-up: move Scala test 
to Java
URL: https://github.com/apache/kafka/pull/5399
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
index 5fc768dc82b..61bbb8b5dac 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/integration/StreamTableJoinIntegrationTest.java
@@ -16,11 +16,14 @@
  */
 package org.apache.kafka.streams.integration;
 
+import org.apache.kafka.streams.KafkaStreamsWrapper;
 import org.apache.kafka.streams.StreamsBuilder;
 import org.apache.kafka.streams.StreamsConfig;
+import org.apache.kafka.streams.integration.utils.IntegrationTestUtils;
 import org.apache.kafka.streams.kstream.KStream;
 import org.apache.kafka.streams.kstream.KTable;
 import org.apache.kafka.test.IntegrationTest;
+import org.apache.kafka.test.TestUtils;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.experimental.categories.Category;
@@ -31,6 +34,8 @@
 import java.util.Collections;
 import java.util.List;
 
+import static org.junit.Assert.assertEquals;
+
 
 /**
  * Tests all available joins of Kafka Streams DSL.
@@ -56,6 +61,31 @@ public void prepareTopology() throws InterruptedException {
 leftStream = builder.stream(INPUT_TOPIC_LEFT);
 }
 
+@Test
+public void testShouldAutoShutdownOnIncompleteMetadata() throws 
InterruptedException {
+STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + 
"-incomplete");
+
+final KStream notExistStream = 
builder.stream(INPUT_TOPIC_LEFT + "-not-existed");
+
+final KTable aggregatedTable = 
notExistStream.leftJoin(rightTable, valueJoiner)
+.groupBy((key, value) -> key)
+.reduce((value1, value2) -> value1 + value2);
+
+// Write the (continuously updating) results to the output topic.
+aggregatedTable.toStream().to(OUTPUT_TOPIC);
+
+final KafkaStreamsWrapper streams = new 
KafkaStreamsWrapper(builder.build(), STREAMS_CONFIG);
+final IntegrationTestUtils.StateListenerStub listener = new 
IntegrationTestUtils.StateListenerStub();
+streams.setStreamThreadStateListener(listener);
+streams.start();
+
+TestUtils.waitForCondition(listener::revokedToPendingShutdownSeen, 
"Did not seen thread state transited to PENDING_SHUTDOWN");
+
+streams.close();
+assertEquals(listener.runningToRevokedSeen(), true);
+assertEquals(listener.revokedToPendingShutdownSeen(), true);
+}
+
 @Test
 public void testInner() throws Exception {
 STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, appID + 
"-inner");
diff --git 
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinWithIncompleteMetadataIntegrationTest.scala
 
b/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinWithIncompleteMetadataIntegrationTest.scala
deleted file mode 100644
index 3bf597738a9..000
--- 
a/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/StreamToTableJoinWithIncompleteMetadataIntegrationTest.scala
+++ /dev/null
@@ -1,88 +0,0 @@
-/*
- * Copyright (C) 2018 Lightbend Inc. 
- * Copyright (C) 2017-2018 Alexis Seigneurin.
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- *http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.kafka.streams.scala
-
-import java.util.Properties
-
-import org.apache.kafka.clients.consumer.ConsumerConfig
-import org.apache.kafka.clients.producer.ProducerConfig
-import org.apache.kafka.common.serialization._
-import org.apache.kafka.common.utils.MockTime
-import org.apache.kafka.streams._
-import org.apache.kafka.streams.integration.utils.{EmbeddedKafkaCluster, 
IntegrationTestUtils}

[jira] [Commented] (KAFKA-6520) When a Kafka Stream can't communicate with the server, it's Status stays RUNNING

2018-07-20 Thread Harjit Singh (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16551025#comment-16551025
 ] 

Harjit Singh commented on KAFKA-6520:
-

[~mjsax] - Is there a sample app for Streaming which I can use, or do I have to 
write code? I'm familiar with Kafka, and was looking for a sample for streaming 
so that I can debug this issue.

Thanks

 

- Harjit

> When a Kafka Stream can't communicate with the server, it's Status stays 
> RUNNING
> 
>
> Key: KAFKA-6520
> URL: https://issues.apache.org/jira/browse/KAFKA-6520
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Michael Kohout
>Assignee: Milind Jain
>Priority: Major
>  Labels: newbie, user-experience
>
> When you execute the following scenario the application is always in RUNNING 
> state
>   
>  1)start kafka
>  2)start app, app connects to kafka and starts processing
>  3)kill kafka(stop docker container)
>  4)the application doesn't give any indication that it's no longer 
> connected(Stream State is still RUNNING, and the uncaught exception handler 
> isn't invoked)
>   
>   
>  It would be useful if the Stream State had a DISCONNECTED status.
>   
> See 
> [this|https://groups.google.com/forum/#!topic/confluent-platform/nQh2ohgdrIQ] 
> for a discussion from the google user forum.  
> [This|https://issues.apache.org/jira/browse/KAFKA-4564] is a link to a 
> related issue.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6520) When a Kafka Stream can't communicate with the server, it's Status stays RUNNING

2018-07-20 Thread Matthias J. Sax (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6520?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16551012#comment-16551012
 ] 

Matthias J. Sax commented on KAFKA-6520:


[~harjitdotsingh] Not sure what exactly you mean... To write/read to/from 
Kafka, you can use the command line tools `bin/kafka-console-producer` and 
`bin/kafka-console-consumer`. There is also a quickstart example application 
for Kafka Streams. I would recommend walking through the Kafka quickstart, 
which showcases all of them: https://kafka.apache.org/documentation/#quickstart

> When a Kafka Stream can't communicate with the server, it's Status stays 
> RUNNING
> 
>
> Key: KAFKA-6520
> URL: https://issues.apache.org/jira/browse/KAFKA-6520
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Michael Kohout
>Assignee: Milind Jain
>Priority: Major
>  Labels: newbie, user-experience
>
> When you execute the following scenario the application is always in RUNNING 
> state
>   
>  1)start kafka
>  2)start app, app connects to kafka and starts processing
>  3)kill kafka(stop docker container)
>  4)the application doesn't give any indication that it's no longer 
> connected(Stream State is still RUNNING, and the uncaught exception handler 
> isn't invoked)
>   
>   
>  It would be useful if the Stream State had a DISCONNECTED status.
>   
> See 
> [this|https://groups.google.com/forum/#!topic/confluent-platform/nQh2ohgdrIQ] 
> for a discussion from the google user forum.  
> [This|https://issues.apache.org/jira/browse/KAFKA-4564] is a link to a 
> related issue.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7081) Add describe all topics API to AdminClient

2018-07-20 Thread Manikumar (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7081?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Manikumar resolved KAFKA-7081.
--
Resolution: Won't Fix

Closing this KIP in favor of adding filtering support to the Metadata API  

https://lists.apache.org/thread.html/9148e268ff1d3d66129dad0a298f006474754ce8b45af1a607964fff@%3Cdev.kafka.apache.org%3E

> Add describe all topics API to AdminClient
> --
>
> Key: KAFKA-7081
> URL: https://issues.apache.org/jira/browse/KAFKA-7081
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Affects Versions: 2.0.0
>Reporter: Manikumar
>Assignee: Manikumar
>Priority: Minor
>  Labels: needs-kip
>
> Currently, AdminClient supports a describeTopics(Collection<String> topicNames) 
> method for topic descriptions and listTopics() for topic name listing.
> To describe all topics, users currently use listTopics() to get all topic 
> names and pass the name list to describeTopics. 
> Since "describe all topics" is a common operation, we propose to add a 
> describeTopics() method that returns all topic descriptions. This will be 
> simple to use and avoids additional metadata requests.
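
For reference, a sketch of the two-step workaround described above against the 
Java AdminClient (`props` is assumed to hold the usual connection config):
{code:java}
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.TopicDescription;

// Two metadata round trips today: list all topic names, then describe them.
static Map<String, TopicDescription> describeAllTopics(Properties props) throws Exception {
    try (AdminClient admin = AdminClient.create(props)) {
        Set<String> names = admin.listTopics().names().get();
        return admin.describeTopics(names).all().get();
    }
}
{code}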



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7141) kafka-consumer-group doesn't describe existing group

2018-07-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7141?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16550903#comment-16550903
 ] 

ASF GitHub Bot commented on KAFKA-7141:
---

hachikuji closed pull request #5356: KAFKA-7141: ConsumerGroupCommand should 
describe group assignment eve…
URL: https://github.com/apache/kafka/pull/5356
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala 
b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
index 48c2cffb5d3..1d61720bfa9 100755
--- a/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
+++ b/core/src/main/scala/kafka/admin/ConsumerGroupCommand.scala
@@ -27,7 +27,6 @@ import kafka.utils._
 import org.apache.kafka.clients.{CommonClientConfigs, admin}
 import org.apache.kafka.clients.admin._
 import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer, 
OffsetAndMetadata}
-import org.apache.kafka.common.protocol.Errors
 import org.apache.kafka.common.serialization.StringDeserializer
 import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.common.{KafkaException, Node, TopicPartition}
@@ -35,7 +34,6 @@ import org.apache.kafka.common.{KafkaException, Node, 
TopicPartition}
 import scala.collection.JavaConverters._
 import scala.collection.mutable.ListBuffer
 import scala.collection.{Seq, Set}
-import scala.concurrent.ExecutionException
 import scala.util.{Failure, Success, Try}
 
 object ConsumerGroupCommand extends Logging {
@@ -340,20 +338,19 @@ object ConsumerGroupCommand extends Logging {
   val state = consumerGroup.state
   val committedOffsets = getCommittedOffsets(groupId).asScala.toMap
   var assignedTopicPartitions = ListBuffer[TopicPartition]()
-  val rowsWithConsumer = if (committedOffsets.isEmpty) 
List[PartitionAssignmentState]() else 
consumerGroup.members.asScala.filter(!_.assignment.topicPartitions.isEmpty).toSeq
-.sortWith(_.assignment.topicPartitions.size > 
_.assignment.topicPartitions.size)
-.flatMap { consumerSummary =>
-  val topicPartitions = 
consumerSummary.assignment.topicPartitions.asScala
-  assignedTopicPartitions = assignedTopicPartitions ++ topicPartitions
-  val partitionOffsets = 
consumerSummary.assignment.topicPartitions.asScala
-.map { topicPartition =>
-  topicPartition -> 
committedOffsets.get(topicPartition).map(_.offset)
-}.toMap
+  val rowsWithConsumer = 
consumerGroup.members.asScala.filter(!_.assignment.topicPartitions.isEmpty).toSeq
+.sortWith(_.assignment.topicPartitions.size > 
_.assignment.topicPartitions.size).flatMap { consumerSummary =>
+val topicPartitions = 
consumerSummary.assignment.topicPartitions.asScala
+assignedTopicPartitions = assignedTopicPartitions ++ topicPartitions
+val partitionOffsets = 
consumerSummary.assignment.topicPartitions.asScala
+  .map { topicPartition =>
+topicPartition -> 
committedOffsets.get(topicPartition).map(_.offset)
+  }.toMap
 
 collectConsumerAssignment(groupId, Option(consumerGroup.coordinator), 
topicPartitions.toList,
   partitionOffsets, Some(s"${consumerSummary.consumerId}"), 
Some(s"${consumerSummary.host}"),
   Some(s"${consumerSummary.clientId}"))
-}
+  }
 
   val rowsWithoutConsumer = 
committedOffsets.filterKeys(!assignedTopicPartitions.contains(_)).flatMap {
 case (topicPartition, offset) =>
diff --git 
a/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala 
b/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala
index cf00e93558c..51082effdbd 100644
--- a/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala
+++ b/core/src/test/scala/unit/kafka/admin/ConsumerGroupCommandTest.scala
@@ -92,8 +92,9 @@ class ConsumerGroupCommandTest extends KafkaServerTestHarness 
{
   def addConsumerGroupExecutor(numConsumers: Int,
topic: String = topic,
group: String = group,
-   strategy: String = 
classOf[RangeAssignor].getName): ConsumerGroupExecutor = {
-val executor = new ConsumerGroupExecutor(brokerList, numConsumers, group, 
topic, strategy)
+   strategy: String = 
classOf[RangeAssignor].getName,
+   customPropsOpt: Option[Properties] = None): 
ConsumerGroupExecutor = {
+val executor = new ConsumerGroupExecutor(brokerList, numConsumers, group, 
topic, strategy, customPropsOpt)
 addExecutor(executor)
 executor
   }
@@ -114,9 +115,10 @@ class ConsumerGroupCommandTest extends 

[jira] [Updated] (KAFKA-7189) Add a Merge Transformer for Kafka Connect

2018-07-20 Thread Abdelhamide EL ARIB (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7189?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Abdelhamide EL ARIB updated KAFKA-7189:
---
Description: 
Like the 
[flatten|https://docs.confluent.io/current/connect/transforms/flatten.html#flatten],
 there is also a need for a merge transformer.

A merge transformation example (following the [transformation 
configuration|http://kafka.apache.org/documentation.html#connect_transforms] conventions):

We want to add the offset and the partition for each record, and after that 
merge them into one field _metadata :
{code:java}
"transforms":"AddOffset, AddPartition, MergeFields", 

"transforms.AddOffset.type":"org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.AddOffset.offset.field":"offset!",

"transforms.AddPartition.type":"org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.AddPartition.partition.field":"partition!",

"transforms.MergeFields.type":"org.apache.kafka.connect.transforms.Merge$Value",
"transforms.MergeFields.field.list":"offset,partition",
"transforms.MergeFields.field.root":"_metadata"
{code}
 

*field.root* : Field name for the root field. 

*field.list* : The list of fields to merge under the root field. Suffix a field with 
*{color:#8eb021}*{color}* to keep it at the top level, and prefix it with 
*{color:#8eb021}?{color}* if it's optional.

 

This is our MR for this transformer [https://github.com/apache/kafka/pull/5405]
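
A hypothetical sketch of what such a Merge transform might look like for schemaless 
(Map) values; the class below is illustrative only (it ignores Struct values, the 
*/? name markers, and error handling), and the actual patch may differ:

{code:java}
package org.example.connect.transforms;

import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;

public class Merge<R extends ConnectRecord<R>> implements Transformation<R> {
    private String rootField;       // configured via "field.root"
    private String[] fieldsToMerge; // configured via "field.list"

    @Override
    public void configure(Map<String, ?> configs) {
        rootField = (String) configs.get("field.root");
        fieldsToMerge = ((String) configs.get("field.list")).split(",");
    }

    @Override
    @SuppressWarnings("unchecked")
    public R apply(R record) {
        // Schemaless values arrive as a Map; copy it so the original is untouched.
        Map<String, Object> value = new HashMap<>((Map<String, Object>) record.value());
        Map<String, Object> merged = new HashMap<>();
        for (String field : fieldsToMerge) {
            merged.put(field, value.remove(field)); // move each field under the root
        }
        value.put(rootField, merged);
        return record.newRecord(record.topic(), record.kafkaPartition(),
                record.keySchema(), record.key(), null, value, record.timestamp());
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef();
    }

    @Override
    public void close() {
    }
}
{code}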

  was:
Like the 
[flatten|https://docs.confluent.io/current/connect/transforms/flatten.html#flatten],
 there is also a need for a merge transformer.

Example transformation : 

We want to add the offset and the partition for each record, and after that 
merge them into one field _metadata :

 
{code:java}
"transforms":"AddOffset, AddPartition, MergeFields", 

"transforms.AddOffset.type":"org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.AddOffset.offset.field":"offset!",

"transforms.AddPartition.type":"org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.AddPartition.partition.field":"partition!",

"transforms.MergeFields.type":"org.apache.kafka.connect.transforms.Merge$Value",
"transforms.MergeFields.field.list":"offset,partition",
"transforms.MergeFields.field.root":"_metadata"
{code}
 

*field.root* : Field name for the root field. 

*field.list* : The list of fields to merge under the root field. Suffix a field with 
*{color:#8eb021}*{color}* to keep it at the top level, and prefix it with 
*{color:#8eb021}?{color}* if it's optional.

 

This is our MR for this transformer [https://github.com/apache/kafka/pull/5405]


> Add a Merge Transformer for Kafka Connect
> -
>
> Key: KAFKA-7189
> URL: https://issues.apache.org/jira/browse/KAFKA-7189
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Abdelhamide EL ARIB
>Priority: Minor
>
> Like the 
> [flatten|https://docs.confluent.io/current/connect/transforms/flatten.html#flatten],
>  there is also a need for a merge transformer.
> A merge transformation example (following the [transformation 
> configuration|http://kafka.apache.org/documentation.html#connect_transforms] conventions):
> We want to add the offset and the partition for each record, and after that 
> merge them into one field _metadata :
> {code:java}
> "transforms":"AddOffset, AddPartition, MergeFields", 
> "transforms.AddOffset.type":"org.apache.kafka.connect.transforms.InsertField$Value",
> "transforms.AddOffset.offset.field":"offset!",
> "transforms.AddPartition.type":"org.apache.kafka.connect.transforms.InsertField$Value",
> "transforms.AddPartition.partition.field":"partition!",
> "transforms.MergeFields.type":"org.apache.kafka.connect.transforms.Merge$Value",
> "transforms.MergeFields.field.list":"offset,partition",
> "transforms.MergeFields.field.root":"_metadata"
> {code}
>  
> *field.root* : Field name for the root field. 
> *field.list* : The list of fields to merge under the root field. Suffix a field with 
> *{color:#8eb021}*{color}* to keep it at the top level, and prefix it with 
> *{color:#8eb021}?{color}* if it's optional.
>  
> This is our MR for this transformer 
> [https://github.com/apache/kafka/pull/5405]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7189) Add a Merge Transformer for Kafka Connect

2018-07-20 Thread Abdelhamide EL ARIB (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7189?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Abdelhamide EL ARIB updated KAFKA-7189:
---
Description: 
Like the 
[flatten|https://docs.confluent.io/current/connect/transforms/flatten.html#flatten],
 there is also a need for a merge transformer.

Example transformation : 

We want to add the offset and the partition for each record, and after that 
merge them into one field _metadata :

 
{code:java}
"transforms":"AddOffset, AddPartition, MergeFields", 

"transforms.AddOffset.type":"org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.AddOffset.offset.field":"offset!",

"transforms.AddPartition.type":"org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.AddPartition.partition.field":"partition!",

"transforms.MergeFields.type":"org.apache.kafka.connect.transforms.Merge$Value",
"transforms.MergeFields.field.list":"offset,partition",
"transforms.MergeFields.field.root":"_metadata"
{code}
 

*field.root* : Field name for the root field. 

*field.list* : The list of fields to merge under the root field. Suffix a field with 
{color:#8eb021}*{color} to keep it at the top level, and prefix it with 
{color:#8eb021}?{color} if it's optional.

 

This is our MR for this transformer [https://github.com/apache/kafka/pull/5405]

  was:
Like the 
[flatten|https://docs.confluent.io/current/connect/transforms/flatten.html#flatten],
 there is also a need for a merge transformer.

Example transformation : 

We want to add the offset and the partition for each record, and after that 
merge them into one field _metadata :

 
{code:java}
"transforms":"AddOffset, AddPartition, MergeFields", 

"transforms.AddOffset.type":"org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.AddOffset.offset.field":"offset!",

"transforms.AddPartition.type":"org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.AddPartition.partition.field":"partition!",

"transforms.MergeFields.type":"org.apache.kafka.connect.transforms.Merge$Value",
"transforms.MergeFields.field.list":"offset,partition",
"transforms.MergeFields.field.root":"_metadata"
{code}
 

*field.root* : Field name for the root field. 

*field.list* : The list of fields to merge under the root field. Suffix a field with 
{color:#8eb021}*{color} to keep it at the top level, and prefix it with 
{color:#8eb021}?{color} if it's optional.

 

This is our MR for this transformer [https://github.com/apache/kafka/pull/5405]


> Add a Merge Transformer for Kafka Connect
> -
>
> Key: KAFKA-7189
> URL: https://issues.apache.org/jira/browse/KAFKA-7189
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Abdelhamide EL ARIB
>Priority: Minor
>
> Like the 
> [flatten|https://docs.confluent.io/current/connect/transforms/flatten.html#flatten],
>  there is also a need for a merge transformer.
> Example transformation : 
> We want to add the offset and the partition for each record, and after that 
> merge them into one field _metadata :
>  
> {code:java}
> "transforms":"AddOffset, AddPartition, MergeFields", 
> "transforms.AddOffset.type":"org.apache.kafka.connect.transforms.InsertField$Value",
> "transforms.AddOffset.offset.field":"offset!",
> "transforms.AddPartition.type":"org.apache.kafka.connect.transforms.InsertField$Value",
> "transforms.AddPartition.partition.field":"partition!",
> "transforms.MergeFields.type":"org.apache.kafka.connect.transforms.Merge$Value",
> "transforms.MergeFields.field.list":"offset,partition",
> "transforms.MergeFields.field.root":"_metadata"
> {code}
>  
> *field.root* : Field name for the root field. 
> *field.list* : The list of fields to merge under the root field. Suffix a field with 
> {color:#8eb021}*{color} to keep it at the top level, and prefix it with 
> {color:#8eb021}?{color} if it's optional.
>  
> This is our MR for this transformer 
> [https://github.com/apache/kafka/pull/5405]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7189) Add a Merge Transformer for Kafka Connect

2018-07-20 Thread Abdelhamide EL ARIB (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7189?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Abdelhamide EL ARIB updated KAFKA-7189:
---
Description: 
Like the 
[flatten|https://docs.confluent.io/current/connect/transforms/flatten.html#flatten],
 there is also a need for a merge transformer.

Example transformation : 

We want to add the offset and the partition for each record, and after that 
merge them into one field _metadata :

 
{code:java}
"transforms":"AddOffset, AddPartition, MergeFields", 

"transforms.AddOffset.type":"org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.AddOffset.offset.field":"offset!",

"transforms.AddPartition.type":"org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.AddPartition.partition.field":"partition!",

"transforms.MergeFields.type":"org.apache.kafka.connect.transforms.Merge$Value",
"transforms.MergeFields.field.list":"offset,partition",
"transforms.MergeFields.field.root":"_metadata"
{code}
 

*field.root* : Field name for the root field. 

*field.list* : The list of fields to merge under the root field. Suffix a field with 
*{color:#8eb021}*{color}* to keep it at the top level, and prefix it with 
*{color:#8eb021}?{color}* if it's optional.

 

This is our MR for this transformer [https://github.com/apache/kafka/pull/5405]

  was:
Like the 
[flatten|https://docs.confluent.io/current/connect/transforms/flatten.html#flatten],
 there is also a need for a merge transformer.

Example transformation : 

We want to add the offset and the partition for each record, and after that 
merge them into one field _metadata :

 
{code:java}
"transforms":"AddOffset, AddPartition, MergeFields", 

"transforms.AddOffset.type":"org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.AddOffset.offset.field":"offset!",

"transforms.AddPartition.type":"org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.AddPartition.partition.field":"partition!",

"transforms.MergeFields.type":"org.apache.kafka.connect.transforms.Merge$Value",
"transforms.MergeFields.field.list":"offset,partition",
"transforms.MergeFields.field.root":"_metadata"
{code}
 

*field.root* : Field name for the root field. 

*field.list* : The list of fields to merge under the root field. Suffix a field with 
{color:#8eb021}*{color} to keep it at the top level, and prefix it with 
{color:#8eb021}?{color} if it's optional.

 

This is our MR for this transformer [https://github.com/apache/kafka/pull/5405]


> Add a Merge Transformer for Kafka Connect
> -
>
> Key: KAFKA-7189
> URL: https://issues.apache.org/jira/browse/KAFKA-7189
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Abdelhamide EL ARIB
>Priority: Minor
>
> Like the 
> [flatten|https://docs.confluent.io/current/connect/transforms/flatten.html#flatten],
>  there is also a need for a merge transformer.
> Example transformation : 
> We want to add the offset and the partition for each record, and after that 
> merge them into one field _metadata :
>  
> {code:java}
> "transforms":"AddOffset, AddPartition, MergeFields", 
> "transforms.AddOffset.type":"org.apache.kafka.connect.transforms.InsertField$Value",
> "transforms.AddOffset.offset.field":"offset!",
> "transforms.AddPartition.type":"org.apache.kafka.connect.transforms.InsertField$Value",
> "transforms.AddPartition.partition.field":"partition!",
> "transforms.MergeFields.type":"org.apache.kafka.connect.transforms.Merge$Value",
> "transforms.MergeFields.field.list":"offset,partition",
> "transforms.MergeFields.field.root":"_metadata"
> {code}
>  
> *field.root* : Field name for the root field. 
> *field.list* : The list of fields to merge under the root field. Suffix a field with 
> *{color:#8eb021}*{color}* to keep it at the top level, and prefix it with 
> *{color:#8eb021}?{color}* if it's optional.
>  
> This is our MR for this transformer 
> [https://github.com/apache/kafka/pull/5405]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7189) Add a Merge Transformer for Kafka Connect

2018-07-20 Thread Abdelhamide EL ARIB (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7189?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Abdelhamide EL ARIB updated KAFKA-7189:
---
Description: 
Like the 
[flatten|https://docs.confluent.io/current/connect/transforms/flatten.html#flatten],
 there is also a need for a merge transformer.

Example transformation : 

We want to add the offset and the partition for each record, and after that 
merge them into one field _metadata :

 
{code:java}
"transforms":"AddOffset, AddPartition, MergeFields", 

"transforms.AddOffset.type":"org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.AddOffset.offset.field":"offset!",

"transforms.AddPartition.type":"org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.AddPartition.partition.field":"partition!",

"transforms.MergeFields.type":"org.apache.kafka.connect.transforms.Merge$Value",
"transforms.MergeFields.field.list":"offset,partition",
"transforms.MergeFields.field.root":"_metadata"
{code}
 

*field.root* : Field name for the root field. 

*field.list* : The list of fields to merge under the root field. Suffix a field with 
{color:#8eb021}*{color} to keep it at the top level, and prefix it with 
{color:#8eb021}?{color} if it's optional.

 

This is our MR for this transformer [https://github.com/apache/kafka/pull/5405]

  was:
Like the 
[flatten|https://docs.confluent.io/current/connect/transforms/flatten.html#flatten],
 there is also a need for a merge transformer.

Example transformation : 

We want to add the offset and the partition for each record, and after that 
merge them into one field _metadata :

 
{code:java}
"transforms":"AddOffset, AddPartition, MergeFields", 

"transforms.AddOffset.type":"org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.AddOffset.offset.field":"offset!",

"transforms.AddPartition.type":"org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.AddPartition.partition.field":"partition!",

"transforms.MergeFields.type":"org.apache.kafka.connect.transforms.Merge$Value",
"transforms.MergeFields.field.list":"offset,partition",
"transforms.MergeFields.field.root":"_metadata"
{code}
 

field.root : Field name for the root field. 

field.list : The list of fields to merge under the root field. Suffix a field with 
{color:#8eb021}*{color} to keep it at the top level, and prefix it with ? if it's 
optional.

 

 

This is our MR for this transformer [https://github.com/apache/kafka/pull/5405]


> Add a Merge Transformer for Kafka Connect
> -
>
> Key: KAFKA-7189
> URL: https://issues.apache.org/jira/browse/KAFKA-7189
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Abdelhamide EL ARIB
>Priority: Minor
>
> Like the 
> [flatten|https://docs.confluent.io/current/connect/transforms/flatten.html#flatten],
>  there is also a need for a merge transformer.
> Example transformation : 
> We want to add the offset and the partition for each record, and after that 
> merge them into one field _metadata :
>  
> {code:java}
> "transforms":"AddOffset, AddPartition, MergeFields", 
> "transforms.AddOffset.type":"org.apache.kafka.connect.transforms.InsertField$Value",
> "transforms.AddOffset.offset.field":"offset!",
> "transforms.AddPartition.type":"org.apache.kafka.connect.transforms.InsertField$Value",
> "transforms.AddPartition.partition.field":"partition!",
> "transforms.MergeFields.type":"org.apache.kafka.connect.transforms.Merge$Value",
> "transforms.MergeFields.field.list":"offset,partition",
> "transforms.MergeFields.field.root":"_metadata"
> {code}
>  
> *field.root* : Field name for the root field. 
> field.list : The list of fields to merge under the root field. Suffix a field with 
> {color:#8eb021}*{color} to keep it at the top level, and prefix it with 
> {color:#8eb021}?{color} if it's optional.
>  
> This is our MR for this transformer 
> [https://github.com/apache/kafka/pull/5405]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7189) Add a Merge Transformer for Kafka Connect

2018-07-20 Thread Abdelhamide EL ARIB (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7189?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Abdelhamide EL ARIB updated KAFKA-7189:
---
Description: 
Like the 
[flatten|https://docs.confluent.io/current/connect/transforms/flatten.html#flatten],
 there is also a need for a merge transformer.

Example transformation : 

We want to add the offset and the partition for each record, and after that 
merge them into one field _metadata :

 
{code:java}
"transforms":"AddOffset, AddPartition, MergeFields", 

"transforms.AddOffset.type":"org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.AddOffset.offset.field":"offset!",

"transforms.AddPartition.type":"org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.AddPartition.partition.field":"partition!",

"transforms.MergeFields.type":"org.apache.kafka.connect.transforms.Merge$Value",
"transforms.MergeFields.field.list":"offset,partition",
"transforms.MergeFields.field.root":"_metadata"
{code}
 

field.root : Field name for the root field. 

field.list : The list of fields to merge under the root field. Suffix a field with 
{color:#8eb021}*{color} to keep it at the top level, and prefix it with ? if it's 
optional.

 

 

This is our MR for this transformer [https://github.com/apache/kafka/pull/5405]

  was:
Like the 
[flatten|https://docs.confluent.io/current/connect/transforms/flatten.html#flatten],
 there is also a need for a merge transformer.

Example transformation : 

We want to add the offset and the partition for each record, and after that 
merge them into one field _metadata :

 
{code:java}
"transforms":"AddOffset, AddPartition, MergeFields", 

"transforms.AddOffset.type":"org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.AddOffset.offset.field":"offset!",

"transforms.AddPartition.type":"org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.AddPartition.partition.field":"partition!",

"transforms.MergeFields.type":"org.apache.kafka.connect.transforms.Merge$Value",
"transforms.MergeFields.field.list":"offset,partition",
"transforms.MergeFields.field.root":"_metadata"
{code}
 

field.root : Field name for the root field. 

field.list : The list of fields to merge under the root field. Suffix a field with `*` 
to keep it at the top level, and prefix it if it's optional.

 

 

This is our MR for this transformer [https://github.com/apache/kafka/pull/5405]


> Add a Merge Transformer for Kafka Connect
> -
>
> Key: KAFKA-7189
> URL: https://issues.apache.org/jira/browse/KAFKA-7189
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Abdelhamide EL ARIB
>Priority: Minor
>
> Like the 
> [flatten|https://docs.confluent.io/current/connect/transforms/flatten.html#flatten],
>  there is also a need for a merge transformer.
> Example transformation : 
> We want to add the offset and the partition for each record, and after that 
> merge them into one field _metadata :
>  
> {code:java}
> "transforms":"AddOffset, AddPartition, MergeFields", 
> "transforms.AddOffset.type":"org.apache.kafka.connect.transforms.InsertField$Value",
> "transforms.AddOffset.offset.field":"offset!",
> "transforms.AddPartition.type":"org.apache.kafka.connect.transforms.InsertField$Value",
> "transforms.AddPartition.partition.field":"partition!",
> "transforms.MergeFields.type":"org.apache.kafka.connect.transforms.Merge$Value",
> "transforms.MergeFields.field.list":"offset,partition",
> "transforms.MergeFields.field.root":"_metadata"
> {code}
>  
> field.root : Field name for the root field. 
> field.list : The list of fields to merge under the root field. Suffix a field with 
> {color:#8eb021}*{color} to keep it at the top level, and prefix it with ? if it's 
> optional.
>  
>  
> This is our MR for this transformer 
> [https://github.com/apache/kafka/pull/5405]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7189) Add a Merge Transformer for Kafka Connect

2018-07-20 Thread Abdelhamide EL ARIB (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7189?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Abdelhamide EL ARIB updated KAFKA-7189:
---
Description: 
Like the 
[flatten|https://docs.confluent.io/current/connect/transforms/flatten.html#flatten],
 there is also a need for a merge transformer.

Example transformation : 

We want to add the offset and the partition for each record, and after that 
merge them into one field _metadata :

 
{code:java}
"transforms":"AddOffset, AddPartition, MergeFields", 

"transforms.AddOffset.type":"org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.AddOffset.offset.field":"offset!",

"transforms.AddPartition.type":"org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.AddPartition.partition.field":"partition!",

"transforms.MergeFields.type":"org.apache.kafka.connect.transforms.Merge$Value",
"transforms.MergeFields.field.list":"offset,partition",
"transforms.MergeFields.field.root":"_metadata"
{code}
 

field.root : Field name for the root field. 

field.list : The list of fields to merge under the root field. Suffix a field with `*` 
to keep it at the top level, and prefix it if it's optional.

 

 

This is our MR for this transformer [https://github.com/apache/kafka/pull/5405]

  was:
Like the 
[flatten|https://docs.confluent.io/current/connect/transforms/flatten.html#flatten],
 there is also a need for a merge transformer.

Example transformation : 

We want to add the offset and the partition for each record, and after that 
merge them into one field _metadata :

 
{code:java}
"transforms":"AddOffset, AddPartition, MergeFields", 

"transforms.AddOffset.type":"org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.AddOffset.offset.field":"offset!",

"transforms.AddPartition.type":"org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.AddPartition.partition.field":"partition!",

"transforms.MergeFields.type":"org.apache.kafka.connect.transforms.Merge$Value",
"transforms.MergeFields.field.list":"offset,partition",
"transforms.MergeFields.field.root":"_metadata"
{code}
 

field.root : Field name for the root field. 

field.list : The list of fields to merge under the root field. Suffix a field with * to 
keep it at the top level, and prefix it if it's optional

 

 

This is our MR for this transformer [https://github.com/apache/kafka/pull/5405]


> Add a Merge Transformer for Kafka Connect
> -
>
> Key: KAFKA-7189
> URL: https://issues.apache.org/jira/browse/KAFKA-7189
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Abdelhamide EL ARIB
>Priority: Minor
>
> Like the 
> [flatten|https://docs.confluent.io/current/connect/transforms/flatten.html#flatten],
>  there is also a need for a merge transformer.
> Example transformation : 
> We want to add the offset and the partition for each record, and after that 
> merge them into one field _metadata :
>  
> {code:java}
> "transforms":"AddOffset, AddPartition, MergeFields", 
> "transforms.AddOffset.type":"org.apache.kafka.connect.transforms.InsertField$Value",
> "transforms.AddOffset.offset.field":"offset!",
> "transforms.AddPartition.type":"org.apache.kafka.connect.transforms.InsertField$Value",
> "transforms.AddPartition.partition.field":"partition!",
> "transforms.MergeFields.type":"org.apache.kafka.connect.transforms.Merge$Value",
> "transforms.MergeFields.field.list":"offset,partition",
> "transforms.MergeFields.field.root":"_metadata"
> {code}
>  
> field.root : Field name for the root field. 
> field.list : The list of fields to merge under the root field. Suffix a field with `*` 
> to keep it at the top level, and prefix it if it's optional.
>  
>  
> This is our MR for this transformer 
> [https://github.com/apache/kafka/pull/5405]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7189) Add a Merge Transformer for Kafka Connect

2018-07-20 Thread Abdelhamide EL ARIB (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7189?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Abdelhamide EL ARIB updated KAFKA-7189:
---
Description: 
Like the 
[flatten|https://docs.confluent.io/current/connect/transforms/flatten.html#flatten],
 there is also a need for a merge transformer.

Example transformation : 

We want to add the offset and the partition for each record, and after that 
merge them into one field _metadata :

 
{code:java}
"transforms":"AddOffset, AddPartition, MergeFields", 

"transforms.AddOffset.type":"org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.AddOffset.offset.field":"offset!",

"transforms.AddPartition.type":"org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.AddPartition.partition.field":"partition!",

"transforms.MergeFields.type":"org.apache.kafka.connect.transforms.Merge$Value",
"transforms.MergeFields.field.list":"offset,partition",
"transforms.MergeFields.field.root":"_metadata"
{code}
 

field.root : Field name for the root field. 

field.list : The list of fields to merge under the root field. Suffix a field with * to 
keep it at the top level, and prefix it if it's optional

 

 

This is our MR for this transformer [https://github.com/apache/kafka/pull/5405]

  was:
Like the 
[flatten|https://docs.confluent.io/current/connect/transforms/flatten.html#flatten],
 there is also a need for a merge transformer.

Example transformation : 

We want to add the offset and the partition for each record, and after that 
merge them into one field _metadata :

 
{code:java}
"transforms":"AddOffset, AddPartition, MergeFields", 

"transforms.AddOffset.type":"org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.AddOffset.offset.field":"offset!",

"transforms.AddPartition.type":"org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.AddPartition.partition.field":"partition!",

"transforms.MergeFields.type":"org.apache.kafka.connect.transforms.Merge$Value",
"transforms.MergeFields.field.list":"offset,partition",
"transforms.MergeFields.field.root":"_metadata"
{code}
 

This is our MR for this transformer [https://github.com/apache/kafka/pull/5405]


> Add a Merge Transformer for Kafka Connect
> -
>
> Key: KAFKA-7189
> URL: https://issues.apache.org/jira/browse/KAFKA-7189
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Abdelhamide EL ARIB
>Priority: Minor
>
> Like the 
> [flatten|https://docs.confluent.io/current/connect/transforms/flatten.html#flatten],
>  there is also a need for a merge transformer.
> Example transformation : 
> We want to add the offset and the partition for each record, and after that 
> merge them into one field _metadata :
>  
> {code:java}
> "transforms":"AddOffset, AddPartition, MergeFields", 
> "transforms.AddOffset.type":"org.apache.kafka.connect.transforms.InsertField$Value",
> "transforms.AddOffset.offset.field":"offset!",
> "transforms.AddPartition.type":"org.apache.kafka.connect.transforms.InsertField$Value",
> "transforms.AddPartition.partition.field":"partition!",
> "transforms.MergeFields.type":"org.apache.kafka.connect.transforms.Merge$Value",
> "transforms.MergeFields.field.list":"offset,partition",
> "transforms.MergeFields.field.root":"_metadata"
> {code}
>  
> field.root : Field name for the root field. 
> field.list : The list of fields to merge under the root field. Suffix a field with * 
> to keep it at the top level, and prefix it if it's optional
>  
>  
> This is our MR for this transformer 
> [https://github.com/apache/kafka/pull/5405]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7189) Add a Merge Transformer for Kafka Connect

2018-07-20 Thread Abdelhamide EL ARIB (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7189?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Abdelhamide EL ARIB updated KAFKA-7189:
---
Description: 
Like the 
[flatten|https://docs.confluent.io/current/connect/transforms/flatten.html#flatten],
 there is also a need for a merge transformer.

Example transformation : 

We want to add the offset and the partition for each record, and after that 
merge them into one field _metadata :

 
{code:java}
"transforms":"AddOffset, AddPartition, MergeFields", 

"transforms.AddOffset.type":"org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.AddOffset.offset.field":"offset!",

"transforms.AddPartition.type":"org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.AddPartition.partition.field":"partition!",

"transforms.MergeFields.type":"org.apache.kafka.connect.transforms.Merge$Value",
"transforms.MergeFields.field.list":"offset,partition",
"transforms.MergeFields.field.root":"_metadata"
{code}
 

This is our MR for this transformer [https://github.com/apache/kafka/pull/5405]

  was:
Like the 
[flatten|https://docs.confluent.io/current/connect/transforms/flatten.html#flatten],
 there is also a need for a merge transformer.

Example transformation : 

We want to add the offset and the partition for each record, and after that 
merge them into one field _metadata :

 
{code:java}
"transforms":"AddOffset, AddPartition, MergeFields", 

"transforms.AddOffset.type":"org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.AddOffset.offset.field":"offset!",

"transforms.AddPartition.type":"org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.AddPartition.partition.field":"partition!",

"transforms.MergeFields.type":"org.apache.kafka.connect.transforms.Merge$Value",
"transforms.MergeFields.field.list":"offset,partition",
"transforms.MergeFields.field.root":"_metadata"
{code}
 

This is our MR for this transformer 


> Add a Merge Transformer for Kafka Connect
> -
>
> Key: KAFKA-7189
> URL: https://issues.apache.org/jira/browse/KAFKA-7189
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Abdelhamide EL ARIB
>Priority: Minor
>
> Like the 
> [flatten|https://docs.confluent.io/current/connect/transforms/flatten.html#flatten],
>  there is also a need for a merge transformer.
> Example transformation : 
> We want to add the offset and the partition for each record, and after that 
> merge them into one field _metadata :
>  
> {code:java}
> "transforms":"AddOffset, AddPartition, MergeFields", 
> "transforms.AddOffset.type":"org.apache.kafka.connect.transforms.InsertField$Value",
> "transforms.AddOffset.offset.field":"offset!",
> "transforms.AddPartition.type":"org.apache.kafka.connect.transforms.InsertField$Value",
> "transforms.AddPartition.partition.field":"partition!",
> "transforms.MergeFields.type":"org.apache.kafka.connect.transforms.Merge$Value",
> "transforms.MergeFields.field.list":"offset,partition",
> "transforms.MergeFields.field.root":"_metadata"
> {code}
>  
> This is our MR for this transformer 
> [https://github.com/apache/kafka/pull/5405]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-7187) offsetsForTimes in MockConsumer implementation

2018-07-20 Thread Andras Katona (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7187?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andras Katona reassigned KAFKA-7187:


Assignee: (was: Andras Katona)

> offsetsForTimes in MockConsumer implementation
> --
>
> Key: KAFKA-7187
> URL: https://issues.apache.org/jira/browse/KAFKA-7187
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Jing Chen
>Priority: Minor
>
> The implementation of offsetsForTimes in MockConsumer is missing; it simply 
> throws UnsupportedOperationException. Can anyone help to provide an 
> implementation of the method?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7189) Add a Merge Transformer for Kafka Connect

2018-07-20 Thread Abdelhamide EL ARIB (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7189?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Abdelhamide EL ARIB updated KAFKA-7189:
---
Description: 
Like the 
[flatten|https://docs.confluent.io/current/connect/transforms/flatten.html#flatten],
 there is also a need for a merge transformer.

Example transformation : 

We want to add the offset and the partition for each record, and after that 
merge them into one field _metadata :

 
{code:java}
"transforms":"AddOffset, AddPartition, MergeFields", 

"transforms.AddOffset.type":"org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.AddOffset.offset.field":"offset!",

"transforms.AddPartition.type":"org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.AddPartition.partition.field":"partition!",

"transforms.MergeFields.type":"org.apache.kafka.connect.transforms.Merge$Value",
"transforms.MergeFields.field.list":"offset,partition",
"transforms.MergeFields.field.root":"_metadata"
{code}
 

This is our MR for this transformer 

  was:
Like the 
[flatten|https://docs.confluent.io/current/connect/transforms/flatten.html#flatten],
 there is also a need for a merge transformer.



Example transformation : 

We want to add the offset and the partition for each record, and after that 
merge them into one field _metadata :

 
{code:java}
"transforms":"AddOffset, AddPartition, MergeFields", 

"transforms.AddOffset.type":"org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.AddOffset.offset.field":"offset!",

"transforms.AddPartition.type":"org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.AddPartition.partition.field":"partition!",

"transforms.MergeFields.type":"org.apache.kafka.connect.transforms.Merge$Value",
"transforms.MergeFields.field.list":"offset,partition",
"transforms.MergeFields.field.root":"_metadata"
{code}
 


This is our MR for this transformer 


> Add a Merge Transformer for Kafka Connect
> -
>
> Key: KAFKA-7189
> URL: https://issues.apache.org/jira/browse/KAFKA-7189
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Abdelhamide EL ARIB
>Priority: Minor
>
> Like the 
> [flatten|https://docs.confluent.io/current/connect/transforms/flatten.html#flatten],
>  there is also a need for a merge transformer.
> Example transformation : 
> We want to add the offset and the partition for each record, and after that 
> merge them into one field _metadata :
>  
> {code:java}
> "transforms":"AddOffset, AddPartition, MergeFields", 
> "transforms.AddOffset.type":"org.apache.kafka.connect.transforms.InsertField$Value",
> "transforms.AddOffset.offset.field":"offset!",
> "transforms.AddPartition.type":"org.apache.kafka.connect.transforms.InsertField$Value",
> "transforms.AddPartition.partition.field":"partition!",
> "transforms.MergeFields.type":"org.apache.kafka.connect.transforms.Merge$Value",
> "transforms.MergeFields.field.list":"offset,partition",
> "transforms.MergeFields.field.root":"_metadata"
> {code}
>  
> This is our MR for this transformer 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7189) Add a Merge Transformer for Kafka Connect

2018-07-20 Thread Abdelhamide EL ARIB (JIRA)
Abdelhamide EL ARIB created KAFKA-7189:
--

 Summary: Add a Merge Transformer for Kafka Connect
 Key: KAFKA-7189
 URL: https://issues.apache.org/jira/browse/KAFKA-7189
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Reporter: Abdelhamide EL ARIB


Like the 
[flatten|https://docs.confluent.io/current/connect/transforms/flatten.html#flatten],
 there is also a need for a merge transformer.



Example transformation : 

We want to add the offset and the partition for each record, and after that 
merge them into one field _metadata :

 
{code:java}
"transforms":"AddOffset, AddPartition, MergeFields", 

"transforms.AddOffset.type":"org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.AddOffset.offset.field":"offset!",

"transforms.AddPartition.type":"org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.AddPartition.partition.field":"partition!",

"transforms.MergeFields.type":"org.apache.kafka.connect.transforms.Merge$Value",
"transforms.MergeFields.field.list":"offset,partition",
"transforms.MergeFields.field.root":"_metadata"
{code}
 


This is our MR for this transformer 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-7134) KafkaLog4jAppender - Appender exceptions are propagated to caller

2018-07-20 Thread Andras Katona (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7134?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andras Katona reassigned KAFKA-7134:


Assignee: Andras Katona

> KafkaLog4jAppender - Appender exceptions are propagated to caller
> -
>
> Key: KAFKA-7134
> URL: https://issues.apache.org/jira/browse/KAFKA-7134
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: venkata praveen
>Assignee: Andras Katona
>Priority: Major
>
> KafkaLog4jAppender exceptions are propagated to the caller when Kafka is 
> down, slow, or otherwise unavailable, which may crash the application. Ideally the 
> appender should print and ignore the exception, or should provide an option to 
> ignore/throw exceptions like the 'ignoreExceptions' property of 
> https://logging.apache.org/log4j/2.x/manual/appenders.html#KafkaAppender
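
A hedged sketch of the requested behavior (the ignoreExceptions flag below is 
hypothetical; KafkaLog4jAppender has no such option today):

{code:java}
public class IgnoringSender {
    private final boolean ignoreExceptions;

    public IgnoringSender(boolean ignoreExceptions) {
        this.ignoreExceptions = ignoreExceptions;
    }

    // Wraps the blocking Kafka send, e.g. producer.send(record).get().
    public void send(Runnable kafkaSend) {
        try {
            kafkaSend.run();
        } catch (RuntimeException e) {
            if (!ignoreExceptions) {
                throw e; // current behavior: the appender's caller sees the failure
            }
            // requested behavior: print and carry on so the application survives
            System.err.println("Failed to append to Kafka, ignoring: " + e);
        }
    }
}
{code}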



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7188) Avoid reverse DNS lookup in SASL channel builder

2018-07-20 Thread Rajini Sivaram (JIRA)
Rajini Sivaram created KAFKA-7188:
-

 Summary: Avoid reverse DNS lookup in SASL channel builder
 Key: KAFKA-7188
 URL: https://issues.apache.org/jira/browse/KAFKA-7188
 Project: Kafka
  Issue Type: Bug
  Components: network
Reporter: Rajini Sivaram
Assignee: Rajini Sivaram


SaslChannelBuilder uses InetAddress.getHostName, which may perform a reverse DNS 
lookup and cause delays in some environments. We should replace these calls with 
InetSocketAddress.getHostString if possible.
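
The difference in a minimal standalone example (not the actual SaslChannelBuilder 
code; the address and port are illustrative):

{code:java}
import java.net.InetSocketAddress;

public class HostNameLookup {
    static String withReverseDns(InetSocketAddress addr) {
        return addr.getAddress().getHostName(); // may block on a reverse DNS lookup
    }

    static String withoutReverseDns(InetSocketAddress addr) {
        return addr.getHostString(); // returns the literal host string, no lookup
    }

    public static void main(String[] args) {
        InetSocketAddress addr = new InetSocketAddress("93.184.216.34", 9092);
        System.out.println(withoutReverseDns(addr)); // prints the IP immediately
        System.out.println(withReverseDns(addr));    // may stall while DNS resolves
    }
}
{code}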



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7159) mark configuration files in confluent-kafka RPM SPEC file

2018-07-20 Thread Attila Sasvari (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16550735#comment-16550735
 ] 

Attila Sasvari commented on KAFKA-7159:
---

- Are you sure you filed the JIRA under the proper project? There is no code 
matching '%files' in apache/kafka - 
https://github.com/apache/kafka/search?q=%25files_q=%25files
- However, Apache Bigtop has some code for packaging kafka, but it has not been 
updated for a while: 
https://github.com/apache/bigtop/blob/master/bigtop-packages/src/rpm/kafka/SPECS/kafka.spec
- {{confluent-kafka RPM SPEC}} suggests you might have wanted to report this 
to Confluent

> mark configuration files in confluent-kafka RPM SPEC file
> -
>
> Key: KAFKA-7159
> URL: https://issues.apache.org/jira/browse/KAFKA-7159
> Project: Kafka
>  Issue Type: Improvement
>  Components: packaging
>Affects Versions: 1.1.0
> Environment: RHEL7
>Reporter: Robert Fabisiak
>Priority: Trivial
>  Labels: rpm
>
> All configuration files in the kafka RPM SPEC file should be marked with the 
> %config directive in the %files section.
> This would prevent them from being overwritten during install/upgrade and 
> uninstall operations:
> [https://access.redhat.com/documentation/en-us/red_hat_enterprise_linux/7/html-single/rpm_packaging_guide/index#files]
> It's especially important to preserve configuration during package upgrades.
> Section to change in SPEC file:
> {code:java}
> %files
> %config(noreplace) %{_sysconfdir}/kafka/*.conf
> %config(noreplace) %{_sysconfdir}/kafka/*.properties
> {code}
> It would also be good to mark documentation files with %doc



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-7187) offsetsForTimes in MockConsumer implementation

2018-07-20 Thread Andras Katona (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7187?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andras Katona reassigned KAFKA-7187:


Assignee: Andras Katona

> offsetsForTimes in MockConsumer implementation
> --
>
> Key: KAFKA-7187
> URL: https://issues.apache.org/jira/browse/KAFKA-7187
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Reporter: Jing Chen
>Assignee: Andras Katona
>Priority: Minor
>
> The implementation of offsetsForTimes in MockConsumer is missing; it simply 
> throws UnsupportedOperationException. Can anyone help to provide an 
> implementation of the method?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7082) Concurrent createTopics calls may throw NodeExistsException

2018-07-20 Thread williamguan (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7082?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16550587#comment-16550587
 ] 

williamguan commented on KAFKA-7082:


Hi, I think the broker should accept only the first create request when 
handling more than one.

What if two people try to create the same topic with different 
partition counts? One of them would find that their messages are sent to the wrong 
partitions, because neither of them receives an exception while creating the topic.
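
A minimal sketch of the behavior described above (broker address and topic settings 
are illustrative): the losing client should get a TopicExistsException rather than a 
silent success with someone else's partition count.

{code:java}
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.apache.kafka.common.errors.TopicExistsException;

public class RacingCreate {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            try {
                admin.createTopics(Collections.singleton(
                        new NewTopic("orders", 8, (short) 3))).all().get();
                System.out.println("created with 8 partitions");
            } catch (ExecutionException e) {
                if (e.getCause() instanceof TopicExistsException) {
                    // The other creator won the race; verify its partition count
                    // instead of assuming ours was applied.
                    System.out.println("topic already exists, check its config");
                } else {
                    throw e;
                }
            }
        }
    }
}
{code}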

> Concurrent createTopics calls may throw NodeExistsException
> ---
>
> Key: KAFKA-7082
> URL: https://issues.apache.org/jira/browse/KAFKA-7082
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 1.1.0
>Reporter: Ismael Juma
>Assignee: Ismael Juma
>Priority: Critical
>  Labels: regression
> Fix For: 1.1.1, 2.0.0
>
>
> This exception is unexpected, causing an `UnknownServerException` to be thrown 
> back to the client. Example below:
> {code}
> org.apache.zookeeper.KeeperException$NodeExistsException: KeeperErrorCode = 
> NodeExists for /config/topics/connect-configs
> at 
> org.apache.zookeeper.KeeperException.create(KeeperException.java:119)
> at 
> org.apache.zookeeper.KeeperException.create(KeeperException.java:51)
> at kafka.zookeeper.AsyncResponse.maybeThrow(ZooKeeperClient.scala:472)
> at kafka.zk.KafkaZkClient.createRecursive(KafkaZkClient.scala:1400)
> at kafka.zk.KafkaZkClient.create$1(KafkaZkClient.scala:262)
> at 
> kafka.zk.KafkaZkClient.setOrCreateEntityConfigs(KafkaZkClient.scala:269)
> at 
> kafka.zk.AdminZkClient.createOrUpdateTopicPartitionAssignmentPathInZK(AdminZkClient.scala:99)
> at kafka.server.AdminManager$$anonfun$2.apply(AdminManager.scala:126)
> at kafka.server.AdminManager$$anonfun$2.apply(AdminManager.scala:81)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:234)
> at scala.collection.Iterator$class.foreach(Iterator.scala:891)
> at scala.collection.AbstractIterator.foreach(Iterator.scala:1334)
> at scala.collection.IterableLike$class.foreach(IterableLike.scala:72)
> at scala.collection.AbstractIterable.foreach(Iterable.scala:54)
> at 
> scala.collection.TraversableLike$class.map(TraversableLike.scala:234)
> {code}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-7187) offsetsForTimes in MockConsumer implementation

2018-07-20 Thread Jing Chen (JIRA)
Jing Chen created KAFKA-7187:


 Summary: offsetsForTimes in MockConsumer implementation
 Key: KAFKA-7187
 URL: https://issues.apache.org/jira/browse/KAFKA-7187
 Project: Kafka
  Issue Type: Improvement
  Components: clients
Reporter: Jing Chen


The implementation of offsetsForTimes in MockConsumer is missing; it simply 
throws UnsupportedOperationException. Can anyone help to provide an 
implementation of the method?
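
One possible shape for such an implementation, sketched as a standalone helper 
rather than a patch to MockConsumer itself (the seeding method is made up for the 
example):

{code:java}
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.consumer.OffsetAndTimestamp;
import org.apache.kafka.common.TopicPartition;

public class MockOffsetsForTimes {
    // Test code seeds this lookup table: partition -> (timestamp -> offset).
    private final Map<TopicPartition, Map<Long, Long>> offsetsByTime = new HashMap<>();

    public void addOffsetForTime(TopicPartition tp, long timestamp, long offset) {
        offsetsByTime.computeIfAbsent(tp, k -> new HashMap<>()).put(timestamp, offset);
    }

    public Map<TopicPartition, OffsetAndTimestamp> offsetsForTimes(
            Map<TopicPartition, Long> timestampsToSearch) {
        Map<TopicPartition, OffsetAndTimestamp> result = new HashMap<>();
        for (Map.Entry<TopicPartition, Long> entry : timestampsToSearch.entrySet()) {
            Map<Long, Long> byTime = offsetsByTime.get(entry.getKey());
            Long offset = byTime == null ? null : byTime.get(entry.getValue());
            // Like the real consumer, map to null when no offset is known.
            result.put(entry.getKey(),
                    offset == null ? null : new OffsetAndTimestamp(offset, entry.getValue()));
        }
        return result;
    }
}
{code}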



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7172) Prometheus /metrics http endpoint for monitoring integration

2018-07-20 Thread Hari Sekhon (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7172?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16550539#comment-16550539
 ] 

Hari Sekhon commented on KAFKA-7172:


That's a fair suggestion as a workaround for now, although I believe that in the 
long run most services will want to implement Prometheus monitoring support natively.

> Prometheus /metrics http endpoint for monitoring integration
> 
>
> Key: KAFKA-7172
> URL: https://issues.apache.org/jira/browse/KAFKA-7172
> Project: Kafka
>  Issue Type: New Feature
>  Components: metrics
>Affects Versions: 1.1.2
>Reporter: Hari Sekhon
>Priority: Major
>
> Feature Request to add Prometheus /metrics http endpoint for monitoring 
> integration:
> [https://prometheus.io/docs/prometheus/latest/configuration/configuration/#%3Cscrape_config%3E]
> Prometheus metrics format for that endpoint:
> [https://github.com/prometheus/docs/blob/master/content/docs/instrumenting/exposition_formats.md]
>  
> See also ticket for /jmx http endpoint similar to what Hadoop and HBase have 
> done for years in
> https://issues.apache.org/jira/browse/KAFKA-3377
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7185) getMatchingAcls throws StringIndexOutOfBoundsException for empty resource name

2018-07-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7185?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16550504#comment-16550504
 ] 

ASF GitHub Bot commented on KAFKA-7185:
---

dhruvilshah3 closed pull request #5400: KAFKA-7185: Allow empty resource name 
when matching ACLs
URL: https://github.com/apache/kafka/pull/5400
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala 
b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
index 55352584c26..e77656d748c 100644
--- a/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
+++ b/core/src/main/scala/kafka/security/auth/SimpleAclAuthorizer.scala
@@ -238,7 +238,7 @@ class SimpleAclAuthorizer extends Authorizer with Logging {
 
   val prefixed = aclCache.range(
 Resource(resourceType, resourceName, PatternType.PREFIXED),
-Resource(resourceType, resourceName.substring(0, 1), 
PatternType.PREFIXED)
+Resource(resourceType, resourceName.take(1), PatternType.PREFIXED)
   )
 .filterKeys(resource => resourceName.startsWith(resource.name))
 .flatMap { case (resource, versionedAcls) => versionedAcls.acls }
diff --git 
a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala 
b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
index 5b65a7f2586..4320e3e125d 100644
--- a/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
+++ b/core/src/test/scala/unit/kafka/security/auth/SimpleAclAuthorizerTest.scala
@@ -92,6 +92,13 @@ class SimpleAclAuthorizerTest extends ZooKeeperTestHarness {
 simpleAclAuthorizer.authorize(session, Read, Resource(Topic, "something", 
PREFIXED))
   }
 
+  @Test
+  def testAuthorizeWithEmptyResourceName(): Unit = {
+assertFalse(simpleAclAuthorizer.authorize(session, Read, Resource(Group, 
"", LITERAL)))
+simpleAclAuthorizer.addAcls(Set[Acl](allowReadAcl), Resource(Group, 
WildCardResource, LITERAL))
+assertTrue(simpleAclAuthorizer.authorize(session, Read, Resource(Group, 
"", LITERAL)))
+  }
+
   @Test
   def testTopicAcl() {
 val user1 = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, username)


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> getMatchingAcls throws StringIndexOutOfBoundsException for empty resource name
> --
>
> Key: KAFKA-7185
> URL: https://issues.apache.org/jira/browse/KAFKA-7185
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.0.0
>Reporter: Dhruvil Shah
>Assignee: Dhruvil Shah
>Priority: Blocker
>
> KIP-290 introduced a way to match ACLs based on prefix. Certain resource 
> names, such as the group id, can be empty strings. When an empty string is 
> passed into `getMatchingAcls`, it throws a 
> `StringIndexOutOfBoundsException` because of the following logic:
> {noformat}
> val prefixed = aclCache.range(
>  Resource(resourceType, resourceName, PatternType.PREFIXED),
>  Resource(resourceType, resourceName.substring(0, Math.min(1, 
> resourceName.length)), PatternType.PREFIXED)
>  )
>  .filterKeys(resource => resourceName.startsWith(resource.name))
>  .flatMap { case (resource, versionedAcls) => versionedAcls.acls }
>  .toSet{noformat}
> This is a regression introduced in 2.0.
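
A minimal repro of the failing call (the merged fix replaces substring(0, 1) with 
Scala's take(1), which safely returns "" for an empty string):

{code:java}
public class EmptyResourceNameRepro {
    public static void main(String[] args) {
        String resourceName = ""; // e.g. an empty group id
        // Throws StringIndexOutOfBoundsException: the string has no character
        // at index 0, so the end index 1 is out of range.
        System.out.println(resourceName.substring(0, 1));
    }
}
{code}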



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7185) getMatchingAcls throws StringIndexOutOfBoundsException for empty resource name

2018-07-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7185?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16550505#comment-16550505
 ] 

ASF GitHub Bot commented on KAFKA-7185:
---

dhruvilshah3 opened a new pull request #5400: KAFKA-7185: Allow empty resource 
name when matching ACLs
URL: https://github.com/apache/kafka/pull/5400
 
 
   Please let me know if there are more test cases we should add.


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> getMatchingAcls throws StringIndexOutOfBoundsException for empty resource name
> --
>
> Key: KAFKA-7185
> URL: https://issues.apache.org/jira/browse/KAFKA-7185
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.0.0
>Reporter: Dhruvil Shah
>Assignee: Dhruvil Shah
>Priority: Blocker
>
> KIP-290 introduced a way to match ACLs based on prefix. Certain resource 
> names, such as the group id, can be empty strings. When an empty string is 
> passed into `getMatchingAcls`, it throws a 
> `StringIndexOutOfBoundsException` because of the following logic:
> {noformat}
> val prefixed = aclCache.range(
>  Resource(resourceType, resourceName, PatternType.PREFIXED),
>  Resource(resourceType, resourceName.substring(0, Math.min(1, 
> resourceName.length)), PatternType.PREFIXED)
>  )
>  .filterKeys(resource => resourceName.startsWith(resource.name))
>  .flatMap { case (resource, versionedAcls) => versionedAcls.acls }
>  .toSet{noformat}
> This is a regression introduced in 2.0.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-3702) SslTransportLayer.close() does not shutdown gracefully

2018-07-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-3702?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16550484#comment-16550484
 ] 

ASF GitHub Bot commented on KAFKA-3702:
---

rajinisivaram closed pull request #5397: KAFKA-3702: Change log level of SSL 
close_notify failure
URL: https://github.com/apache/kafka/pull/5397
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java 
b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
index 838a6a75af3..08a39e71d50 100644
--- 
a/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
+++ 
b/clients/src/main/java/org/apache/kafka/common/network/SslTransportLayer.java
@@ -177,7 +177,7 @@ public void close() throws IOException {
 flush(netWriteBuffer);
 }
 } catch (IOException ie) {
-log.warn("Failed to send SSL Close message", ie);
+log.debug("Failed to send SSL Close message", ie);
 } finally {
 socketChannel.socket().close();
 socketChannel.close();


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> SslTransportLayer.close() does not shutdown gracefully
> --
>
> Key: KAFKA-3702
> URL: https://issues.apache.org/jira/browse/KAFKA-3702
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 0.10.0.0
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
>
> The warning "Failed to send SSL Close message" occurs very frequently when 
> SSL connections are closed. Close should write outbound data and shut down 
> gracefully.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-6123) Give client MetricsReporter auto-generated client.id

2018-07-20 Thread Kevin Lu (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-6123?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16550350#comment-16550350
 ] 

Kevin Lu commented on KAFKA-6123:
-

Created 
[KIP-344|https://cwiki.apache.org/confluence/display/KAFKA/KIP-344%3A+The+auto-generated+client+id+should+be+passed+to+MetricsReporter].

> Give client MetricsReporter auto-generated client.id
> 
>
> Key: KAFKA-6123
> URL: https://issues.apache.org/jira/browse/KAFKA-6123
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, metrics
>Reporter: Kevin Lu
>Assignee: Kevin Lu
>Priority: Minor
>  Labels: clients, metrics, newbie++
>
> The KAFKA-4756 bugfix gave the broker's KafkaMetricsReporter access to the 
> auto-generated broker id, but this was not fixed on the client side.
>  
> Metric reporters configured for clients should also be given the 
> auto-generated client id in the `configure` method.
> The interceptors already receive the auto-generated client id.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7098) Improve accuracy of the log cleaner throttle rate

2018-07-20 Thread Dong Lin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7098?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Dong Lin resolved KAFKA-7098.
-
Resolution: Fixed

> Improve accuracy of the log cleaner throttle rate
> -
>
> Key: KAFKA-7098
> URL: https://issues.apache.org/jira/browse/KAFKA-7098
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Zhanxiang (Patrick) Huang
>Priority: Major
>
> LogCleaner uses the Throttler class to throttle the log cleaning rate to the 
> user-specified limit, i.e. log.cleaner.io.max.bytes.per.second. However, in 
> Throttler.maybeThrottle(), periodStartNs is reset to the timestamp taken 
> before the sleep even after sleep() has been called, which artificially 
> inflates the measurement window and under-estimates the actual log cleaning 
> rate. This causes the log cleaning IO to be higher than the user-specified limit.
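
A simplified sketch of the throttling loop makes the off-by-a-sleep error easier to see; this is a hypothetical reduction of the logic, not the actual Throttler class.
{code}
public class ThrottlerSketch {
    private final double desiredRatePerSec;
    private long periodStartNs = System.nanoTime();
    private double observedSoFar = 0;

    public ThrottlerSketch(double desiredRatePerSec) {
        this.desiredRatePerSec = desiredRatePerSec;
    }

    public synchronized void maybeThrottle(double observed) throws InterruptedException {
        observedSoFar += observed;
        long now = System.nanoTime();
        long elapsedNs = now - periodStartNs;
        if (elapsedNs > 100_000_000L) {  // 100 ms check interval
            double ratePerSec = observedSoFar * 1_000_000_000.0 / elapsedNs;
            if (ratePerSec > desiredRatePerSec) {
                // Sleep long enough that observedSoFar spread over the whole
                // window matches the desired rate.
                long sleepMs = Math.round(observedSoFar / desiredRatePerSec * 1000.0
                                          - elapsedNs / 1_000_000.0);
                if (sleepMs > 0)
                    Thread.sleep(sleepMs);
            }
            // Buggy version: periodStartNs = now;   // pre-sleep timestamp
            // Fixed version: take a fresh timestamp, so the sleep is not
            // silently counted as part of the next measurement window.
            periodStartNs = System.nanoTime();
            observedSoFar = 0;
        }
    }
}
{code}
With the buggy reset, every window that follows a sleep appears longer than it really is, so the computed rate comes out low and the throttler lets through more I/O than the configured limit.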



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7098) Improve accuracy of the log cleaner throttle rate

2018-07-20 Thread ASF GitHub Bot (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7098?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16550300#comment-16550300
 ] 

ASF GitHub Bot commented on KAFKA-7098:
---

lindong28 closed pull request #5350: KAFKA-7098: Improve accuracy of throttling 
by avoiding under-estimating actual rate in Throttler
URL: https://github.com/apache/kafka/pull/5350
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:


diff --git a/core/src/main/scala/kafka/utils/Throttler.scala 
b/core/src/main/scala/kafka/utils/Throttler.scala
index e781cd6f767..9fe3cdcf13f 100644
--- a/core/src/main/scala/kafka/utils/Throttler.scala
+++ b/core/src/main/scala/kafka/utils/Throttler.scala
@@ -73,7 +73,7 @@ class Throttler(desiredRatePerSec: Double,
             time.sleep(sleepTime)
           }
         }
-        periodStartNs = now
+        periodStartNs = time.nanoseconds()
         observedSoFar = 0
       }
     }
diff --git a/core/src/test/scala/unit/kafka/utils/ThrottlerTest.scala 
b/core/src/test/scala/unit/kafka/utils/ThrottlerTest.scala
new file mode 100755
index 000..d26e791ddf9
--- /dev/null
+++ b/core/src/test/scala/unit/kafka/utils/ThrottlerTest.scala
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package unit.kafka.utils
+
+import kafka.utils.Throttler
+import org.apache.kafka.common.utils.MockTime
+import org.junit.Test
+import org.junit.Assert.{assertTrue, assertEquals}
+
+
+class ThrottlerTest {
+  @Test
+  def testThrottleDesiredRate() {
+    val throttleCheckIntervalMs = 100
+    val desiredCountPerSec = 1000.0
+    val desiredCountPerInterval = desiredCountPerSec * throttleCheckIntervalMs / 1000.0
+
+    val mockTime = new MockTime()
+    val throttler = new Throttler(desiredRatePerSec = desiredCountPerSec,
+                                  checkIntervalMs = throttleCheckIntervalMs,
+                                  time = mockTime)
+
+    // Observe desiredCountPerInterval at t1
+    val t1 = mockTime.milliseconds()
+    throttler.maybeThrottle(desiredCountPerInterval)
+    assertEquals(t1, mockTime.milliseconds())
+
+    // Observe desiredCountPerInterval at t1 + throttleCheckIntervalMs + 1
+    mockTime.sleep(throttleCheckIntervalMs + 1)
+    throttler.maybeThrottle(desiredCountPerInterval)
+    val t2 = mockTime.milliseconds()
+    assertTrue(t2 >= t1 + 2 * throttleCheckIntervalMs)
+
+    // Observe desiredCountPerInterval at t2
+    throttler.maybeThrottle(desiredCountPerInterval)
+    assertEquals(t2, mockTime.milliseconds())
+
+    // Observe desiredCountPerInterval at t2 + throttleCheckIntervalMs + 1
+    mockTime.sleep(throttleCheckIntervalMs + 1)
+    throttler.maybeThrottle(desiredCountPerInterval)
+    val t3 = mockTime.milliseconds()
+    assertTrue(t3 >= t2 + 2 * throttleCheckIntervalMs)
+
+    val elapsedTimeMs = t3 - t1
+    val actualCountPerSec = 4 * desiredCountPerInterval * 1000 / elapsedTimeMs
+    assertTrue(actualCountPerSec <= desiredCountPerSec)
+  }
+}


 


This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve accuracy of the log cleaner throttle rate
> -
>
> Key: KAFKA-7098
> URL: https://issues.apache.org/jira/browse/KAFKA-7098
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Dong Lin
>Assignee: Zhanxiang (Patrick) Huang
>Priority: Major
>
> LogCleaner uses the Throttler class to throttle the log cleaning rate to the 
> user-specified limit, i.e. log.cleaner.io.max.bytes.per.second. However, in 
> Throttler.maybeThrottle(), the periodStartNs is set to the time before the 
> sleep after the sleep() is