Re: [VOTE] KIP-441: Smooth Scaling Out for Kafka Streams

2019-09-11 Thread Vinoth Chandar
+1 (non-binding).

On Fri, Sep 6, 2019 at 12:46 AM Bruno Cadonna  wrote:

> +1 (non-binding)
>
> On Fri, Sep 6, 2019 at 12:32 AM Guozhang Wang  wrote:
> >
> > +1 (binding).
> >
> > On Thu, Sep 5, 2019 at 2:47 PM John Roesler  wrote:
> >
> > > Hello, all,
> > >
> > > After a great discussion, I'd like to open voting on KIP-441,
> > > to avoid long restore times in Streams after rebalancing.
> > > Please cast your votes!
> > >
> > >
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-441:+Smooth+Scaling+Out+for+Kafka+Streams
> > >
> > > Thanks,
> > > -John
> > >
> >
> >
> > --
> > -- Guozhang
>


Re: __consumer_offsets is becoming rather big. How to purge?

2019-09-11 Thread Colin McCabe
Hi Ash,

At first guess, you probably had a problem with your log cleaner thread, which 
resulted in the offsets log not being cleaned.  Check if that thread is running.
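
One way to check: the broker exposes a gauge for the cleaner that you can poll
over JMX. A minimal sketch, assuming the standard MBean name
kafka.log:type=LogCleanerManager,name=time-since-last-run-ms and a broker with
remote JMX enabled on port 9999:

{code}
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class LogCleanerCheck {
    public static void main(String[] args) throws Exception {
        JMXServiceURL url = new JMXServiceURL(
            "service:jmx:rmi:///jndi/rmi://localhost:9999/jmxrmi");
        try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
            MBeanServerConnection server = connector.getMBeanServerConnection();
            ObjectName cleaner = new ObjectName(
                "kafka.log:type=LogCleanerManager,name=time-since-last-run-ms");
            // A value that only ever grows suggests the cleaner thread died.
            System.out.println("ms since last cleaner run: "
                + server.getAttribute(cleaner, "Value"));
        }
    }
}
{code}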

best,
Colin

On Wed, Sep 11, 2019, at 09:52, Ash G wrote:
> Bump, no reply,
> 
> It seems this condition was missed by devs when this feature was 
> designed and implemented.
> 
> On 2019/09/06 14:45:47, Ash G  wrote: 
> > 
> > __consumer_offsets is becoming rather big (> 1 TB). Is there a way to purge 
> > dead/inactive consumer id rows from it?
> > I am assuming dead/inactive consumer id rows are the reason. Could there be 
> > another reason?
> > 
> > [reposting from user list]
> > 
>


KIP-521: Enable redirection of Connect's log4j messages to a file by default

2019-09-11 Thread Konstantine Karantasis
Hi all.

While we are in the midst of some very interesting KIP discussions, I'd
like to bring a brief and useful KIP to the table as well.

It's about enabling redirection of log4j logging to a file for Kafka
Connect by default, similar to how this is done for Kafka brokers
today.
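
For illustration, a sketch of what such a default could look like in a
connect-log4j.properties, modeled on the broker's log4j.properties (the
appender name and pattern here are placeholders, not the KIP's final choice):

{code}
log4j.rootLogger=INFO, stdout, connectAppender

log4j.appender.connectAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.connectAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.connectAppender.File=${kafka.logs.dir}/connect.log
log4j.appender.connectAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.connectAppender.layout.ConversionPattern=[%d] %p %m (%c)%n
{code}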

You might find it short and straightforward but, still, it needs to be
discussed as a KIP since it's an externally visible (yet compatible) change
in how Connect logs its status at runtime.

Here's a link to the KIP:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-521%3A+Enable+redirection+of+Connect%27s+log4j+messages+to+a+file+by+default

Looking forward to your comments!
Konstantine


[jira] [Resolved] (KAFKA-8875) CreateTopic API should check topic existence before replication factor

2019-09-11 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8875?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-8875.

Fix Version/s: 2.4.0
   Resolution: Fixed

> CreateTopic API should check topic existence before replication factor
> --
>
> Key: KAFKA-8875
> URL: https://issues.apache.org/jira/browse/KAFKA-8875
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: huxihx
>Priority: Major
> Fix For: 2.4.0
>
>
> If you try to create a topic and the replication factor cannot be satisfied, 
> Kafka will return `INVALID_REPLICATION_FACTOR`. If the topic already exists, 
> we should probably return `TOPIC_ALREADY_EXISTS` instead. You won't see this 
> problem if using TopicCommand because we check existence prior to creating 
> the topic.
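
A minimal reproduction with the Java AdminClient (bootstrap address and topic
name are placeholders; the topic is assumed to already exist):

{code}
import java.util.Collections;
import java.util.Properties;
import java.util.concurrent.ExecutionException;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.NewTopic;

public class CreateTopicRepro {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        try (AdminClient admin = AdminClient.create(props)) {
            // Replication factor 100 exceeds the broker count on purpose.
            NewTopic topic = new NewTopic("existing-topic", 1, (short) 100);
            admin.createTopics(Collections.singleton(topic)).all().get();
        } catch (ExecutionException e) {
            // Before the fix: InvalidReplicationFactorException.
            // After the fix: TopicExistsException.
            System.out.println(e.getCause().getClass().getSimpleName());
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}
{code}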



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


Build failed in Jenkins: kafka-trunk-jdk11 #808

2019-09-11 Thread Apache Jenkins Server
See 


Changes:

[github] KAFKA-8817: Remove timeout for the whole test (#7313)

[github] MINOR: Add api version to uncaught exception message (#7311)

[cmccabe] KAFKA-8345 (KIP-455): Controller and KafkaApi changes (part 3/4) 
(#7128)

--
[...truncated 702.06 KB...]

kafka.server.KafkaApisTest > 
shouldRespondWithUnsupportedForMessageFormatOnHandleWriteTxnMarkersWhenMagicLowerThanRequired
 STARTED

kafka.server.KafkaApisTest > 
shouldRespondWithUnsupportedForMessageFormatOnHandleWriteTxnMarkersWhenMagicLowerThanRequired
 PASSED

kafka.server.KafkaApisTest > 
shouldThrowUnsupportedVersionExceptionOnHandleTxnOffsetCommitRequestWhenInterBrokerProtocolNotSupported
 STARTED

kafka.server.KafkaApisTest > 
shouldThrowUnsupportedVersionExceptionOnHandleTxnOffsetCommitRequestWhenInterBrokerProtocolNotSupported
 PASSED

kafka.server.KafkaApisTest > testOffsetCommitWithInvalidPartition STARTED

kafka.server.KafkaApisTest > testOffsetCommitWithInvalidPartition PASSED

kafka.server.KafkaApisTest > 
testLeaderReplicaIfLocalRaisesNotLeaderForPartition STARTED

kafka.server.KafkaApisTest > 
testLeaderReplicaIfLocalRaisesNotLeaderForPartition PASSED

kafka.server.KafkaApisTest > testJoinGroupProtocolsOrder STARTED

kafka.server.KafkaApisTest > testJoinGroupProtocolsOrder PASSED

kafka.server.KafkaApisTest > 
shouldThrowUnsupportedVersionExceptionOnHandleAddPartitionsToTxnRequestWhenInterBrokerProtocolNotSupported
 STARTED

kafka.server.KafkaApisTest > 
shouldThrowUnsupportedVersionExceptionOnHandleAddPartitionsToTxnRequestWhenInterBrokerProtocolNotSupported
 PASSED

kafka.server.KafkaApisTest > testReadUncommittedConsumerListOffsetLatest STARTED

kafka.server.KafkaApisTest > testReadUncommittedConsumerListOffsetLatest PASSED

kafka.server.KafkaApisTest > 
testMetadataRequestOnDistinctListenerWithInconsistentListenersAcrossBrokers 
STARTED

kafka.server.KafkaApisTest > 
testMetadataRequestOnDistinctListenerWithInconsistentListenersAcrossBrokers 
PASSED

kafka.server.KafkaApisTest > 
shouldAppendToLogOnWriteTxnMarkersWhenCorrectMagicVersion STARTED

kafka.server.KafkaApisTest > 
shouldAppendToLogOnWriteTxnMarkersWhenCorrectMagicVersion PASSED

kafka.server.KafkaApisTest > 
shouldThrowUnsupportedVersionExceptionOnHandleWriteTxnMarkersRequestWhenInterBrokerProtocolNotSupported
 STARTED

kafka.server.KafkaApisTest > 
shouldThrowUnsupportedVersionExceptionOnHandleWriteTxnMarkersRequestWhenInterBrokerProtocolNotSupported
 PASSED

kafka.server.KafkaApisTest > testLeaderReplicaIfLocalRaisesFencedLeaderEpoch 
STARTED

kafka.server.KafkaApisTest > testLeaderReplicaIfLocalRaisesFencedLeaderEpoch 
PASSED

kafka.server.KafkaApisTest > testFetchRequestV9WithNoLogConfig STARTED

kafka.server.KafkaApisTest > testFetchRequestV9WithNoLogConfig PASSED

kafka.server.KafkaApisTest > 
shouldRespondWithUnknownTopicWhenPartitionIsNotHosted STARTED

kafka.server.KafkaApisTest > 
shouldRespondWithUnknownTopicWhenPartitionIsNotHosted PASSED

kafka.server.KafkaApisTest > 
rejectSyncGroupRequestWhenStaticMembershipNotSupported STARTED

kafka.server.KafkaApisTest > 
rejectSyncGroupRequestWhenStaticMembershipNotSupported PASSED

kafka.server.KafkaApisTest > 
rejectHeartbeatRequestWhenStaticMembershipNotSupported STARTED

kafka.server.KafkaApisTest > 
rejectHeartbeatRequestWhenStaticMembershipNotSupported PASSED

kafka.server.KafkaApisTest > testReadCommittedConsumerListOffsetLatest STARTED

kafka.server.KafkaApisTest > testReadCommittedConsumerListOffsetLatest PASSED

kafka.server.KafkaApisTest > 
testMetadataRequestOnSharedListenerWithInconsistentListenersAcrossBrokers 
STARTED

kafka.server.KafkaApisTest > 
testMetadataRequestOnSharedListenerWithInconsistentListenersAcrossBrokers PASSED

kafka.server.KafkaApisTest > testAddPartitionsToTxnWithInvalidPartition STARTED

kafka.server.KafkaApisTest > testAddPartitionsToTxnWithInvalidPartition PASSED

kafka.server.KafkaApisTest > 
testLeaderReplicaIfLocalRaisesUnknownTopicOrPartition STARTED

kafka.server.KafkaApisTest > 
testLeaderReplicaIfLocalRaisesUnknownTopicOrPartition PASSED

kafka.server.KafkaApisTest > 
shouldThrowUnsupportedVersionExceptionOnHandleAddOffsetToTxnRequestWhenInterBrokerProtocolNotSupported
 STARTED

kafka.server.KafkaApisTest > 
shouldThrowUnsupportedVersionExceptionOnHandleAddOffsetToTxnRequestWhenInterBrokerProtocolNotSupported
 PASSED

kafka.server.KafkaApisTest > testLeaderReplicaIfLocalRaisesUnknownLeaderEpoch 
STARTED

kafka.server.KafkaApisTest > testLeaderReplicaIfLocalRaisesUnknownLeaderEpoch 
PASSED

kafka.server.KafkaApisTest > testTxnOffsetCommitWithInvalidPartition STARTED

kafka.server.KafkaApisTest > testTxnOffsetCommitWithInvalidPartition PASSED

kafka.server.KafkaApisTest > testSingleLeaveGroup STARTED

kafka.server.KafkaApisTest > testSingleLeaveGroup PASSED

kafka.server.KafkaApisTest > 
rejectJoinGroupRequestWhenStaticMembershipNotSupported 

[jira] [Resolved] (KAFKA-8886) Make Authorizer create/delete methods asynchronous

2019-09-11 Thread Rajini Sivaram (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8886?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram resolved KAFKA-8886.
---
  Reviewer: Manikumar
Resolution: Fixed

> Make Authorizer create/delete methods asynchronous
> --
>
> Key: KAFKA-8886
> URL: https://issues.apache.org/jira/browse/KAFKA-8886
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 2.4.0
>
>
> As discussed on the mailing list, createAcls and deleteAcls should be 
> asynchronous to avoid blocking request threads when updates are made to 
> non-ZK-based stores, which may block for a long time.
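
For illustration, the general pattern being adopted -- hand the slow store
update to a separate thread and return CompletionStages so request handler
threads never block. This is only a sketch of the idea with made-up names;
see KIP-504 for the actual Authorizer signatures:

{code}
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.stream.Collectors;

class AsyncAclStore {
    private final ExecutorService updater = Executors.newSingleThreadExecutor();

    // One stage per binding; callers complete the responses later instead
    // of blocking a request thread on the store write.
    List<? extends CompletionStage<Void>> createAcls(List<String> bindings) {
        return bindings.stream()
            .map(b -> CompletableFuture.runAsync(() -> writeToStore(b), updater))
            .collect(Collectors.toList());
    }

    private void writeToStore(String binding) {
        // Potentially slow update to a non-ZK store goes here.
    }
}
{code}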



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


Re: [DISCUSS] KIP-519: Make SSL context/engine configuration extensible

2019-09-11 Thread Rajini Sivaram
Kafka already has the notion of custom configs. And we support
reconfigurable custom configs for some interfaces e.g. MetricsReporter. We
also recently added custom reconfigurable configs for Authorizer under
KIP-504.

The issue with custom configs for SSL is described in
https://issues.apache.org/jira/browse/KAFKA-7588. We currently don't pass
in custom configs to ChannelBuilders. We need to fix this, not just for SSL
but for other security plugins as well. So it needs to be a generic
solution, not specific to KIP-519.

Once KAFKA-7588 is fixed, the existing dynamic reconfiguration mechanism in
brokers would simply work. Dynamic configs work in exactly the same way
for custom configs as they do for other configs. The list of reconfigurable
configs is returned by the implementation class, and the class gets notified
when any of those configs changes. This includes validateReconfiguration()
as well as the actual reconfigure().
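
For concreteness, a bare-bones sketch of a plugin implementing that contract
via org.apache.kafka.common.Reconfigurable (the config name my.plugin.endpoint
is made up for the example):

{code}
import java.util.Collections;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.common.Reconfigurable;
import org.apache.kafka.common.config.ConfigException;

public class MyPlugin implements Reconfigurable {
    private volatile String endpoint;

    @Override
    public void configure(Map<String, ?> configs) {
        endpoint = (String) configs.get("my.plugin.endpoint");
    }

    @Override
    public Set<String> reconfigurableConfigs() {
        // Only configs listed here are eligible for dynamic updates.
        return Collections.singleton("my.plugin.endpoint");
    }

    @Override
    public void validateReconfiguration(Map<String, ?> configs) {
        Object value = configs.get("my.plugin.endpoint");
        if (!(value instanceof String) || ((String) value).isEmpty())
            throw new ConfigException("my.plugin.endpoint must be a non-empty string");
    }

    @Override
    public void reconfigure(Map<String, ?> configs) {
        endpoint = (String) configs.get("my.plugin.endpoint");
    }
}
{code}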

For SSL alone, we have special handling of dynamic configs to enable
reloading of keystores/truststores when the file changes, even though none
of the config values have changed. Reconfiguration is triggered by an admin
client request to alter configs. In this case, none of the actual configs
may have changed, but we check if the file has changed. This is currently
done only for the standard keystore/truststore configs. With KIP-519, I
guess we want the custom SslEngineFactory to be able to decide whether
reconfiguration is required. The simplest way to achieve this would be to
have a custom config that is updated when reconfiguration is required. I am
not sure we need a separate mechanism to trigger reconfiguration that
doesn't rely on admin clients since admin clients provide useful logging
and auditability.

Regards,

Rajini

On Wed, Sep 11, 2019 at 4:13 PM Pellerin, Clement 
wrote:

> I'm sorry if I divert the discussion, but without this issue, it would
> have been pretty trivial to update KIP-383 to go as far as you did. I am
> also happy to get a discussion going, the KIP-383 thread was a desolate
> place.
>
> Kafka needs to know about custom configs because it validates the configs
> before it passes them to (re)configure. Unknown configs are silently
> removed by ConfigDef. We could keep unknown configs as strings without
> validating them in ConfigDef, but I don't know if the Kafka community would
> accept this weaker validation.
>
> It appears we are trying to invent the notion of a meta config. Anyway, I
> think we have shown that asking an instance of SslEngineFactory to contribute
> to ConfigDef comes way too late.
>
> For your push notification, would it be simpler to just let your
> SslEngineFactory notice the change and make it effective the next time it
> is called? SslFactory would not cache the SslEngine and would always ask
> SslEngineFactory for it. You don't even need an inner thread if
> SslEngineFactory checks for a change when it is called. SslEngineFactory
> would no longer be immutable, so maybe it is worth reconsidering how
> reconfigure works for it.
>
> -Original Message-
> From: Maulin Vasavada [mailto:maulin.vasav...@gmail.com]
> Sent: Wednesday, September 11, 2019 3:29 AM
> To: dev@kafka.apache.org
> Subject: Re: [DISCUSS] KIP-519: Make SSL context/engine configuration
> extensible
>
> Hi all,
>
> Since the "custom config" seems to be the main topic of interest, let us talk
> about it.
>
> 1. I want to confirm that I interpret the definition of 'custom config of
> SslEngineFactory' the same way Clement is suggesting - "a config that does
> not exist in Kafka but is specified by a custom implementation of
> SslEngineFactory". If there is any disagreement on that, we should bring it
> up here sooner rather than later.
>
> 2. I've been thinking about it, and I question why we are trying to make a
> custom config a first-class citizen in the standard config.
> The reasoning for that question is:
> Kafka wants to delegate creation of the SSLEngine to a class which is "not"
> part of Kafka's distribution. Since the interface for the SSLEngine creator
> will be defined by the public method createSSLEngine(), why would Kafka
> care what the implementation does, other than fulfilling the contract of
> creating the SSLEngine? The implementation can use any special configs, i.e.
> configs coming from the input Map OR configs defined in a new file known only
> to itself. Making the configs part of the interface contract in any way is
> not necessary. This way we keep it simple and straightforward.
>
> 3. Now, the 2nd point raises a question - if we follow that suggestion, how
> can we ever re-create the SslEngineFactory object and allow a new object to
> be created when something changes in the implementation? That is a valid
> question. If you look at the KIP section titled "Other challenge", we
> do have a scenario where the SslEngineFactory implementation ONLY knows that
> something changed - example: a keystore got updated by a local daemon process
> known only to the specific implementation. This means we 

RE: [DISCUSS] KIP-519: Make SSL context/engine configuration extensible

2019-09-11 Thread Pellerin, Clement
Indeed, this is a general problem requiring a more general solution than 
KIP-519. I'm glad there was work done on this already.

So config.originals() still contains unknown configs but nothing has been 
validated and cast to the proper type.
How does validation work for an extension point that receives 
config.originals()? Is there a public validator helper to handle this?
Do we need to create ConfigKeys in the ConfigDef for custom configs only known 
to a custom SslEngineFactory implementation?
Do we need to declare the standard SSL configs in ConfigDef if SslFactory needs 
to revalidate them anyway because it receives config.originals()?
I assume there is such a thing as config.originals() also for a reconfigure()?

If we implement KIP-519 and later change from config.values() to 
config.originals(), this will affect the contract for the constructor of the 
SslEngineFactory. We might need to add custom configs support to KIP-519 or 
delay KIP-519 until the change to config.originals().
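
To make the originals/values distinction concrete, a sketch with
AbstractConfig (the config names are made up):

{code}
import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;

public class OriginalsVsValues {
    public static void main(String[] args) {
        ConfigDef def = new ConfigDef()
            .define("known.config", Type.STRING, "default", Importance.LOW, "doc");

        Map<String, Object> props = new HashMap<>();
        props.put("known.config", "x");
        props.put("custom.ssl.config", "y"); // not declared in the ConfigDef

        AbstractConfig config = new AbstractConfig(def, props);
        System.out.println(config.values().keySet());    // only known.config, validated and typed
        System.out.println(config.originals().keySet()); // both keys, unvalidated
    }
}
{code}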


-Original Message-
From: Rajini Sivaram [mailto:rajinisiva...@gmail.com] 
Sent: Wednesday, September 11, 2019 4:25 PM
To: dev
Subject: Re: [DISCUSS] KIP-519: Make SSL context/engine configuration extensible

Kafka already has the notion of custom configs. And we support
reconfigurable custom configs for some interfaces e.g. MetricsReporter. We
also recently added custom reconfigurable configs for Authorizer under
KIP-504.

The issue with custom configs for SSL is described in
https://issues.apache.org/jira/browse/KAFKA-7588. We currently don't pass
in custom configs to ChannelBuilders. We need to fix this, not just for SSL
but for other security plugins as well. So it needs to be a generic
solution, not specific to KIP-519.

Once KAFKA-7588 is fixed, the existing dynamic reconfiguration mechanism in
brokers would simply work. Dynamic configs work in exactly the same way
for custom configs as they do for other configs. The list of reconfigurable
configs is returned by the implementation class, and the class gets notified
when any of those configs changes. This includes validateReconfiguration()
as well as the actual reconfigure().

For SSL alone, we have special handling of dynamic configs to enable
reloading of keystores/truststores when the file changes, even though none
of the config values have changed. Reconfiguration is triggered by an admin
client request to alter configs. In this case, none of the actual configs
may have changed, but we check if the file has changed. This is currently
done only for the standard keystore/truststore configs. With KIP-519, I
guess we want the custom SslEngineFactory to be able to decide whether
reconfiguration is required. The simplest way to achieve this would be to
have a custom config that is updated when reconfiguration is required. I am
not sure we need a separate mechanism to trigger reconfiguration that
doesn't rely on admin clients since admin clients provide useful logging
and auditability.

Regards,

Rajini

On Wed, Sep 11, 2019 at 4:13 PM Pellerin, Clement 
wrote:

> I'm sorry if I divert the discussion, but without this issue, it would
> have been pretty trivial to update KIP-383 to go as far as you did. I am
> also happy to get a discussion going, the KIP-383 thread was a desolate
> place.
>
> Kafka needs to know about custom configs because it validates the configs
> before it passes them to (re)configure. Unknown configs are silently
> removed by ConfigDef. We could keep unknown configs as strings without
> validating them in ConfigDef, but I don't know if the Kafka community would
> accept this weaker validation.
>
> It appears we are trying to invent the notion of a meta config. Anyway, I
> think we have shown that asking an instance of SslEngineFactory to contribute
> to ConfigDef comes way too late.
>
> For your push notification, would it be simpler to just let your
> SslEngineFactory notice the change and make it effective the next time it
> is called? SslFactory would not cache the SslEngine and would always ask
> SslEngineFactory for it. You don't even need an inner thread if
> SslEngineFactory checks for a change when it is called. SslEngineFactory
> would no longer be immutable, so maybe it is worth reconsidering how
> reconfigure works for it.
>
> -Original Message-
> From: Maulin Vasavada [mailto:maulin.vasav...@gmail.com]
> Sent: Wednesday, September 11, 2019 3:29 AM
> To: dev@kafka.apache.org
> Subject: Re: [DISCUSS] KIP-519: Make SSL context/engine configuration
> extensible
>
> Hi all,
>
> Since the "custom config" seems to be the main topic of interest, let us talk
> about it.
>
> 1. I want to confirm that I interpret the definition of 'custom config of
> SslEngineFactory' the same way Clement is suggesting - "a config that does
> not exist in Kafka but is specified by a custom implementation of
> SslEngineFactory". If there is any disagreement on that, we should bring it
> up here sooner rather than later.
>
> 2. I've been thinking about it, and I question why we are trying 

[jira] [Resolved] (KAFKA-8856) Add Streams Config for Backward-compatible Metrics

2019-09-11 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8856?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna resolved KAFKA-8856.
--
Resolution: Fixed

> Add Streams Config for Backward-compatible Metrics
> --
>
> Key: KAFKA-8856
> URL: https://issues.apache.org/jira/browse/KAFKA-8856
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Bruno Cadonna
>Assignee: Bruno Cadonna
>Priority: Major
>
> With KIP-444 the tag names and names of streams metrics change. To allow 
> users having a grace period of changing their corresponding monitoring / 
> alerting eco-systems, a config shall be added that specifies which version of 
> the metrics names will be exposed.
> The definition of the new config is:
> name: built.in.metrics.version
> type: Enum
> values: {"0.10.0", "0.10.1", ... "2.3", "2.4"}
> default: "2.4" 
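
In practice, usage would look something like this (config name and values as
proposed above; the final constant and accepted values may differ):

{code}
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "my-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// Keep exposing the pre-KIP-444 metric names during the grace period.
props.put("built.in.metrics.version", "2.3");
KafkaStreams streams = new KafkaStreams(topology, props);
{code}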



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Created] (KAFKA-8899) Optimize Partition.maybeIncrementLeaderHW

2019-09-11 Thread Lucas Bradstreet (Jira)
Lucas Bradstreet created KAFKA-8899:
---

 Summary: Optimize Partition.maybeIncrementLeaderHW
 Key: KAFKA-8899
 URL: https://issues.apache.org/jira/browse/KAFKA-8899
 Project: Kafka
  Issue Type: Task
  Components: core
Affects Versions: 2.2.1, 2.3.0
Reporter: Lucas Bradstreet


Partition.maybeIncrementLeaderHW is in the hot path for 
ReplicaManager.updateFollowerFetchState. When replicating between brokers with 
high partition counts, maybeIncrementLeaderHW becomes expensive, with much of 
the time going to calling Partition.remoteReplicas which performs a toSet 
conversion. maybeIncrementLeaderHW should avoid generating any intermediate 
collections when calculating the new HWM.
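
The general shape of the fix, sketched in Java (Kafka's actual code is Scala,
and the names replicaMap, leaderLogEndOffset and Replica are illustrative):
compute the minimum LEO in one pass over the existing map instead of first
materializing a Set of replicas:

{code}
// Instead of remoteReplicas(), which copies the replicas into a Set,
// fold over the map that is already there:
long newHighWatermark = leaderLogEndOffset;
for (Replica replica : replicaMap.values()) {
    newHighWatermark = Math.min(newHighWatermark, replica.logEndOffset());
}
{code}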



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


Re: [DISCUSS] KIP-521: Enable redirection of Connect's log4j messages to a file by default

2019-09-11 Thread Gwen Shapira
Great idea. It will greatly improve the ops experience. Can't believe
we didn't do it before.

On Wed, Sep 11, 2019 at 2:07 PM Konstantine Karantasis
 wrote:
>
> *** Missed the [DISCUSS] tag in the previous email. Reposting here, please
> reply in this thread instead ***
>
> Hi all.
>
> While we are in the midst of some very interesting KIP discussions, I'd
> like to bring a brief and useful KIP to the table as well.
>
> It's about enabling redirection of log4j logging to a file for Kafka
> Connect by default, similar to how this is done for Kafka brokers
> today.
>
> You might find it short and straightforward but, still, it needs to be
> discussed as a KIP since it's an externally visible (yet compatible) change
> in how Connect logs its status at runtime.
>
> Here's a link to the KIP:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-521%3A+Enable+redirection+of+Connect%27s+log4j+messages+to+a+file+by+default
>
> Looking forward to your comments!
> Konstantine


[DISCUSS] Streams-Broker compatibility regression in 2.2.1 release

2019-09-11 Thread Matthias J. Sax
Hi,

recently a user reported an issue upgrading a Kafka Streams application
from 2.2.0 to 2.2.1 (cf
https://mail-archives.apache.org/mod_mbox/kafka-users/201908.mbox/)

After some investigation, we identified
https://issues.apache.org/jira/browse/KAFKA-7895 to be the root cause of
the problem.

The fix for KAFKA-7895 uses message headers and thus requires broker
version 0.11.0 (or newer) and message format 0.11 (or newer). Hence,
while a Kafka Streams application version 2.2.0 is compatible with older
brokers (0.10.1 and 0.10.2) and only requires message format 0.10,
backward compatibility was accidentally broken in 2.2.1.

The fix is also contained in the 2.3.0 release and cherry-picked to the 2.1
branch (2.1.2 is not released yet, and thus 2.1 users are not affected
at this point).

Note: only users that use `suppress()` operator in their program are
affected.

We should not break streams-broker backward compatibility in bug-fix
releases at all, and should avoid it in minor releases. However, it seems
necessary to have the fix in 2.3.0 -- otherwise, `suppress()` is
effectively useless, and it does not seem to be a good idea to fix the
bug only in the next major release. Hence, trading off some backward
compatibility in a minor release seems acceptable for this case,
considering that 0.11.0 was released 2 years ago.

For 2.2.1, it is more challenging to decide how to move forward, because
we should not have broken streams-broker compatibility but 2.2.1 is
already released and we can only react after the fact.

From my point of view, the best way is to keep the fix and update the
release notes and documentation accordingly. The main reason for my
suggestions is that we would expect a majority of users to be on 0.11.0
brokers already and the fix will work for them -- reverting the fix in
2.2.2 seems to be worse for all those users on newer broker versions. We
also know that `suppress()` is a highly demanded feature and a lot of
people waited for a fix.

The expected minority of users that are on 0.10.1 / 0.10.2 brokers, or
newer brokers but still on message format 0.10 would either need to stay
on Kafka Streams 2.2.0 or upgrade their brokers/message format
accordingly. However, upgrading brokers/message format is de-facto
already required for 2.2.1 and thus keeping the fix would not make the
situation worse.

For 2.1, I would suggest to revert the fix to make sure we don't break
streams-broker compatibility for 2.1.x users. If those users need the
fix for `suppress()` they need to upgrade to 2.2.1/2.3.0 or newer and
make sure their brokers are on 0.11.0 with message format 0.11, too.


TL;DR: the proposal is:

(1) revert the fix for KAFKA-7895 in 2.1 branch
(2) keep the fix for KAFKA-7895 in 2.2.1 and 2.3.0

Impact:

 - Kafka Streams 2.1.x and 2.2.0 applications are backward compatible
back to 0.10.1 brokers, requiring message format 0.10
 - Kafka Streams 2.2.2 / 2.3.0 application (or newer) are backward
compatible back to 0.11.0 brokers, requiring message format 0.11


Please let us know what you think about this proposal.


-Matthias





signature.asc
Description: OpenPGP digital signature


[DISCUSS] KIP-521: Enable redirection of Connect's log4j messages to a file by default

2019-09-11 Thread Konstantine Karantasis
*** Missed the [DISCUSS] tag in the previous email. Reposting here, please
reply in this thread instead ***

Hi all.

While we are in the midst of some very interesting KIP discussions, I'd
like to bring a brief and useful KIP to the table as well.

It's about enabling redirection of log4j logging to a file for Kafka
Connect by default, similar to how this is done for Kafka brokers
today.

You might find it short and straightforward but, still, it needs to be
discussed as a KIP since it's an externally visible (yet compatible) change
in how Connect logs its status at runtime.

Here's a link to the KIP:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-521%3A+Enable+redirection+of+Connect%27s+log4j+messages+to+a+file+by+default

Looking forward to your comments!
Konstantine


Re: [DISCUSS] KIP-521: Enable redirection of Connect's log4j messages to a file by default

2019-09-11 Thread Konstantine Karantasis
Thanks Gwen!
Indeed, it's a common setup and it's been missing for some time. I agree,
it'll be nice to have this in place by default.
I'm guessing previous attempts missed that such a change needs a KIP.

Cheers,
Konstantine



On Wed, Sep 11, 2019 at 2:16 PM Gwen Shapira  wrote:

> Great idea. It will greatly improve the ops experience. Can't believe
> we didn't do it before.
>
> On Wed, Sep 11, 2019 at 2:07 PM Konstantine Karantasis
>  wrote:
> >
> > *** Missed the [DISCUSS] tag in the previous email. Reposting here, please
> > reply in this thread instead ***
> >
> > Hi all.
> >
> > While we are in the midst of some very interesting KIP discussions, I'd
> > like to bring a brief and useful KIP to the table as well.
> >
> > It's about enabling redirection of log4j logging to a file for Kafka
> > Connect by default, similar to how this is done for Kafka brokers today.
> >
> > You might find it short and straightforward but, still, it needs to be
> > discussed as a KIP since it's an externally visible (yet compatible) change
> > in how Connect logs its status at runtime.
> >
> > Here's a link to the KIP:
> >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-521%3A+Enable+redirection+of+Connect%27s+log4j+messages+to+a+file+by+default
> >
> > Looking forward to your comments!
> > Konstantine
>


Build failed in Jenkins: kafka-trunk-jdk8 #3900

2019-09-11 Thread Apache Jenkins Server
See 


Changes:

[github] KAFKA-8886; Make Authorizer create/delete API asynchronous (#7316)

[wangguoz] KAFKA-8856: Add Streams config for backward-compatible metrics 
(#7279)

--
[...truncated 6.04 MB...]

org.apache.kafka.connect.json.JsonConverterTest > 
testConnectSchemaMetadataTranslation PASSED

org.apache.kafka.connect.json.JsonConverterTest > shortToJson STARTED

org.apache.kafka.connect.json.JsonConverterTest > shortToJson PASSED

org.apache.kafka.connect.json.JsonConverterTest > dateToConnect STARTED

org.apache.kafka.connect.json.JsonConverterTest > dateToConnect PASSED

org.apache.kafka.connect.json.JsonConverterTest > doubleToJson STARTED

org.apache.kafka.connect.json.JsonConverterTest > doubleToJson PASSED

org.apache.kafka.connect.json.JsonConverterTest > testStringHeaderToJson STARTED

org.apache.kafka.connect.json.JsonConverterTest > testStringHeaderToJson PASSED

org.apache.kafka.connect.json.JsonConverterTest > decimalToConnectOptional 
STARTED

org.apache.kafka.connect.json.JsonConverterTest > decimalToConnectOptional 
PASSED

org.apache.kafka.connect.json.JsonConverterTest > structSchemaIdentical STARTED

org.apache.kafka.connect.json.JsonConverterTest > structSchemaIdentical PASSED

org.apache.kafka.connect.json.JsonConverterTest > timeToConnectWithDefaultValue 
STARTED

org.apache.kafka.connect.json.JsonConverterTest > timeToConnectWithDefaultValue 
PASSED

org.apache.kafka.connect.json.JsonConverterTest > timeToConnect STARTED

org.apache.kafka.connect.json.JsonConverterTest > timeToConnect PASSED

org.apache.kafka.connect.json.JsonConverterTest > 
dateToConnectOptionalWithDefaultValue STARTED

org.apache.kafka.connect.json.JsonConverterTest > 
dateToConnectOptionalWithDefaultValue PASSED

org.apache.kafka.connect.json.JsonConverterTest > mapToConnectStringKeys STARTED

org.apache.kafka.connect.json.JsonConverterTest > mapToConnectStringKeys PASSED

org.apache.kafka.connect.json.JsonConverterTest > dateToConnectOptional STARTED

org.apache.kafka.connect.json.JsonConverterTest > dateToConnectOptional PASSED

org.apache.kafka.connect.json.JsonConverterTest > 
timeToConnectOptionalWithDefaultValue STARTED

org.apache.kafka.connect.json.JsonConverterTest > 
timeToConnectOptionalWithDefaultValue PASSED

org.apache.kafka.connect.json.JsonConverterTest > floatToConnect STARTED

org.apache.kafka.connect.json.JsonConverterTest > floatToConnect PASSED

org.apache.kafka.connect.json.JsonConverterTest > 
decimalToConnectWithDefaultValue STARTED

org.apache.kafka.connect.json.JsonConverterTest > 
decimalToConnectWithDefaultValue PASSED

org.apache.kafka.connect.json.JsonConverterTest > decimalToJson STARTED

org.apache.kafka.connect.json.JsonConverterTest > decimalToJson PASSED

org.apache.kafka.connect.json.JsonConverterTest > arrayToConnect STARTED

org.apache.kafka.connect.json.JsonConverterTest > arrayToConnect PASSED

org.apache.kafka.connect.json.JsonConverterTest > 
testCacheSchemaToConnectConversion STARTED

org.apache.kafka.connect.json.JsonConverterTest > 
testCacheSchemaToConnectConversion PASSED

org.apache.kafka.connect.json.JsonConverterTest > booleanToJson STARTED

org.apache.kafka.connect.json.JsonConverterTest > booleanToJson PASSED

org.apache.kafka.connect.json.JsonConverterTest > bytesToConnect STARTED

org.apache.kafka.connect.json.JsonConverterTest > bytesToConnect PASSED

org.apache.kafka.connect.json.JsonConverterTest > doubleToConnect STARTED

org.apache.kafka.connect.json.JsonConverterTest > doubleToConnect PASSED

org.apache.kafka.connect.json.JsonConverterTest > 
timestampToConnectOptionalWithDefaultValue STARTED

org.apache.kafka.connect.json.JsonConverterTest > 
timestampToConnectOptionalWithDefaultValue PASSED

> Task :connect:file:test

org.apache.kafka.connect.file.FileStreamSinkConnectorTest > testSinkTasks 
STARTED

org.apache.kafka.connect.file.FileStreamSinkConnectorTest > testSinkTasks PASSED

org.apache.kafka.connect.file.FileStreamSinkConnectorTest > testTaskClass 
STARTED

org.apache.kafka.connect.file.FileStreamSinkConnectorTest > testTaskClass PASSED

org.apache.kafka.connect.file.FileStreamSinkConnectorTest > 
testConnectorConfigValidation STARTED

org.apache.kafka.connect.file.FileStreamSinkConnectorTest > 
testConnectorConfigValidation PASSED

org.apache.kafka.connect.file.FileStreamSinkConnectorTest > testSinkTasksStdout 
STARTED

org.apache.kafka.connect.file.FileStreamSinkConnectorTest > testSinkTasksStdout 
PASSED

org.apache.kafka.connect.file.FileStreamSinkTaskTest > testPutFlush STARTED

org.apache.kafka.connect.file.FileStreamSinkTaskTest > testPutFlush PASSED

org.apache.kafka.connect.file.FileStreamSinkTaskTest > testStart STARTED

org.apache.kafka.connect.file.FileStreamSinkTaskTest > testStart PASSED

org.apache.kafka.connect.file.FileStreamSourceTaskTest > testNormalLifecycle 
STARTED


[jira] [Created] (KAFKA-8900) Stalled partitions

2019-09-11 Thread Luke Stephenson (Jira)
Luke Stephenson created KAFKA-8900:
--

 Summary: Stalled partitions
 Key: KAFKA-8900
 URL: https://issues.apache.org/jira/browse/KAFKA-8900
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 2.1.1
Reporter: Luke Stephenson


I'm seeing behaviour where a Scala KafkaConsumer has stalled for 1 partition 
for a topic.  All other partitions for that topic are successfully being 
consumed.

Restarting the consumer process does not resolve the issue.  The consumer is 
using version 2.3.0 ("org.apache.kafka" % "kafka-clients" % "2.3.0").

When the consumer starts, I see that it is assigned the partition.  However it 
then logs:
{code}
[Consumer 
clientId=kafka-bus-router-64c88855cf-hxck7.event-bus-router-consumer.1d1ed7ee-5038-4441-84eb-8080ac130e9a,
 groupId=event-bus-router] Setting offset for partition maxwell.transactions-22 
to the committed offset FetchPosition{offset=275413397, 
offsetEpoch=Optional[271], currentLeader=LeaderAndEpoch{leader=:-1 (id: -1 
rack: null), epoch=271}}
{code}

Note that the leader is logged as "-1".  If I search through my application 
logs for the past couple of days, the only time I ever see this logged on the 
consumer is for this partition.

The kafka broker is running version 2.1.1.  On the broker side the logs show:
{code}
{"timeMillis":1568087844876,"thread":"kafka-request-handler-1","level":"WARN","loggerName":"state.change.logger","message":"[Broker
 id=5] Ignoring LeaderAndIsr request from controller 4 with correlation id 
15943 epoch 155 for partition maxwell.transactions-22 since its associated 
leader epoch 270 is not higher than the current leader epoch 
270","endOfBatch":false,"loggerFqcn":"org.slf4j.impl.Log4jLoggerAdapter","threadId":72,"threadPriority":5}
{"timeMillis":1568087844880,"thread":"kafka-request-handler-1","level":"INFO","loggerName":"kafka.server.ReplicaFetcherManager","message":"[ReplicaFetcherManager
 on broker 5] Removed fetcher for partitions 
Set(maxwell.transactions-22)","endOfBatch":false,"loggerFqcn":"org.slf4j.impl.Log4jLoggerAdapter","threadId":72,"threadPriority":5}
{"timeMillis":1568087844880,"thread":"kafka-request-handler-1","level":"INFO","loggerName":"kafka.cluster.Partition","message":"[Partition
 maxwell.transactions-22 broker=5] maxwell.transactions-22 starts at Leader 
Epoch 271 from offset 275403423. Previous Leader Epoch was: 
270","endOfBatch":false,"loggerFqcn":"org.slf4j.impl.Log4jLoggerAdapter","threadId":72,"threadPriority":5}
{"timeMillis":1568087844891,"thread":"kafka-request-handler-1","level":"INFO","loggerName":"state.change.logger","message":"[Broker
 id=5] Skipped the become-leader state change after marking its partition as 
leader with correlation id 15945 from controller 4 epoch 155 for partition 
maxwell.transactions-22 (last update controller epoch 155) since it is already 
the leader for the 
partition.","endOfBatch":false,"loggerFqcn":"org.slf4j.impl.Log4jLoggerAdapter","threadId":72,"threadPriority":5}
{code}

As soon as I restart the broker which is the leader for that partition, the 
messages flow through to the consumer.

Given restarts of the consumer don't help, but restarting the broker allows the 
stalled partition to resume, I'm inclined to think this is an issue with the 
broker.  Please let me know if I can assist further with investigating or 
resolving this.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


Jenkins build is back to normal : kafka-2.3-jdk8 #103

2019-09-11 Thread Apache Jenkins Server
See 




Re: [DISCUSS] Streams-Broker compatibility regression in 2.2.1 release

2019-09-11 Thread Alisson Sales
Thanks for letting the community/users know.

The proposal seems sensible.

I'm wondering if it is worth adding a note about this to the release notes
here: https://www.apache.org/dist/kafka/2.2.1/RELEASE_NOTES.html.

On Thu, Sep 12, 2019 at 5:23 AM Matthias J. Sax 
wrote:

> Hi,
>
> recently a user reported an issue upgrading a Kafka Streams application
> from 2.2.0 to 2.2.1 (cf
> https://mail-archives.apache.org/mod_mbox/kafka-users/201908.mbox/
> )
>
> After some investigation, we identified
> https://issues.apache.org/jira/browse/KAFKA-7895 to be the root cause of
> the problem.
>
> The fix for KAFKA-7895 uses message headers and thus requires broker
> version 0.11.0 (or newer) and message format 0.11 (or newer). Hence,
> while a Kafka Streams application version 2.2.0 is compatible with older
> brokers (0.10.1 and 0.10.2) and only requires message format 0.10,
> backward compatibility was accidentally broken in 2.2.1.
>
> The fix is also contained in the 2.3.0 release and cherry-picked to the 2.1
> branch (2.1.2 is not released yet, and thus 2.1 users are not affected
> at this point).
>
> Note: only users that use `suppress()` operator in their program are
> affected.
>
> We should not break streams-broker backward compatibility in bug-fix
> releases at all, and should avoid it in minor releases. However, it seems
> necessary to have the fix in 2.3.0 -- otherwise, `suppress()` is
> effectively useless, and it does not seem to be a good idea to fix the
> bug only in the next major release. Hence, trading off some backward
> compatibility in a minor release seems acceptable for this case,
> considering that 0.11.0 was released 2 years ago.
>
> For 2.2.1, it is more challenging to decide how to move forward, because
> we should not have broken streams-broker compatibility but 2.2.1 is
> already released and we can only react after the fact.
>
> From my point of view, the best way is to keep the fix and update the
> release notes and documentation accordingly. The main reason for my
> suggestions is that we would expect a majority of users to be on 0.11.0
> brokers already and the fix will work for them -- reverting the fix in
> 2.2.2 seems to be worse for all those users on newer broker versions. We
> also know that `suppress()` is a highly demanded feature and a lot of
> people waited for a fix.
>
> The expected minority of users that are on 0.10.1 / 0.10.2 brokers, or
> newer brokers but still on message format 0.10 would either need to stay
> on Kafka Streams 2.2.0 or upgrade their brokers/message format
> accordingly. However, upgrading brokers/message format is de-facto
> already required for 2.2.1 and thus keeping the fix would not make the
> situation worse.
>
> For 2.1, I would suggest to revert the fix to make sure we don't break
> streams-broker compatibility for 2.1.x users. If those users need the
> fix for `suppress()` they need to upgrade to 2.2.1/2.3.0 or newer and
> make sure their brokers are on 0.11.0 with message format 0.11, too.
>
>
> TL;DR: the proposal is:
>
> (1) revert the fix for KAFKA-7895 in 2.1 branch
> (2) keep the fix for KAFKA-7895 in 2.2.1 and 2.3.0
>
> Impact:
>
>  - Kafka Streams 2.1.x and 2.2.0 applications are backward compatible
> back to 0.10.1 brokers, requiring message format 0.10
>  - Kafka Streams 2.2.2 / 2.3.0 application (or newer) are backward
> compatible back to 0.11.0 brokers, requiring message format 0.11
>
>
> Please let us know what you think about this proposal.
>
>
> -Matthias
>
>
>
>


Build failed in Jenkins: kafka-trunk-jdk11 #809

2019-09-11 Thread Apache Jenkins Server
See 


Changes:

[github] KAFKA-8886; Make Authorizer create/delete API asynchronous (#7316)

--
[...truncated 1.90 MB...]
at 
app//org.apache.directory.server.core.changelog.ChangeLogInterceptor.add(ChangeLogInterceptor.java:113)
at 
app//org.apache.directory.server.core.api.interceptor.BaseInterceptor.next(BaseInterceptor.java:341)
at 
app//org.apache.directory.server.core.trigger.TriggerInterceptor.add(TriggerInterceptor.java:297)
at 
app//org.apache.directory.server.core.api.interceptor.BaseInterceptor.next(BaseInterceptor.java:341)
at 
app//org.apache.directory.server.core.event.EventInterceptor.add(EventInterceptor.java:245)
at 
app//org.apache.directory.server.core.api.interceptor.BaseInterceptor.next(BaseInterceptor.java:341)
at 
app//org.apache.directory.server.core.subtree.SubentryInterceptor.add(SubentryInterceptor.java:1018)
at 
app//org.apache.directory.server.core.api.interceptor.BaseInterceptor.next(BaseInterceptor.java:341)
at 
app//org.apache.directory.server.core.collective.CollectiveAttributeInterceptor.add(CollectiveAttributeInterceptor.java:134)
at 
app//org.apache.directory.server.core.api.interceptor.BaseInterceptor.next(BaseInterceptor.java:341)
at 
app//org.apache.directory.server.core.operational.OperationalAttributeInterceptor.add(OperationalAttributeInterceptor.java:310)
at 
app//org.apache.directory.server.core.api.interceptor.BaseInterceptor.next(BaseInterceptor.java:341)
at 
app//org.apache.directory.server.core.schema.SchemaInterceptor.add(SchemaInterceptor.java:1109)
at 
app//org.apache.directory.server.core.api.interceptor.BaseInterceptor.next(BaseInterceptor.java:341)
at 
app//org.apache.directory.server.core.exception.ExceptionInterceptor.add(ExceptionInterceptor.java:134)
at 
app//org.apache.directory.server.core.api.interceptor.BaseInterceptor.next(BaseInterceptor.java:341)
at 
app//org.apache.directory.server.core.admin.AdministrativePointInterceptor.add(AdministrativePointInterceptor.java:1189)
at 
app//org.apache.directory.server.core.api.interceptor.BaseInterceptor.next(BaseInterceptor.java:341)
at 
app//org.apache.directory.server.core.authz.AciAuthorizationInterceptor.add(AciAuthorizationInterceptor.java:518)
at 
app//org.apache.directory.server.core.api.interceptor.BaseInterceptor.next(BaseInterceptor.java:341)
at 
app//org.apache.directory.server.core.referral.ReferralInterceptor.add(ReferralInterceptor.java:247)
at 
app//org.apache.directory.server.core.api.interceptor.BaseInterceptor.next(BaseInterceptor.java:341)
at 
app//org.apache.directory.server.core.authn.AuthenticationInterceptor.add(AuthenticationInterceptor.java:335)
at 
app//org.apache.directory.server.core.api.interceptor.BaseInterceptor.next(BaseInterceptor.java:341)
at 
app//org.apache.directory.server.core.normalization.NormalizationInterceptor.add(NormalizationInterceptor.java:131)
at 
app//org.apache.directory.server.core.DefaultOperationManager.add(DefaultOperationManager.java:397)
at 
app//org.apache.directory.server.core.shared.DefaultCoreSession.add(DefaultCoreSession.java:211)
at 
app//org.apache.directory.server.core.shared.DefaultCoreSession.add(DefaultCoreSession.java:188)
at 
app//kafka.security.minikdc.MiniKdc.initDirectoryService(MiniKdc.scala:187)
at app//kafka.security.minikdc.MiniKdc.start(MiniKdc.scala:121)
at app//kafka.api.SaslSetup.initializeKerberos(SaslSetup.scala:78)
at app//kafka.api.SaslSetup.initializeKerberos$(SaslSetup.scala:75)
at 
app//kafka.api.SaslSslAdminClientIntegrationTest.initializeKerberos(SaslSslAdminClientIntegrationTest.scala:34)
at app//kafka.api.SaslSetup.startSasl(SaslSetup.scala:64)
at app//kafka.api.SaslSetup.startSasl$(SaslSetup.scala:56)
at 
app//kafka.api.SaslSslAdminClientIntegrationTest.startSasl(SaslSslAdminClientIntegrationTest.scala:34)
at 
app//kafka.api.SaslSslAdminClientIntegrationTest.setUpSasl(SaslSslAdminClientIntegrationTest.scala:69)
at 
app//kafka.api.SaslSslAdminClientIntegrationTest.setUp(SaslSslAdminClientIntegrationTest.scala:64)

kafka.api.SaslSslAdminClientIntegrationTest > 
testElectUncleanLeadersWhenNoLiveBrokers STARTED
kafka.api.SaslSslAdminClientIntegrationTest.testElectUncleanLeadersWhenNoLiveBrokers
 failed, log available in 


kafka.api.SaslSslAdminClientIntegrationTest > 
testElectUncleanLeadersWhenNoLiveBrokers FAILED
org.junit.runners.model.TestTimedOutException: test timed out after 12 
milliseconds
at 

Re: [DISCUSS] KIP-507: Securing Internal Connect REST Endpoints

2019-09-11 Thread Chris Egerton
Hi all,

I've updated KIP-507 to reflect the changes inspired by Randall's recent
feedback. In addition, after some further research, I've decided to remove
the proposed default value for the internal.request.key.size config and instead,
should no value be provided, rely on the default key size for the given key
algorithm. More detail can be found in the KIP if anyone's interested.
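
For background, the JCA calls involved in key generation and request signing
-- a minimal sketch with example algorithm names (not the KIP's defaults):

{code}
import java.nio.charset.StandardCharsets;
import javax.crypto.KeyGenerator;
import javax.crypto.Mac;
import javax.crypto.SecretKey;

public class RequestSigningSketch {
    public static void main(String[] args) throws Exception {
        // Key generation: the key size must suit the algorithm
        // (e.g. AES accepts 128/192/256, DES requires exactly 56).
        KeyGenerator keyGen = KeyGenerator.getInstance("HmacSHA256");
        SecretKey key = keyGen.generateKey();

        // Request signing with the shared session key.
        Mac mac = Mac.getInstance("HmacSHA256");
        mac.init(key);
        byte[] signature = mac.doFinal(
            "{\"tasks\":[]}".getBytes(StandardCharsets.UTF_8));
        System.out.println("signature length: " + signature.length);
    }
}
{code}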

Cheers,

Chris

On Tue, Sep 10, 2019 at 3:18 PM Chris Egerton  wrote:

> Hi Randall,
>
> Thanks so much for your comprehensive feedback! To avoid creating an
> extremely long response I'll address your comments/questions by referencing
> the identifiers you've provided for them as opposed to copying them and
> responding inline; hopefully things are still readable :)
>
>
> RE new configs:
>
> a) I believe exposing these configurations right now is the correct
> approach. There are two scenarios that we should aim to support: running
> Connect on a bare-bones JVM that doesn't implement anything beyond the
> requirements of the JVM specs in terms of key generation and key signing
> algorithms, and running Connect in an environment with hard security
> requirements for, e.g., FIPS compliance. The default values for these
> configurations are intended to satisfy the first use case; however, in
> order to enable users to conform with higher security standards (the second
> use case), some key generation and key signing algorithms that are not
> guaranteed to be present on every implementation of the JVM may be
> required. I don't see a way to satisfy both use cases without adding
> configurability.
> With regards to key size: yes, this needs to be configurable as there are
> different ranges of acceptable key size for different key generation
> algorithms; for example, the "AES" algorithm for key generation requires a
> key size of either 128, 192 or 256 (at least on my local JVM) whereas the
> "DES" algorithm requires a key size of exactly 56.
> As far as enabling different algorithms for key generation vs. request
> signing goes... I'm not sure. Local tests involving keys generated with
> different algorithms from the mac algorithms used to sign inputs (e.g.,
> combining an HmacSHA256 key with an HmacMD5 mac or using a DES key with an
> HmacSHA256 mac) haven't thrown any exceptions yet but this is a little
> beyond the boundaries of my current knowledge, so I'll have to get back to
> you on that point. At the very least, your question (and the research I've
> done so far to try to address it) has highlighted a bug in my current PR
> that'll definitely need to be fixed :)
>
> b) The riskiest scenario is if a follower worker is configured to use a
> request signing algorithm that isn't allowed by the leader. In this case,
> any failure will only occur if/when that follower starts up a connector and
> then has to forward tasks for that connector to the leader, which may not
> happen immediately. Once that failure occurs, the follower enters an endless
> failure loop, retrying to send those task configurations to the leader and
> pausing for the backoff interval between each failed request.
> There are two ways to rectify this situation; either shut down the
> follower and restart it after editing its configuration to use a request
> signing algorithm permitted by the leader, or shut down all other workers
> in the cluster that do not permit the request signing algorithm used by the
> follower, reconfigure them to permit it, and then restart them.
> This scenario is unlikely to occur with any normal usage of Connect, but
> the circumstances leading to it will need to be called out very carefully
> in order to avoid putting the cluster into a bad state and flooding logs
> with scary-looking error messages. Speaking of, it'll be vital to design
> appropriate error messages for this scenario so that users can (hopefully)
> dig themselves out of that hole on their own.
> Differing values for any of the other configs shouldn't actually be too
> problematic, given that the three remaining configs all deal with key
> generation/rotation, which is only handled by one worker at a time (the
> leader).
>
> c) Good call. Yes, these configs are all "low" importance and I'll update
> the KIP accordingly to reflect that.
>
> d) No, there is no window when multiple keys are valid. This is explicitly
> called out in the KIP at the end of the "Proposed Changes" section:
> > The leader will only accept requests signed with the most current key.
> This should not cause any major problems; if a follower attempts to make a
> request with an expired key (which should be quite rare and only occur if
> the request is made by a follower that is not fully caught up to the end of
> the config topic), the initial request will fail, but will be subsequently
> retried after a backoff period.
>
> e) I'll update the KIP to reflect the fact that, barring any need for
> heightened levels of security compliance, none of these configurations
> should 

Re: [VOTE] KIP-520: Augment Consumer.committed(partition) to allow multiple partitions

2019-09-11 Thread Kamal Chandraprakash
Thanks for the KIP!

LGTM, +1 (non-binding).

On Wed, Sep 11, 2019 at 3:23 AM Matthias J. Sax 
wrote:

> I don't have a strong preference. So I am also fine to deprecate the
> existing methods. Let's see what Jason thinks.
>
> Can you update the KIP to reflect the semantics of the returned `Map` (i.e.,
> it only contains entries for partitions with committed offsets, and
> does not contain `null` values)?
>
>
> +1 (binding)
>
> -Matthias
>
>
>
>
> On 9/10/19 11:53 AM, Guozhang Wang wrote:
> > Hi Jason / Matthias,
> >
> > I understand your concerns now. Just to clarify, my main motivation for
> > deprecating the old APIs is not only aesthetics (I confess I personally
> > have a preference for succinct APIs), but to urge people to use the batched
> > API for better latency when possible --- as stated in the KIP, my
> > observation is that in practice most callers are actually going to get
> > committed offsets for more than one partition, and without deprecating the
> > old APIs it would be hard for them to realize that the new API does have a
> > benefit in performance.
> >
> > This is different from some of the existing APIs though -- e.g., Matthias
> > mentioned seek / seekToBeginning / seekToEnd, where only seekToXX
> > has plural forms and seek has only singulars. We can, of course, make
> > seekToXX take plurals as well, just like commitSync/Async, but since seeks
> > are non-blocking APIs (they rely on the follow-up polling call to talk to
> > the brokers), calling it multiple times with one partition each vs.
> > calling it one time with a plural makes little difference (still, I'd argue
> > that today we do not have a same-named function overloaded with both types
> > :P). On the other hand, committed, commitSync, offsetsForTimes etc. are
> > blocking calls, all in the form of plurals except
> >
> > * committed
> > * position
> > * partitionsFor
> >
> > My rationale was that 1) for consecutive calls of #position, mostly it
> > would only require a single round-trip to brokers since we are trying to
> > refresh fetching positions for all partitions anyways, and 2) for
> > #partitionsFor, theoretically we could also consider asking for multiple
> > topics in one call since each blocking call potentially incurs one round
> > trip, but I did not include it in the scope of this KIP only because I have
> > not observed too many usage patterns that are commonly calling it
> > consecutively for multiple topics. At the moment, what I truly want to
> > "improve" on is the committed calls, as in many cases I've seen it being
> > called consecutively for multiple topic-partitions.
> >
> > Therefore, I'm still more inclined to deprecate the old APIs so that we can
> > push people to discover the new batching APIs for efficiency in this
> > KIP. But if you feel that this compatibility is very crucial to maintain, I
> > could be convinced.
> >
> >
> > Guozhang
> >
> > On Tue, Sep 10, 2019 at 10:18 AM Matthias J. Sax 
> > wrote:
> >
> >> Thanks for the KIP Guozhang.
> >>
> >>> Another reason is that other functions of KafkaConsumers do not have those
> >>> overloaded functions, to be consistent
> >>
> >> I tend to agree with Jason about keeping the existing methods. Your
> >> argument does not seem to hold. I just checked the `Consumer` API, and
> >> it's a mix of overloads:
> >>
> >> Methods only taking `Collections`:
> >>
> >> subscribe/assign/commitSync/commitAsync/pause/resume/offsetsForTimes/beginningOffsets/endOffsets
> >>
> >> Methods with an overload taking `Collections` or a single value:
> >>
> >> seek/seekToBeginning/seekToEnd
> >>
> >> (those are strictly different methods, but they are semantically related)
> >>
> >> Only taking a single value:
> >>
> >> position/committed/partitionsFor
> >>
> >>
> >> While you are, strictly speaking, correct that there is no method with an
> >> overload for both `Collection` and a single value, the API mix seems to
> >> suggest that it might actually be worth having corresponding overloads for
> >> all methods instead of sticking to `Collections` only.
> >>
> >>
> >>
> >> About the return map: I agree that not containing any entry in the map
> >> is better. It's not only consistent with other APIs but it also avoids
> >> potential NPEs.
> >>
> >>
> >>
> >> -Matthias
> >>
> >>
> >> On 9/10/19 10:04 AM, Jason Gustafson wrote:
> >>>> I feel it not worth making committed to have both plurals and
> >>>> singulars.
> >>>
> >>> Not sure I agree. If we had started with these new APIs from the beginning,
> >>> that may have been better, but we already have exposed the singular APIs
> >>> and users are depending on them. Not sure it's worth breaking compatibility
> >>> just for aesthetics.
> >>>
> >>> -Jason
> >>>
> >>> On Tue, Sep 10, 2019 at 9:41 AM Guozhang Wang 
> >> wrote:
> >>>
>  Thanks Jason!
> 
>  On Tue, Sep 10, 2019 at 9:07 AM Jason Gustafson 
>  wrote:
> 
> > Hi Guozhang,
> >
> > I think the 

[jira] [Created] (KAFKA-8897) Increase Version of RocksDB

2019-09-11 Thread Bruno Cadonna (Jira)
Bruno Cadonna created KAFKA-8897:


 Summary: Increase Version of RocksDB
 Key: KAFKA-8897
 URL: https://issues.apache.org/jira/browse/KAFKA-8897
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Bruno Cadonna


A higher version (6+) of RocksDB is needed for some metrics specified in 
KIP-471. 



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


Re: [DISCUSS] KIP-519: Make SSL context/engine configuration extensible

2019-09-11 Thread Maulin Vasavada
Hi all,

Since the "custom config" seems the main topic of interest let us talk
about it.

1. I want to confirm that I interpret the definition of 'custom config of
SslEngineFactory' the same way Clement is suggesting - "a config that does
not exist in Kafka but is specified by a custom implementation of
SslEngineFactory".  If there is a disagreement to that we have to bring it
up here sooner.

2. I've been thinking about it and I question why we are trying to make a
custom config a first class citizen in standard config?
The reasoning for that question is-
Kafka wants to delegate creation of SSLEngine to a class which is "not"
part of Kafka's distribution. Since the interface for SSLEngine creator
will be defined by the public method of createSSLEngine(), why would Kafka
care what does the implementation do other than fulfilling the contract of
creation of SSLEngine. The implementation can use any special configs i.e.
configs coming from input Map OR configs defined in a new file only known
to itself. Making the configs part of the interface contract in any way is
not necessary. This way we keep it simple and straightforward.

3. Now, the 2nd point raises a question: if we follow that suggestion, how
can we ever re-create the SslEngineFactory object and allow a new object to
be created when something changes in the implementation? That is a valid
question. As noted in the KIP section titled "Other challenge", we do have
a scenario where ONLY the SslEngineFactory implementation knows that
something changed - example: the keystore got updated by a local daemon
process known only to the specific implementation. This means we need a
"push notification" from the SslEngineFactory implementation to the
SslFactory. I feel that if we build the "push notification" by adding a
method to the interface such as "public void
registerReconfigurableListener(Reconfigurable r)" and make SslFactory
register itself with the SslEngineFactory impl class, we can trigger
reconfiguration of the SslEngineFactory implementation on its own terms and
conditions, without getting into making custom configs complicated.
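A minimal sketch of what that listener hook could look like, using the names
proposed above (this is a proposal sketch, not a committed Kafka API):

```java
import javax.net.ssl.SSLEngine;
import org.apache.kafka.common.Reconfigurable;

// Sketch only: the method names follow the proposal in this thread, not an
// agreed-upon Kafka interface.
public interface SslEngineFactory {

    /** The one real contract: produce SSLEngine instances. */
    SSLEngine createSslEngine(String peerHost, int peerPort);

    /**
     * Proposed "push notification" hook: SslFactory registers itself here,
     * and the implementation invokes the listener when it alone detects a
     * change (e.g. a keystore rotated by a local daemon) that requires
     * rebuilding engines.
     */
    void registerReconfigurableListener(Reconfigurable listener);
}
```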

I am just thinking out loud here; my intent is not to avoid addressing
custom configs, BUT to express what I genuinely believe might be a better
approach.

Thanks
Maulin

On Tue, Sep 10, 2019 at 9:06 PM Pellerin, Clement 
wrote:

> Regarding what I labeled the simplest solution below, SslConfigs could
> instantiate the custom interface only if the yet-to-be-validated configs
> were passed in to the call to get the list of known SSL configs.
>
> -----Original Message-----
> From: Pellerin, Clement
> Sent: Tuesday, September 10, 2019 11:36 AM
> To: dev@kafka.apache.org
> Subject: [EXTERNAL]RE: [DISCUSS] KIP-519: Make SSL context/engine
> configuration extensible
>
> Another solution could be a new standard ssl config that holds a list of
> extra custom configs to accept.
> Using a custom SslEngineFactory with custom configs would require setting
> two configs, one for the class name and another for the list of custom
> configs.
> In SslConfigs, we see that declaring a single config takes 5 values, so
> I'm not sure how it would work exactly.
>
> We could also declare a new interface to return the list of custom ssl
> configs and the extra standard ssl config I'm proposing holds the name of
> the implementation class instead. The reason a different interface is
> needed is because it would be instantiated by SslConfigs, not SslFactory.
> This seems the simplest solution.
>
> Anyway, the point of this exercise is to prove that an acceptable solution
> for custom configs does not affect the public API in KIP-519.
>
>
> -----Original Message-----
> From: Pellerin, Clement
> Sent: Tuesday, September 10, 2019 9:35 AM
> To: dev@kafka.apache.org
> Subject: [EXTERNAL]RE: [DISCUSS] KIP-519: Make SSL context/engine
> configuration extensible
>
> Custom config is a term I invented to mean a config that does not exist in
> Kafka but is specified by a custom implementation of SslEngineFactory.
> The problem with custom configs (as I remember it) is that the list of
> configs is static in SslConfigs and config names are checked before
> SslFactory is created.
> ==> You must solve this problem because that is what killed KIP-383 and
> therefore is the sole reason why KIP-519 exists.
> ==> Your KIP does not have to implement the solution since it can be done
> in a future KIP, but your KIP must be compatible with the solution found.
> ==> A method to return the list of configs would help. This cannot be a
> static method in the interface since it cannot be overridden.
> ==> You could require a static method in the implementation class by
> convention, just like the constructor you require.
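For concreteness, that convention might look like the following (purely
hypothetical; no such convention exists in Kafka today):

```java
import java.util.Set;

// Purely hypothetical: Kafka would look this static method up reflectively,
// by convention, before instantiating the factory (analogous to requiring a
// no-arg constructor), so custom config names could pass validation.
public class MySslEngineFactory /* implements SslEngineFactory */ {

    public static Set<String> customConfigNames() {
        return Set.of("ssl.engine.factory.keystore.daemon.path"); // made-up key
    }

    public MySslEngineFactory() {
        // no-arg constructor required by convention
    }
}
```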


Build failed in Jenkins: kafka-trunk-jdk11 #807

2019-09-11 Thread Apache Jenkins Server
See 


Changes:

[jason] KAFKA-8747; Add atomic counter to fix flaky testEventQueueTime test

--
[...truncated 2.03 MB...]
[... kafka.server test output omitted: the truncated log lists only
STARTED/PASSED pairs for KafkaMetricReporterClusterIdTest, ReplicaManagerTest,
LogDirFailureTest and BrokerEpochIntegrationTest, and is cut off before any
failing test appears ...]

Jenkins build is back to normal : kafka-trunk-jdk8 #3898

2019-09-11 Thread Apache Jenkins Server
See 




[jira] [Created] (KAFKA-8898) if there is no message for poll, kafka consumer apply memory

2019-09-11 Thread linking12 (Jira)
linking12 created KAFKA-8898:


 Summary: if there is no message for poll, kafka consumer apply 
memory
 Key: KAFKA-8898
 URL: https://issues.apache.org/jira/browse/KAFKA-8898
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Affects Versions: 2.1.1
Reporter: linking12


When polling for messages, even if there is no record, the consumer still
allocates about 1000 bytes of memory.

`fetched = new HashMap<>()` is not a good idea: it allocates heap memory even
when there is no message.

I think `fetched = new HashMap<>()` should only happen when records actually
exist.

```java
public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
    Map<TopicPartition, List<ConsumerRecord<K, V>>> fetched = new HashMap<>();
    int recordsRemaining = maxPollRecords;
```
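A sketch of the lazy-allocation approach the reporter seems to be suggesting
(illustrative only, not the actual Kafka code):

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrates deferring the HashMap allocation until a record actually
// arrives, so empty polls allocate nothing.
final class LazyRecordBuffer<P, R> {
    private Map<P, List<R>> fetched; // stays null on empty polls

    void add(P partition, R record) {
        if (fetched == null) {
            fetched = new HashMap<>(); // allocated only on the first record
        }
        fetched.computeIfAbsent(partition, p -> new ArrayList<>()).add(record);
    }

    Map<P, List<R>> drain() {
        // Empty polls share the immutable empty map: zero allocation.
        return fetched == null ? Collections.emptyMap() : fetched;
    }
}
```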



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


Topology with loops (intermediate topics) and potential bug in TopologyTestDriver

2019-09-11 Thread Adam Domanski
Hi,

I probably found a bug in TopologyTestDriver for a quite non-trivial Kafka
Streams topology.

The streaming logic is the following: there is a concept of children and
parents. Children are aggregated under a parent. Some children of the master
parent can send poison pills to other parents. Such parents die, but their
already-aggregated kids should then go to the master. The application
remembers killed parents in a local store. If a new kid comes, the store is
checked and the kid is rerouted to the master.

I'm using intermediate topics in my topology, as well as occasional
tombstones.

Here is a Gradle (Java 11 + Kotlin) based project which demonstrates the
issue: https://github.com/czterocyty/kafka_streams_test_case

You can see that a few tests fail there, as I expect the latest output Parent
records to have the latest generation.

So it seems that TopologyTestDriver does not care about the order of output
records.

Best regards,
Adam Domański


Re: [DISCUSS] KIP-511: Collect and Expose Client's Name and Version in the Brokers

2019-09-11 Thread David Jacot
Hi all,

I have discussed the various options with Magnus to get his view from a
librdkafka perspective, and he has suggested a good alternative.

It seems we could continue with the idea of using the
ApiVersionsRequest/Response but with a different fallback strategy. When a
broker gets an ApiVersionsRequest with an unknown version, it could continue
to fall back to version 0 as today, but instead of sending back the
UNSUPPORTED_VERSION error alone in the response, it could also provide, at
minimum, the highest version of the ApiVersionsRequest it supports. This
way, a recent client could leverage that information when the error is
received and send the correct version to the broker instead of falling all
the way back to version 0.

This way, we can evolve the ApiVersionsRequest while keeping the Request
Header forward compatible. It doesn't add any extra round trip.
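To illustrate, the client-side logic might look roughly like this (a
self-contained sketch; every name below is invented for illustration and is
not actual Kafka client code):

```java
// Sketch of the proposed negotiation: try the latest version first, and on
// UNSUPPORTED_VERSION retry once with the max version the broker reported,
// instead of dropping all the way to version 0.
final class ApiVersionsNegotiationSketch {

    static final short CLIENT_LATEST = 3; // whatever the client was built with

    static final class Response {
        final boolean unsupportedVersion;
        final short brokerMaxVersion; // proposed: included in the error response
        Response(boolean unsupportedVersion, short brokerMaxVersion) {
            this.unsupportedVersion = unsupportedVersion;
            this.brokerMaxVersion = brokerMaxVersion;
        }
    }

    interface Broker {
        Response handle(short requestVersion);
    }

    static short negotiate(Broker broker) {
        Response resp = broker.handle(CLIENT_LATEST);
        if (!resp.unsupportedVersion) {
            return CLIENT_LATEST; // the broker understood the latest version
        }
        return resp.brokerMaxVersion; // retry at the broker's max, not v0
    }
}
```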

Knowing this, I think that putting the information in the
ApiVersionsRequest remains the best option.

What do you think?

Best,
David

On Tue, Sep 10, 2019 at 1:00 AM Colin McCabe  wrote:

> Hi all,
>
> I agree that freezing the request header is not very appealing.  We might
> want to add something there later.
>
> Having a separate request type is very clean, but as David mentioned, it
> does add an extra request/response cycle to the time required to get a
> connection into a usable state.
>
> One solution to consider is adding the clientVersion and clientType to the
> request header as optional (tagged) fields.  This would let us skip the
> extra round trip.  I don't think it's that much more messy than having a
> separate request type to set the client version and type.  In both cases,
> you have to handle connections that set the version later than others, or
> don't set the version at all (for compatibility).  So the version/type has
> to be mutable and added after the TCP connection itself is established.
>
> best,
> Colin
>
>
> On Mon, Sep 9, 2019, at 06:11, David Jacot wrote:
> > Hi Gwen,
> >
> > The reasoning behind having the information before the Sasl
> > authentication was to have the information for troubleshooting purposes.
> > For instance, when there are errors in the handshake, it would be great
> > to know if it comes from a specific buggy client. I think we could live
> > without it, though it was nice to have.
> >
> > Yeah. I agree with you. It seems that evolving the RequestHeader (RH)
> > and the ApiVersionsRequest (AVR) in conjunction is more complicated than
> > I anticipated. The evolution of the AVR alone works well, but it would
> > make the evolution of the RH hard in the future. Please, check my other
> > email to see my thoughts.
> >
> > Best,
> > David
> >
> > On Mon, Sep 9, 2019 at 3:18 AM Gwen Shapira  wrote:
> >
> > > Hey,
> > >
> > > Since modifying ApiVersionsRequest seems to be quite involved, do we
> > > want to re-examine the rejected option of adding another
> > > request/response pair? It will add another roundtrip, but Kafka
> > > already expects client connections to be long-lived, so the overhead
> > > is probably negligible.
> > >
> > > In addition, in the rejected alternatives it says: "It also would
> > > require to be done before the authentication (TLS AuthN aside) and
> > > thus requiring specific treatment, similarly to the
> > > ApiVersionsRequest." - which I don't quite understand. Why do we think
> > > this has to happen before authenticating?
> > >
> > > It sounds like addition another protocol will allow us to keep the
> > > special assumptions around ApiVersionsRequest and will remove the
> > > dependency on KIP-482. Which will make KIP-511 much simpler than the
> > > alternative we are currently discussing.
> > >
> > > Gwen
> > >
> > > On Fri, Sep 6, 2019 at 3:16 PM Colin McCabe 
> wrote:
> > > >
> > > > Hi David,
> > > >
> > > > Yeah, this is kind of difficult.
> > > >
> > > > From the high level, we either need forward compatibility (old
> brokers
> > > able to read new ApiVersionsRequests) or some kind of
> challenge/response
> > > system.  I think you're on the right track to be thinking about forward
> > > compatibility...  a challenge/response system would have a lot of
> overhead
> > > in cases where the client opens a lot of new connections.
> > > >
> > > > I agree that we can have the new versions "add stuff at the end" to
> > > maintain forward compatibility.  Tagged fields will also help here, by
> > > avoiding the need for version bumps.  We also have to think about the
> > > impact this will have on the request header.  It seems like we might
> need
> > > to freeze the version of the request header forever in order to support
> > > full forwards compatibility.  Otherwise, the old brokers will not know
> how
> > > long the new request headers are.  This is maybe OK if we can evolve
> the
> > > request header by adding tagged fields...
> > > >
> > > > Another option is to fall all the way back to version 0 when the
> broker
> > > doesn't understand the client's 

RE: [DISCUSS] KIP-519: Make SSL context/engine configuration extensible

2019-09-11 Thread Pellerin, Clement
I'm sorry if I'm diverting the discussion, but without this issue it would
have been pretty trivial to update KIP-383 to go as far as you did. I am also
happy to get a discussion going; the KIP-383 thread was a desolate place.

Kafka needs to know about custom configs because it validates the configs 
before it passes them to (re)configure. Unknown configs are silently removed by 
ConfigDef. We could keep unknown configs as strings without validating them in 
ConfigDef, but I don't know if the Kafka community would accept this weaker 
validation.
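For what it's worth, the silent removal is easy to demonstrate against
ConfigDef directly (a minimal sketch; the custom key name is made up):

```java
import java.util.Map;
import org.apache.kafka.common.config.ConfigDef;

public class UnknownConfigDemo {
    public static void main(String[] args) {
        // Only keys that were define()d survive parsing.
        ConfigDef def = new ConfigDef()
                .define("ssl.keystore.location", ConfigDef.Type.STRING, null,
                        ConfigDef.Importance.HIGH, "Keystore path");

        Map<String, Object> parsed = def.parse(Map.of(
                "ssl.keystore.location", "/etc/kafka/keystore.jks",
                "ssl.custom.factory.flag", "true")); // undefined, silently dropped

        System.out.println(parsed.containsKey("ssl.custom.factory.flag")); // false
    }
}
```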

It appears we are trying to invent the notion of a meta config. Anyway, I think 
we have shown asking an instance of SslEngineFactory to contribute to ConfigDef 
is way too late.
 
For your push notification, would it be simpler to just let your
SslEngineFactory notice the change and make it effective the next time it is
called? SslFactory would not cache the SSLEngine and would always ask the
SslEngineFactory for it. You don't even need an internal thread if the
SslEngineFactory checks for a change when it is called. The SslEngineFactory
would no longer be immutable, so maybe it is worth reconsidering how
reconfigure works for it.
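A minimal sketch of that check-on-call idea, under the assumption that the
factory watches a keystore file (all names illustrative):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLEngine;

// The factory re-checks its key material lazily on every call instead of
// pushing notifications; SslFactory would never cache the engine.
final class CheckOnCallSslEngineFactory {
    private final Path keystorePath;
    private long lastLoadedMtime = -1;
    private SSLContext sslContext;

    CheckOnCallSslEngineFactory(Path keystorePath) {
        this.keystorePath = keystorePath;
    }

    synchronized SSLEngine createSslEngine(String host, int port) throws IOException {
        long mtime = Files.getLastModifiedTime(keystorePath).toMillis();
        if (mtime != lastLoadedMtime) {
            sslContext = rebuildContext(); // pick up the rotated keystore
            lastLoadedMtime = mtime;
        }
        return sslContext.createSSLEngine(host, port);
    }

    private SSLContext rebuildContext() throws IOException {
        // Real code would load the keystore at keystorePath into key/trust
        // managers; the default context stands in to keep the sketch short.
        try {
            return SSLContext.getDefault();
        } catch (java.security.NoSuchAlgorithmException e) {
            throw new IOException(e);
        }
    }
}
```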

-----Original Message-----
From: Maulin Vasavada [mailto:maulin.vasav...@gmail.com] 
Sent: Wednesday, September 11, 2019 3:29 AM
To: dev@kafka.apache.org
Subject: Re: [DISCUSS] KIP-519: Make SSL context/engine configuration extensible

Hi all,

Since "custom config" seems to be the main topic of interest, let us talk
about it.

[snip - the rest of Maulin's message, including the quoted exchange with
Clement, appears in full earlier in this thread]

Re: _consumer_offsets is becoming rather big. How to purge?

2019-09-11 Thread Ash G
Bump, no reply.

It seems this condition was missed by devs when this feature was designed and 
implemented.

On 2019/09/06 14:45:47, Ash G  wrote: 
> 
> _consumer_offsets is becoming rather big > 1 TB. Is there a way to purge 
> dead/inactive consumer id rows from it?
> I am assuming dead/inactive consumer id rows are the reason. Could there be
> another reason?
> 
> [reposting from user list]
> 


Re: Topology with loops (intermediate topics) and potential bug in TopologyTestDriver

2019-09-11 Thread John Roesler
Hi Adam,

I haven't looked into your project yet, but just wanted to mention
this to see if it explains your observations.

TopologyTestDriver processes every input record fully and
synchronously. So, when you pipe an input record in, it traverses all
internal processing, including intermediate topics, until all results
are written to sink topics (or the end of the topology is reached).

This may indeed result in a different order of output records than
you'd observe with Kafka Streams, which would instead write to the
intermediate topic in one thread and then process it in a separate
thread.
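For what it's worth, here is a minimal sketch of that synchronous behavior
with the current test-utils API (buildTopology/testConfig and the topic names
are placeholders, not taken from your project):

```java
import java.util.Properties;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.test.ConsumerRecordFactory;

class SynchronousDriverSketch {
    // topology and props stand in for the project's actual topology and
    // test configuration.
    static void demo(Topology topology, Properties props) {
        try (TopologyTestDriver driver = new TopologyTestDriver(topology, props)) {
            ConsumerRecordFactory<String, String> factory = new ConsumerRecordFactory<>(
                    "children", new StringSerializer(), new StringSerializer());
            // This single call drives the record through the ENTIRE topology,
            // including any intermediate topics, before it returns. A running
            // Kafka Streams app would instead hand the record off between
            // threads via the intermediate topic, which can reorder outputs.
            driver.pipeInput(factory.create("children", "kid-1", "payload"));
            System.out.println(driver.readOutput(
                    "parents", new StringDeserializer(), new StringDeserializer()));
        }
    }
}
```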

Does that add up with what you're seeing?

Thanks,
-John

On Wed, Sep 11, 2019 at 6:40 AM Adam Domanski  wrote:
>
> [snip - Adam's message is quoted in full above]


Re: [DISCUSS] KIP-470: TopologyTestDriver test input and output usability improvements

2019-09-11 Thread John Roesler
Thanks for the update, Jukka!

I'd be in favor of the current proposal. Not sure how the others feel.
If people generally feel positive, it might be time to start a vote.

Thanks,
-John

On Sat, Sep 7, 2019 at 12:40 AM Jukka Karvanen
 wrote:
>
> Hi,
>
> Sorry; I need to roll back right away the one method removal that I was
> proposing.
>
> One constructor with a Long timestamp is restored to TestRecord; it is
> needed by TestInputTopic.
>
> Regards,
> Jukka
>
> la 7. syysk. 2019 klo 8.06 Jukka Karvanen (jukka.karva...@jukinimi.com)
> kirjoitti:
>
> > Hi,
> >
> > Let's get back to this after summer break.
> > Thanks Antony for offering help; it might be needed.
> >
> > I modified the KIP based on the feedback to be a mixture of variations 4
> > and 5.
> >
> > In TestInputTopic I removed the deprecation from one variation with a long
> > timestamp and removed entirely one version without a key.
> > The existing test code using it can easily be migrated to the remaining
> > method by adding a null key.
> >
> > In TestRecord I removed the constructors with a Long timestamp from the
> > public interface. You can migrate existing code that uses the Long
> > timestamp constructors to the constructors taking a ProducerRecord or
> > ConsumerRecord.
> > There is still a `Long timestamp()` method, like in ProducerRecord /
> > ConsumerRecord.
> >
> > Is this an acceptable alternative?
> > What else is needed to conclude the discussion phase and get to voting?
> >
> > Regards,
> > Jukka
> >
> > to 5. syysk. 2019 klo 17.17 Antony Stubbs (ant...@confluent.io) kirjoitti:
> >
> >> Hi Jukka! I just came across your work - it looks great! I was taking a
> >> stab at improving the existing API, but yours already looks great and just
> >> about complete! Are you planning on continuing your work and submitting a
> >> PR? If you want some help, I'd be happy to jump in.
> >>
> >> Regards,
> >> Antony.
> >>
> >> On Thu, Aug 1, 2019 at 3:42 PM Bill Bejeck  wrote:
> >>
> >> > Hi Jukka,
> >> >
> >> > I also think 3, 4, and 5 are all good options.
> >> >
> >> > My personal preference is 4, but I also wouldn't mind going with 5 if
> >> that
> >> > is what others want to do.
> >> >
> >> > Thanks,
> >> > Bill
> >> >
> >> > On Tue, Jul 30, 2019 at 9:31 AM John Roesler  wrote:
> >> >
> >> > > Hey Jukka,
> >> > >
> >> > > Sorry for the delay.
> >> > >
> >> > > For what it's worth, I think 3, 4, and 5 are all good options. I
> >> guess my
> >> > > own preference is 5.
> >> > >
> >> > > It seems like the migration pain is a one-time concern vs. having more
> >> > > maintainable code for years thereafter.
> >> > >
> >> > > Thanks,
> >> > > -John
> >> > >
> >> > >
> >> > >
> >> > > On Tue, Jul 2, 2019 at 4:03 AM Jukka Karvanen <
> >> > jukka.karva...@jukinimi.com
> >> > > >
> >> > > wrote:
> >> > >
> >> > > > Hi Matthias,
> >> > > >
> >> > > > Generally I think using Instant and Duration makes the test more
> >> > > > readable, and that's why I modified the KIP based on your suggestion.
> >> > > > Now a lot of code uses time with long or Long, and that makes the
> >> > > > change more complicated.
> >> > > >
> >> > > > What I tried to say about the migration is that the lines without a
> >> > > > timestamp, or where a long timestamp is supported, can be migrated
> >> > > > mainly with search & replace:
> >> > > >
> >> > > >
> >> > > >
> >> > >
> >> >
> >> testDriver.pipeInput(recordFactory.create(WordCountLambdaExample.inputTopic,
> >> > > > nullKey, "Hello", 1L));
> >> > > >
> >> > > > ->
> >> > > >
> >> > > > *inputTopic*.pipeInput(nullKey, "Hello", 1L);
> >> > > >
> >> > > > If long is not supported as timestamp, the same is not so easy:
> >> > > >
> >> > > >
> >> > > >
> >> > >
> >> >
> >> testDriver.pipeInput(recordFactory.create(WordCountLambdaExample.inputTopic,
> >> > > > nullKey, "Hello", 1L));
> >> > > >
> >> > > > ->
> >> > > >
> >> > > > *inputTopic1*.pipeInput(nullKey, "Hello", Instant.ofEpochMilli(1L));
> >> > > >
> >> > > > Also, if you need to convert arbitrary long timestamps to proper time
> >> > > > Instants, it requires understanding the logic of the test. So
> >> > > > mechanical search & replace is not possible.
> >> > > >
> >> > > >
> >> > > > I see there are several alternatives for long vs Instant / Duration:
> >> > > >
> >> > > > 1. All times as long/Long like in this version:
> >> > > >
> >> > > >
> >> > >
> >> >
> >> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=119550011
> >> > > >
> >> > > > (startTimestampMs, autoAdvanceMs as parameter of  createInputTopic
> >> > > > instead of configureTiming)
> >> > > >
> >> > > > 2. Auto timestamping configured with Instant and Duration, pipeInput
> >> > > > and TestRecord with long:
> >> > > >
> >> > > >
> >> > >
> >> >
> >> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=120722523
> >> > > >
> >> > > >
> >> > > > 3. (CURRENT) Auto timestamping configured with Instant and Duration,
> >> > > > pipeInput and TestRecord with Instant, version with long deprecated:
> >> > > >