Re: [DISCUSS] KIP-459: Improve KafkaStreams#close

2019-05-19 Thread Dongjin Lee
Hi Matthias,

I investigated the inconsistency between the `close` semantics of `Producer`,
`Consumer`, and `AdminClient`, and found that it was intentional. Here are
the details:

The current default timeout of `KafkaConsumer#close`, 30 seconds, was
introduced in KIP-102 (0.10.2.0)[^1]. According to that document,[^2] there
are two differences between `Consumer` and `Producer`:

1. `Consumer`s don't have large requests.
2. `Consumer#close` involves the consumer coordinator, whose close
operation is bounded by `request.timeout.ms`.

For these reasons, the Consumer's default timeout was set slightly
differently.[^3] (This was done by Rajini.)
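
To make the difference concrete, here is a minimal sketch of the two defaults
(the broker address and configs are illustrative; `Producer#close(Duration)`
assumes a 2.2+ client):

    import java.time.Duration;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.clients.producer.KafkaProducer;

    public class CloseDefaultsDemo {
        public static void main(String[] args) {
            Properties p = new Properties();
            p.put("bootstrap.servers", "localhost:9092"); // illustrative address
            p.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            p.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
            KafkaProducer<String, String> producer = new KafkaProducer<>(p);

            Properties c = new Properties();
            c.put("bootstrap.servers", "localhost:9092");
            c.put("group.id", "close-demo");
            c.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            c.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
            KafkaConsumer<String, String> consumer = new KafkaConsumer<>(c);

            consumer.close();                       // graceful close, 30s default (KIP-102)
            producer.close(Duration.ofSeconds(30)); // explicit bound; no-arg close() waits up to Long.MAX_VALUE ms
        }
    }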

In the initial proposal, I suggested changing the default timeout of
`Producer#close` and `AdminClient#close` from `Long.MAX_VALUE` to another
value. However, since it is now clear that the current implementation is
entirely reasonable, *it seems more suitable to change the approach to simply
passing a close timeout to the clients used by KafkaStreams.*[^4]
This approach has the following advantages (a rough sketch follows the list):

1. The problem described in KAFKA-7996 is resolved, since the Producer no
longer hangs during its `close` operation.
2. We don't have to change the semantics of `Producer#close`,
`AdminClient#close`, or `KafkaStreams#close`. As you pointed out, these
kinds of changes are hard for users to reason about.
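
To illustrate the direction, here is a purely hypothetical sketch (not the
actual KIP-459 code; `DeadlineBudget` and all names in it are made up): track
a single deadline and hand the remaining budget to each client's `close`.

    import java.time.Duration;

    // Hypothetical sketch: one deadline shared across all internal clients,
    // so no single client's close() can consume the whole timeout.
    public class DeadlineBudget {
        private final long deadlineMs;

        public DeadlineBudget(Duration total) {
            this.deadlineMs = System.currentTimeMillis() + total.toMillis();
        }

        /** Remaining time, floored at zero, to pass into each client's close(). */
        public Duration remaining() {
            return Duration.ofMillis(Math.max(0L, deadlineMs - System.currentTimeMillis()));
        }
    }

A shutdown path could then call `producer.close(budget.remaining())`,
`adminClient.close(budget.remaining())`, and so on, so whatever a slow client
consumes is automatically deducted from the budget of the next one.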

What do you think?

Thanks,
Dongjin

[^1]:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-102+-+Add+close+with+timeout+for+consumers
[^2]: "The existing close() method without a timeout will attempt to close
the consumer gracefully with a default timeout of 30 seconds. This is
different from the producer default of Long.MAX_VALUE since consumers don't
have large requests."
[^3]: The 'Rejected Alternatives' section explains it.
[^4]: In the case of the Streams reset tool, `KafkaAdminClient`'s close timeout
is 60 seconds (KIP-198): https://github.com/apache/kafka/pull/3927/files

On Fri, Apr 26, 2019 at 5:16 PM Matthias J. Sax 
wrote:

> Thanks for the KIP.
>
> Overall, I agree with the sentiment of the KIP. The current semantics of
> `KafkaStreams#close(timeout)` are not well defined. Also the general
> client inconsistencies are annoying.
>
>
> > This KIP makes no change to public interfaces; however, it makes a
> subtle change to the existing API's semantics. If this KIP is accepted,
> documenting these semantics in as much detail as possible would be much better.
>
> I am not sure if I would call this change "subtle". It might actually have
> a rather big impact, and hence I am also wondering about backward
> compatibility (details below). Overall, I am not sure if documenting the
> change would be sufficient.
>
>
>
> > Change the default close timeout of Producer and AdminClient to a more
> reasonable one, not Long.MAX_VALUE.
>
> Can you be more specific than "more reasonable", and propose a concrete
> value? What about backward compatibility? Assume an application wants to
> block forever by default: with this change, it would be required to rewrite
> code to keep the intended semantics. Hence, the change does not seem to
> be backward compatible. Also note that a config change would not be
> sufficient; an actual code change would be required.
>
> Also, why not go the other direction and default `KafkaConsumer#close()`
> to use Long.MAX_VALUE, too? Note that the current KafkaStreams#close() also
> uses Long.MAX_VALUE (i.e., over all 4 clients, it's 50:50). Of course, a
> similar backward compatibility concern arises.
>
> Making close() blocking by default seems not unreasonable per se. Can
> you elaborate on why Long.MAX_VALUE is "bad" compared to, e.g., 30 seconds?
>
>
> > If it succeeds, simply return; if not, close the remaining resources with
> the default close timeout.
>
> Why do you want to apply the default timeout as a fallback? This would
> violate the user's intention, too, and thus might result in a situation
> that is not much better than the current one.
>
>
> For KafkaStreams, if there are multiple StreamThreads, would the full
> timeout be passed into each thread as all of them could shut down in
> parallel? Or would the timeout be divided over all threads?
>
> What about the case when there is a different number of clients? For
> example, with EOS enabled, there are multiple Producers that need to be
> closed; however, the user might not even be aware of the increased
> number of producers (or not know how many there actually are).
>
>
> It seems to be hard for users to reason about those dependencies.
>
>
> -Matthias
>
>
> On 4/23/19 6:13 PM, Dongjin Lee wrote:
> > Hi dev,
> >
> > I would like to start the discussion of KIP-459: Improve
> KafkaStreams#close
> > <
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-459%3A+Improve+KafkaStreams%23close
> >.
> > This proposal is originated from the issue reported via community slack,
> > KAFKA-7996 . In short,
> > this KIP proposes to resolve this problem by improving exi

[jira] [Created] (KAFKA-8391) Flaky Test RebalanceSourceConnectorsIntegrationTest#testDeleteConnector

2019-05-19 Thread Matthias J. Sax (JIRA)
Matthias J. Sax created KAFKA-8391:
--

 Summary: Flaky Test 
RebalanceSourceConnectorsIntegrationTest#testDeleteConnector
 Key: KAFKA-8391
 URL: https://issues.apache.org/jira/browse/KAFKA-8391
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 2.3.0
Reporter: Matthias J. Sax
 Fix For: 2.3.0


[https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/4747/testReport/junit/org.apache.kafka.connect.integration/RebalanceSourceConnectorsIntegrationTest/testDeleteConnector/]
{quote}java.lang.AssertionError: Condition not met within timeout 3. Connector tasks did not stop in time.
at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:375)
at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:352)
at org.apache.kafka.connect.integration.RebalanceSourceConnectorsIntegrationTest.testDeleteConnector(RebalanceSourceConnectorsIntegrationTest.java:166){quote}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


Re: [VOTE] KIP-440: Extend Connect Converter to support headers

2019-05-19 Thread Dongjin Lee
+1 (non-binding).

Binding: +2 (Randall, Gwen)
Non-binding: +2 (Andrew, Dongjin)

We need one more +1 from the committers. Is there anyone else?

Thanks,
Dongjin

On Fri, May 10, 2019 at 12:16 AM Andrew Schofield 
wrote:

> +1 (non-binding).
>
> Looks good.
>
> On 09/05/2019, 15:55, "Gwen Shapira"  wrote:
>
> +1 (binding)
> Thank you!
>
> On Mon, May 6, 2019, 11:25 PM Yaroslav Tkachenko 
> wrote:
>
> > Hi everyone,
> >
> > I'd like to start a vote for KIP-440: Extend Connect Converter to
> support
> > headers (
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-440%3A+Extend+Connect+Converter+to+support+headers
> > )
> >
> > Discussion:
> >
> >
> https://lists.apache.org/thread.html/1fc1e3d2cddd8311d3db7c98f0d09a1a137ca4b20d1f3c8ab203a855%40%3Cdev.kafka.apache.org%3E
> >
> > Thanks!
> >
>
> --
> *Dongjin Lee*
>
> *A hitchhiker in the mathematical world.*
> *github: github.com/dongjinleekr
> linkedin: kr.linkedin.com/in/dongjinleekr
> speakerdeck: speakerdeck.com/dongjin*
>


Re: [VOTE] KIP-440: Extend Connect Converter to support headers

2019-05-19 Thread Kamal Chandraprakash
+1 (non-binding). Thanks for the KIP!

On Sun, May 19, 2019 at 6:36 PM Dongjin Lee  wrote:

> +1 (non-binding).
>
> Binding: +2 (Randall, Gwen)
> Non-binding: +2 (Andrew, Dongjin)
>
> We need one more +1 from the committers. Is there anyone else?
>
> Thanks,
> Dongjin
>


[jira] [Created] (KAFKA-8392) Kafka broker leaks metric when partition leader moves to another node.

2019-05-19 Thread Kamal Chandraprakash (JIRA)
Kamal Chandraprakash created KAFKA-8392:
---

 Summary: Kafka broker leaks metric when partition leader moves to 
another node.
 Key: KAFKA-8392
 URL: https://issues.apache.org/jira/browse/KAFKA-8392
 Project: Kafka
  Issue Type: Bug
  Components: metrics
Affects Versions: 2.2.0
Reporter: Kamal Chandraprakash


When a partition leader moves from one node to another due to leader
imbalance (per leader.imbalance.per.broker.percentage), the old leader broker
still emits the old, now-static metric values.

Steps to reproduce:
1. Create a cluster with 3 nodes.
2. Create a topic with 2 partitions and RF=3.
3. Generate some data using the console producer.
4. Move any one of the partitions from one node to another using the
reassign-partitions and preferred-replica-election scripts (a rough sketch of
these steps follows below).
5. Generate some more data using the console producer.
6. Now all 3 nodes emit bytesIn, bytesOut, and MessagesIn for that topic.
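
For reference (not part of the original report), a rough sketch of steps 2-4
against a local 2.2.x cluster, assuming ZooKeeper on localhost:2181; the topic
name, file path, and reassignment JSON are illustrative:

    # 2. topic with 2 partitions, RF=3
    bin/kafka-topics.sh --zookeeper localhost:2181 --create \
      --topic metric-leak-test --partitions 2 --replication-factor 3

    # 3/5. produce some data
    bin/kafka-console-producer.sh --broker-list localhost:9092 --topic metric-leak-test

    # 4. reassign.json reorders one partition's replicas so another broker
    #    becomes the preferred leader, e.g.:
    #    {"version":1,"partitions":[{"topic":"metric-leak-test","partition":0,"replicas":[2,0,1]}]}
    bin/kafka-reassign-partitions.sh --zookeeper localhost:2181 \
      --reassignment-json-file reassign.json --execute
    bin/kafka-preferred-replica-election.sh --zookeeper localhost:2181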

Is this the expected behavior?



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)