[jira] [Created] (KAFKA-9907) Switch default build to Scala 2.13

2020-04-22 Thread Ismael Juma (Jira)
Ismael Juma created KAFKA-9907:
--

 Summary: Switch default build to Scala 2.13
 Key: KAFKA-9907
 URL: https://issues.apache.org/jira/browse/KAFKA-9907
 Project: Kafka
  Issue Type: Improvement
Reporter: Ismael Juma
Assignee: Ismael Juma
 Fix For: 2.6.0


Scala 2.13.2 introduces support for suppressing warnings, which makes it 
possible to enable fatal warnings. This is useful enough from a development 
perspective to justify this change.

In addition, Scala 2.13.2 includes a Vector implementation with significant 
performance improvements and encodes String pattern matches as switches.





Build failed in Jenkins: kafka-trunk-jdk11 #1378

2020-04-22 Thread Apache Jenkins Server
See 


Changes:

[github] KAFKA-9388: Refactor integration tests to always use different


--
[...truncated 3.03 MB...]
org.apache.kafka.streams.TopologyTestDriverTest > shouldThrowForMissingTime[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldThrowForMissingTime[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnWallClockTimeDeprecated[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnWallClockTimeDeprecated[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessRecordForTopic[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessRecordForTopic[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldForwardRecordsFromSubtopologyToSubtopology[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldForwardRecordsFromSubtopologyToSubtopology[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotUpdateStoreForSmallerValue[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotUpdateStoreForSmallerValue[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCreateStateDirectoryForStatefulTopology[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCreateStateDirectoryForStatefulTopology[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateIfWallClockTimeAdvances[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateIfWallClockTimeAdvances[Eos enabled = false] PASSED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldReturnIsOpen 
STARTED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldReturnIsOpen 
PASSED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldReturnName 
STARTED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldReturnName 
PASSED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > 
shouldPutWithUnknownTimestamp STARTED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > 
shouldPutWithUnknownTimestamp PASSED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > 
shouldPutWindowStartTimestampWithUnknownTimestamp STARTED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > 
shouldPutWindowStartTimestampWithUnknownTimestamp PASSED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > 
shouldReturnIsPersistent STARTED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > 
shouldReturnIsPersistent PASSED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldForwardClose 
STARTED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldForwardClose 
PASSED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldForwardFlush 
STARTED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldForwardFlush 
PASSED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldForwardInit 
STARTED

org.apache.kafka.streams.internals.WindowStoreFacadeTest > shouldForwardInit 
PASSED

org.apache.kafka.streams.TestTopicsTest > testNonUsedOutputTopic STARTED

org.apache.kafka.streams.TestTopicsTest > testNonUsedOutputTopic PASSED

org.apache.kafka.streams.TestTopicsTest > testEmptyTopic STARTED

org.apache.kafka.streams.TestTopicsTest > testEmptyTopic PASSED

org.apache.kafka.streams.TestTopicsTest > testStartTimestamp STARTED

org.apache.kafka.streams.TestTopicsTest > testStartTimestamp PASSED

org.apache.kafka.streams.TestTopicsTest > testNegativeAdvance STARTED

org.apache.kafka.streams.TestTopicsTest > testNegativeAdvance PASSED

org.apache.kafka.streams.TestTopicsTest > shouldNotAllowToCreateWithNullDriver 
STARTED

org.apache.kafka.streams.TestTopicsTest > shouldNotAllowToCreateWithNullDriver 
PASSED

org.apache.kafka.streams.TestTopicsTest > testDuration STARTED

org.apache.kafka.streams.TestTopicsTest > testDuration PASSED

org.apache.kafka.streams.TestTopicsTest > testOutputToString STARTED

org.apache.kafka.streams.TestTopicsTest > testOutputToString PASSED

org.apache.kafka.streams.TestTopicsTest > testValue STARTED

org.apache.kafka.streams.TestTopicsTest > testValue PASSED

org.apache.kafka.streams.TestTopicsTest > testTimestampAutoAdvance STARTED

org.apache.kafka.streams.TestTopicsTest > testTimestampAutoAdvance PASSED

org.apache.kafka.streams.TestTopicsTest > testOutputWrongSerde STARTED

org.apache.kafka.streams.TestTopicsTest > testOutputWrongSerde PASSED

org.apache.kafka.streams.TestTopicsTest > 
shouldNotAllowToCreateOutputTopicWithNullTopicName STARTED

org.apache.kafka.streams.TestTopicsTest > 

Jenkins build is back to normal : kafka-trunk-jdk8 #4458

2020-04-22 Thread Apache Jenkins Server
See 




Jenkins build is back to normal : kafka-trunk-jdk11 #1377

2020-04-22 Thread Apache Jenkins Server
See 




[jira] [Created] (KAFKA-9906) Is bytesSinceLastIndexEntry updated correctly when LogSegment.append() is called?

2020-04-22 Thread Xiang Zhang (Jira)
Xiang Zhang created KAFKA-9906:
--

 Summary: Is bytesSinceLastIndexEntry updated correctly when LogSegment.append() is called?
 Key: KAFKA-9906
 URL: https://issues.apache.org/jira/browse/KAFKA-9906
 Project: Kafka
  Issue Type: Improvement
Reporter: Xiang Zhang








Re: [DISCUSSION] KIP-418: A method-chaining way to branch KStream

2020-04-22 Thread Ivan Ponomarev

Hi,

I have read John's "DSL design principles" and have completely 
rewritten the KIP; see 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream



This version includes all the previous discussion results and follows 
the design principles, with one exception.


The exception is

branch(Predicate predicate, Branched branched)

which formally violates the 'no more than one parameter' rule, but I think 
it is justified here.


We must provide a predicate for a branch, but we don't need to provide one 
for the default branch. Thus both operations can use a single 
Branched parameter class, with `branch` taking the predicate as an extra parameter.


Since a predicate is a natural, necessary part of a branch, the 
'proliferation of overloads, deprecations, etc.' mentioned in the rationale 
for the 'single parameter' rule is not expected here.
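
For illustration, a rough sketch of how user code might read against the
proposed branch(Predicate, Branched) shape. The split(), Branched.as(),
Branched.withConsumer() and defaultBranch() names follow the KIP draft and
are not a released API; the Order type, predicates and topic names are
made-up placeholders.

// Sketch only: written against the *proposed* KIP-418 operations.
Map<String, KStream<String, Order>> branches = orders.split()
    .branch((key, order) -> order.isPriority(), Branched.as("priority"))
    // a branch can also be consumed in place instead of being returned to this scope
    .branch((key, order) -> order.isBulk(),
            Branched.withConsumer(bulk -> bulk.to("bulk-orders")))
    .defaultBranch(Branched.as("other"));

branches.get("priority").to("priority-orders");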


WDYT, is this KIP mature enough to begin voting?

Regards,

Ivan

On 21.04.2020 2:09, Matthias J. Sax wrote:

Ivan,

no worries about getting side tracked. Glad to have you back!

The DSL has improved further in the meantime and we already have a `Named`
config object to name operators. It seems reasonable to me to build on this.

Furthermore, John did a writeup about "DSL design principles" that we
want to follow:
https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+DSL+Grammar
-- it might be worth checking out.


-Matthias


On 4/17/20 4:30 PM, Ivan Ponomarev wrote:

Hi everyone!

Let me revive the discussion of this KIP.

I'm very sorry for stopping my participation in the discussion in June
2019. My project work was very intensive then and left me no spare time.
But I think I must finish this, because we invested substantial effort
into this discussion, and I don't feel entitled to propose other things
before this one is finalized.

During these months I kept writing and reviewing Kafka Streams-related
code. Every time I needed branching, Spring Kafka's KafkaStreamBrancher
class, which I originally wrote (and which was the original idea for this
KIP), worked for me -- that's another reason why I stopped pushing the
KIP forward. Whenever I ran into the problem with the scope of
branches, I worked around it this way:

AtomicReference<KStream<K, V>> result = new AtomicReference<>();
new KafkaStreamBrancher<>()
     .branch()
     .defaultBranch(result::set)
     .onTopOf(someStream);
result.get()...


And yes, of course I don't feel very happy with this approach.

I think that Matthias came up with a bright solution in his post from
May 24th, 2019. Let me quote it:

KStream#split() -> KBranchedStream
// branch is not easily accessible in current scope
KBranchedStream#branch(Predicate, Consumer)
   -> KBranchedStream
// assign a name to the branch and
// return the sub-stream to the current scope later
//
// can be as simple as `#branch(p, s->s, "name")`
// or as complex as `#branch(p, s->s.filter(...), "name")`
KBranchedStream#branch(Predicate, Function, String)
   -> KBranchedStream
// default branch is not easily accessible
// return map of all named sub-streams into current scope
KBranchedStream#default(Consumer)
   -> Map
// assign custom name to default-branch
// return map of all named sub-streams into current scope
KBranchedStream#default(Function, String)
   -> Map
// assign a default name for default
// return map of all named sub-streams into current scope
KBranchedStream#defaultBranch(Function)
   -> Map
// return map of all named sub-streams into current scope
KBranchedStream#noDefaultBranch()
   -> Map

I believe this would satisfy everyone. Optional names seem to be a good
idea: when you don't need to have the branches in the same scope, you
just don't use names and you don't risk making your code brittle. Or you
might want to add names just for debugging purposes. Or, finally, you
might use the returned Map to have the named branches in the
original scope.
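
For concreteness, a hypothetical sketch of user code against the quoted
proposal. KBranchedStream, the Consumer and Function-plus-String branch
variants and defaultBranch(Function) come from the quote above; the Event
type, predicates and topic names are invented placeholders.

// Hypothetical usage of the quoted proposal; none of this is an existing API.
Map<String, KStream<String, Event>> branches = events.split()
    // embedded chaining via Consumer: handled in place, nothing returned here
    .branch((key, event) -> event.isError(), errors -> errors.to("error-events"))
    // named branch via Function + String: returned to this scope through the Map
    .branch((key, event) -> event.isLate(), s -> s.filter((k, e) -> e.isValid()), "late")
    // default branch: returns the map of all named sub-streams
    .defaultBranch(s -> s);

branches.get("late").to("late-events");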

There was also input from John Roesler on June 4th, 2019, who
suggested using the Named class. I can't comment on this: the idea seems
reasonable, but in this matter I'd rather trust people who are more
familiar with the Streams API design principles than I am.

Regards,

Ivan



On 08.10.2019 1:38, Matthias J. Sax wrote:

I am moving this KIP into "inactive status". Feel free to resume the KIP
at any point.

If anybody else is interested in picking up this KIP, feel free to do so.



-Matthias

On 7/11/19 4:00 PM, Matthias J. Sax wrote:

Ivan,

did you see my last reply? What do you think about my proposal to mix
both approaches and try to get the best of both worlds?


-Matthias

On 6/11/19 3:56 PM, Matthias J. Sax wrote:

Thanks for the input John!


under your suggestion, it seems that the name is required


If you want to get the `KStream` as part of the `Map` back using a
`Function`, yes. If you follow the "embedded chaining" pattern using a
`Consumer`, no.

Allowing for a default name via `split()` can of course be done.
Similarly, using `Named` instead of `String` is possible.

I 

Build failed in Jenkins: kafka-trunk-jdk8 #4457

2020-04-22 Thread Apache Jenkins Server
See 


Changes:

[github] KAFKA-9863: replace the deprecated --zookeeper options in the


--
[...truncated 3.01 MB...]
org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnAllStoresNames[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnAllStoresNames[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessConsumerRecordList[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessConsumerRecordList[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSinkSpecificSerializers[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSinkSpecificSerializers[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldFlushStoreForFirstInput[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldFlushStoreForFirstInput[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessFromSourceThatMatchPattern[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessFromSourceThatMatchPattern[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUpdateStoreForNewKey[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUpdateStoreForNewKey[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldSendRecordViaCorrectSourceTopicDeprecated[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldSendRecordViaCorrectSourceTopicDeprecated[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnWallClockTime[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnWallClockTime[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldSetRecordMetadata[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldSetRecordMetadata[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotUpdateStoreForLargerValue[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotUpdateStoreForLargerValue[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnCorrectInMemoryStoreTypeOnly[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnCorrectInMemoryStoreTypeOnly[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldThrowForMissingTime[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldThrowForMissingTime[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnWallClockTimeDeprecated[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnWallClockTimeDeprecated[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessRecordForTopic[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessRecordForTopic[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldForwardRecordsFromSubtopologyToSubtopology[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldForwardRecordsFromSubtopologyToSubtopology[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotUpdateStoreForSmallerValue[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotUpdateStoreForSmallerValue[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCreateStateDirectoryForStatefulTopology[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCreateStateDirectoryForStatefulTopology[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateIfWallClockTimeAdvances[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateIfWallClockTimeAdvances[Eos enabled = false] PASSED

org.apache.kafka.streams.MockTimeTest > shouldGetNanosAsMillis STARTED

org.apache.kafka.streams.MockTimeTest > shouldGetNanosAsMillis PASSED

org.apache.kafka.streams.MockTimeTest > shouldSetStartTime STARTED

org.apache.kafka.streams.MockTimeTest > shouldSetStartTime PASSED

org.apache.kafka.streams.MockTimeTest > shouldNotAllowNegativeSleep STARTED

org.apache.kafka.streams.MockTimeTest > shouldNotAllowNegativeSleep PASSED

org.apache.kafka.streams.MockTimeTest > shouldAdvanceTimeOnSleep STARTED

org.apache.kafka.streams.MockTimeTest > shouldAdvanceTimeOnSleep PASSED


[jira] [Created] (KAFKA-9905) The equals functions for generated classes should compare all fields

2020-04-22 Thread Colin McCabe (Jira)
Colin McCabe created KAFKA-9905:
---

 Summary: The equals functions for generated classes should compare 
all fields
 Key: KAFKA-9905
 URL: https://issues.apache.org/jira/browse/KAFKA-9905
 Project: Kafka
  Issue Type: Improvement
Reporter: Colin McCabe
Assignee: Colin McCabe


The equals functions for generated classes should compare all fields, to avoid 
confusion.
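
For illustration, a minimal hand-written sketch of the desired behaviour; this 
is a hypothetical class, not actual generated code.

{code:java}
public final class ExampleData {
    private final int throttleTimeMs;
    private final String name;

    public ExampleData(int throttleTimeMs, String name) {
        this.throttleTimeMs = throttleTimeMs;
        this.name = name;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (!(o instanceof ExampleData)) return false;
        ExampleData other = (ExampleData) o;
        // compare *all* fields, not just a subset
        return throttleTimeMs == other.throttleTimeMs
            && java.util.Objects.equals(name, other.name);
    }

    @Override
    public int hashCode() {
        return java.util.Objects.hash(throttleTimeMs, name);
    }
}
{code}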





Re: [DISCUSS] KIP-590: Redirect Zookeeper Mutation Protocols to The Controller

2020-04-22 Thread Sönke Liebau
Hi Colin,

thanks for your summary! Just one question - and I may be missing an
obvious point here..
You write:

"The initial broker should do authentication (who are you?) and come up
with a principal name.  Then it creates an envelope request, which will
contain that principal name, and sends it along with the unmodified
original request to the final broker.   [... ] The final broker knows it
can trust the principal name in the envelope (since EnvelopeRequest
requires CLUSTERACTION on CLUSTER).  So it can use that principal name for
authorization (given who you are, what can you do?) "

My understanding is that you don't want to serialize the Principal (due to
the discussed issues with custom principals) but rather reduce it to a
string representation that would be used for logging and authorization?
If that understanding is correct, then I don't think we could use the
regular Authorizer on the target broker, because that would need the actual
principal object to work on.
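
To make the concern concrete, a hypothetical custom principal (illustration
only, not an existing class) that carries state which a plain name string
cannot convey:

import org.apache.kafka.common.security.auth.KafkaPrincipal;
import java.util.Set;

// Hypothetical custom principal, purely to illustrate the point above.
public class LdapPrincipal extends KafkaPrincipal {
    // group memberships are lost if only the name string is forwarded
    private final Set<String> groups;

    public LdapPrincipal(String name, Set<String> groups) {
        super(KafkaPrincipal.USER_TYPE, name);
        this.groups = groups;
    }

    public Set<String> groups() {
        return groups;
    }
}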

Also, a thought that just occurred to me: we might actually need to log
different principal strings for requests like AlterConfigs
(mentioned by Rajini) which may contain multiple resources. Take an LDAP
authorizer that grants access based on group membership - the same
AlterConfigs request may contain resources that are authorized based on
membership in group 1 as well as resources authorized based on membership
in group 2, and in all those cases we'd need to log the specific reason, I think.

Basically I think that we might have a hard time properly authorizing and
logging without being able to forward the entire principal.. but again, I
might be heading down an entirely wrong path here :)

Best regards,
Sönke








On Wed, 22 Apr 2020 at 23:13, Guozhang Wang  wrote:

> Colin, Boyang: thanks for the updates, I agree that an EnvelopeRequest
> would be a less vulnerable approach than optional fields, and I'm just
> wondering if we would keep the EnvelopeRequest for a long time. I was
> thinking that, potentially if we require clients to be on newer version
> when talking to a 3.0+ (assuming 3.0 is the bridge release) brokers, then
> we do not need to keep this code for too long, but I think that would be a
> very hasty compatibility breaking so maybe we indeed need to keep this
> forwarding mechanism many years.
>
> Regarding future use cases, I think the example that Boyang mentioned may
> not be very practical honestly, because when there's a connectivity issue,
> it is either a network partition between "controller, A | B", or
> "controller | A, B". In other words, if the controller can talk to A, then
> very likely A would not be able to talk to B either... anyways, since the
> forwarding would be there for a sufficiently long time, I think keeping the
> additional envelope makes sense.
>
>
> Guozhang
>
> On Wed, Apr 22, 2020 at 1:47 PM Boyang Chen 
> wrote:
>
> > Thanks Colin for the summary! And Guozhang, regarding the future use
> cases,
> > consider a scenario where there are temporary connectivity issue between
> > controller to a fellow broker A, the controller could then leverage
> another
> > healthy broker B to do a forwarding request to A in order to maintain a
> > communication.
> >
> > Even for KIP-590 scope, the forwarding mechanism shall not be transit as
> we
> > do need to support older version of admin clients for a couple of years
> > IIUC.
> >
> > Boyang
> >
> > On Wed, Apr 22, 2020 at 1:29 PM Colin McCabe  wrote:
> >
> > > Hi all,
> > >
> > > I guess the way I see this working is that the request gets sent from
> the
> > > client, to the initial broker, and then forwarded to the final broker.
> > >
> > > The initial broker should do authentication (who are you?) and come up
> > > with a principal name.  Then it creates an envelope request, which will
> > > contain that principal name, and sends it along with the unmodified
> > > original request to the final broker.  (I agree with Tom and Rajini
> that
> > we
> > > can't forward the information needed to do authentication on the final
> > > broker, but I also think we don't need to, since we can do it on the
> > > initial broker.)
> > >
> > > The final broker knows it can trust the principal name in the envelope
> > > (since EnvelopeRequest requires CLUSTERACTION on CLUSTER).  So it can
> use
> > > that principal name for authorization (given who you are, what can you
> > > do?)  The forwarded principal name will also be used for logging.
> > >
> > > One question is why we need an EnvelopeRequest.  Well, if we don't have
> > an
> > > EnvelopeRequest, we need somewhere else to put the forwarded principal
> > > name.  I don't think overriding an existing field (like clientId) is a
> > good
> > > option for this.  It's messy, and loses information.  It also raises
> the
> > > question of how the final broker knows that the clientId in the
> received
> > > message is not "really" a clientId, but is a principal name.  Without
> an
> > > envelope, there's no way to 

[jira] [Resolved] (KAFKA-9388) Flaky Test StandbyTaskCreationIntegrationTest.shouldCreateStandByTasksForMaterializedAndOptimizedSourceTables

2020-04-22 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9388?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-9388.
--
Fix Version/s: 2.6.0
 Assignee: Guozhang Wang
   Resolution: Fixed

> Flaky Test 
> StandbyTaskCreationIntegrationTest.shouldCreateStandByTasksForMaterializedAndOptimizedSourceTables
> -
>
> Key: KAFKA-9388
> URL: https://issues.apache.org/jira/browse/KAFKA-9388
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 2.5.0
>Reporter: Matthias J. Sax
>Assignee: Guozhang Wang
>Priority: Critical
>  Labels: flaky-test
> Fix For: 2.6.0
>
>
> [https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/122/testReport/junit/org.apache.kafka.streams.integration/StandbyTaskCreationIntegrationTest/shouldCreateStandByTasksForMaterializedAndOptimizedSourceTables/]
> {quote}java.lang.AssertionError: Condition not met within timeout 3. At 
> least one client did not reach state RUNNING with active tasks and stand-by 
> tasks: Client 1 is NOT OK, client 2 is NOT OK. at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:24) at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$4(TestUtils.java:369) 
> at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:385)
>  at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:368) at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:356) at 
> org.apache.kafka.streams.integration.StandbyTaskCreationIntegrationTest.waitUntilBothClientAreOK(StandbyTaskCreationIntegrationTest.java:178)
>  at 
> org.apache.kafka.streams.integration.StandbyTaskCreationIntegrationTest.shouldCreateStandByTasksForMaterializedAndOptimizedSourceTables(StandbyTaskCreationIntegrationTest.java:141){quote}





Re: [DISCUSS] KIP-590: Redirect Zookeeper Mutation Protocols to The Controller

2020-04-22 Thread Guozhang Wang
Colin, Boyang: thanks for the updates, I agree that an EnvelopeRequest
would be a less vulnerable approach than optional fields, and I'm just
wondering if we would keep the EnvelopeRequest for a long time. I was
thinking that, potentially, if we require clients to be on a newer version
when talking to 3.0+ brokers (assuming 3.0 is the bridge release), then
we would not need to keep this code for too long; but I think that would be a
very hasty compatibility break, so maybe we do indeed need to keep this
forwarding mechanism for many years.

Regarding future use cases, I think the example that Boyang mentioned may
not be very practical, honestly, because when there's a connectivity issue,
it is either a network partition between "controller, A | B" or
"controller | A, B". In other words, if the controller cannot talk to A, then
very likely it would not be able to talk to B either... anyways, since the
forwarding would be there for a sufficiently long time, I think keeping the
additional envelope makes sense.


Guozhang

On Wed, Apr 22, 2020 at 1:47 PM Boyang Chen 
wrote:

> Thanks Colin for the summary! And Guozhang, regarding the future use cases,
> consider a scenario where there are temporary connectivity issue between
> controller to a fellow broker A, the controller could then leverage another
> healthy broker B to do a forwarding request to A in order to maintain a
> communication.
>
> Even for KIP-590 scope, the forwarding mechanism shall not be transit as we
> do need to support older version of admin clients for a couple of years
> IIUC.
>
> Boyang
>
> On Wed, Apr 22, 2020 at 1:29 PM Colin McCabe  wrote:
>
> > Hi all,
> >
> > I guess the way I see this working is that the request gets sent from the
> > client, to the initial broker, and then forwarded to the final broker.
> >
> > The initial broker should do authentication (who are you?) and come up
> > with a principal name.  Then it creates an envelope request, which will
> > contain that principal name, and sends it along with the unmodified
> > original request to the final broker.  (I agree with Tom and Rajini that
> we
> > can't forward the information needed to do authentication on the final
> > broker, but I also think we don't need to, since we can do it on the
> > initial broker.)
> >
> > The final broker knows it can trust the principal name in the envelope
> > (since EnvelopeRequest requires CLUSTERACTION on CLUSTER).  So it can use
> > that principal name for authorization (given who you are, what can you
> > do?)  The forwarded principal name will also be used for logging.
> >
> > One question is why we need an EnvelopeRequest.  Well, if we don't have
> an
> > EnvelopeRequest, we need somewhere else to put the forwarded principal
> > name.  I don't think overriding an existing field (like clientId) is a
> good
> > option for this.  It's messy, and loses information.  It also raises the
> > question of how the final broker knows that the clientId in the received
> > message is not "really" a clientId, but is a principal name.  Without an
> > envelope, there's no way to clearly mark a request as forwarded, so
> there's
> > no reason for the final broker to treat this differently than a regular
> > clientId (or whatever).
> >
> > We talked about using optional fields to contain the forwarded principal
> > name, but this is also messy and potentially dangerous.  Older brokers
> will
> > simply ignore the optional fields, which could result in them executing
> > operations as the wrong principal.  Of course, this would require a
> > misconfiguration in order to happen, but it still seems better to set up
> > the system so that this misconfiguration is detected, rather than
> silently
> > ignored.
> >
> > It's true that the need for forwarding is "temporary" in some sense,
> since
> > we only need it for older clients.  However, we will want to support
> these
> > older clients for many years to come.
> >
> > I agree that the usefulness of EnvelopeRequest is limited by it being a
> > superuser-only request at the moment.  Perhaps there are some changes to
> > how custom principals work that would allow us to get around this in the
> > future.  We should think about that so that we have this functionality in
> > the future if it's needed.
> >
> > best,
> > Colin
> >
> >
> > On Wed, Apr 22, 2020, at 11:21, Guozhang Wang wrote:
> > > Hello Gwen,
> > >
> > > The purpose here is for maintaining compatibility old clients, who do
> not
> > > have functionality to do re-routing admin requests themselves. New
> > clients
> > > can of course do this themselves by detecting who's the controller.
> > >
> > >
> > > Hello Colin / Boyang,
> > >
> > > Regarding the usage of the envelope, I'm curious what are the potential
> > > future use cases that would require request forwarding and hence
> envelope
> > > would be useful? Originally I thought that the forwarding mechanism is
> > only
> > > temporary as we need it for the bridge release, and moving forward we
> > 

Re: [DISCUSS] KIP-590: Redirect Zookeeper Mutation Protocols to The Controller

2020-04-22 Thread Boyang Chen
Thanks Colin for the summary! And Guozhang, regarding the future use cases,
consider a scenario where there is a temporary connectivity issue between
the controller and a fellow broker A; the controller could then leverage
another healthy broker B to forward the request to A in order to maintain
communication.

Even within the KIP-590 scope, the forwarding mechanism will not be
transient, as we do need to support older versions of admin clients for a
couple of years, IIUC.

Boyang

On Wed, Apr 22, 2020 at 1:29 PM Colin McCabe  wrote:

> Hi all,
>
> I guess the way I see this working is that the request gets sent from the
> client, to the initial broker, and then forwarded to the final broker.
>
> The initial broker should do authentication (who are you?) and come up
> with a principal name.  Then it creates an envelope request, which will
> contain that principal name, and sends it along with the unmodified
> original request to the final broker.  (I agree with Tom and Rajini that we
> can't forward the information needed to do authentication on the final
> broker, but I also think we don't need to, since we can do it on the
> initial broker.)
>
> The final broker knows it can trust the principal name in the envelope
> (since EnvelopeRequest requires CLUSTERACTION on CLUSTER).  So it can use
> that principal name for authorization (given who you are, what can you
> do?)  The forwarded principal name will also be used for logging.
>
> One question is why we need an EnvelopeRequest.  Well, if we don't have an
> EnvelopeRequest, we need somewhere else to put the forwarded principal
> name.  I don't think overriding an existing field (like clientId) is a good
> option for this.  It's messy, and loses information.  It also raises the
> question of how the final broker knows that the clientId in the received
> message is not "really" a clientId, but is a principal name.  Without an
> envelope, there's no way to clearly mark a request as forwarded, so there's
> no reason for the final broker to treat this differently than a regular
> clientId (or whatever).
>
> We talked about using optional fields to contain the forwarded principal
> name, but this is also messy and potentially dangerous.  Older brokers will
> simply ignore the optional fields, which could result in them executing
> operations as the wrong principal.  Of course, this would require a
> misconfiguration in order to happen, but it still seems better to set up
> the system so that this misconfiguration is detected, rather than silently
> ignored.
>
> It's true that the need for forwarding is "temporary" in some sense, since
> we only need it for older clients.  However, we will want to support these
> older clients for many years to come.
>
> I agree that the usefulness of EnvelopeRequest is limited by it being a
> superuser-only request at the moment.  Perhaps there are some changes to
> how custom principals work that would allow us to get around this in the
> future.  We should think about that so that we have this functionality in
> the future if it's needed.
>
> best,
> Colin
>
>
> On Wed, Apr 22, 2020, at 11:21, Guozhang Wang wrote:
> > Hello Gwen,
> >
> > The purpose here is for maintaining compatibility old clients, who do not
> > have functionality to do re-routing admin requests themselves. New
> clients
> > can of course do this themselves by detecting who's the controller.
> >
> >
> > Hello Colin / Boyang,
> >
> > Regarding the usage of the envelope, I'm curious what are the potential
> > future use cases that would require request forwarding and hence envelope
> > would be useful? Originally I thought that the forwarding mechanism is
> only
> > temporary as we need it for the bridge release, and moving forward we
> will
> > get rid of this to simplify the code base. If we do have valid use cases
> in
> > the future which makes us believe that request forwarding would actually
> be
> > a permanent feature retained on the broker side, I'm on board with adding
> > the envelope request protocol.
> >
> >
> >
> > Guozhang
> >
> >
> >
> >
> > On Wed, Apr 22, 2020 at 8:22 AM Gwen Shapira  wrote:
> >
> > > Hey Boyang,
> > >
> > > Sorry if this was already discussed, but I didn't see this as rejected
> > > alternative:
> > >
> > > Until now, we always did client side routing - the client itself found
> the
> > > controller via metadata and directed requests accordingly. Brokers that
> > > were not the controller, rejected those requests.
> > >
> > > Why did we decide to move to broker side routing? Was the client-side
> > > option discussed and rejected somewhere and I missed it?
> > >
> > > Gwen
> > >
> > > On Fri, Apr 3, 2020, 4:45 PM Boyang Chen 
> > > wrote:
> > >
> > > > Hey all,
> > > >
> > > > I would like to start off the discussion for KIP-590, a follow-up
> > > > initiative after KIP-500:
> > > >
> > > >
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-590%3A+Redirect+Zookeeper+Mutation+Protocols+to+The+Controller
> > > >
> > > > 

Re: [DISCUSS] KIP-590: Redirect Zookeeper Mutation Protocols to The Controller

2020-04-22 Thread Colin McCabe
Hi all,

I guess the way I see this working is that the request gets sent from the 
client, to the initial broker, and then forwarded to the final broker.

The initial broker should do authentication (who are you?) and come up with a 
principal name.  Then it creates an envelope request, which will contain that 
principal name, and sends it along with the unmodified original request to the 
final broker.  (I agree with Tom and Rajini that we can't forward the 
information needed to do authentication on the final broker, but I also think 
we don't need to, since we can do it on the initial broker.)

The final broker knows it can trust the principal name in the envelope (since 
EnvelopeRequest requires CLUSTERACTION on CLUSTER).  So it can use that 
principal name for authorization (given who you are, what can you do?)  The 
forwarded principal name will also be used for logging.
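
(Conceptually, the envelope just pairs the authenticated principal name with
the untouched original request bytes; the following is a purely illustrative
sketch, not the actual EnvelopeRequest schema.)

// Illustration only -- not the real EnvelopeRequest definition.
public final class ForwardedRequestEnvelope {
    private final String principalName;   // authenticated by the initial broker
    private final byte[] originalRequest; // forwarded unmodified

    public ForwardedRequestEnvelope(String principalName, byte[] originalRequest) {
        this.principalName = principalName;
        this.originalRequest = originalRequest;
    }

    public String principalName() { return principalName; }

    public byte[] originalRequest() { return originalRequest; }
}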

One question is why we need an EnvelopeRequest.  Well, if we don't have an 
EnvelopeRequest, we need somewhere else to put the forwarded principal name.  I 
don't think overriding an existing field (like clientId) is a good option for 
this.  It's messy, and loses information.  It also raises the question of how 
the final broker knows that the clientId in the received message is not 
"really" a clientId, but is a principal name.  Without an envelope, there's no 
way to clearly mark a request as forwarded, so there's no reason for the final 
broker to treat this differently than a regular clientId (or whatever).

We talked about using optional fields to contain the forwarded principal name, 
but this is also messy and potentially dangerous.  Older brokers will simply 
ignore the optional fields, which could result in them executing operations as 
the wrong principal.  Of course, this would require a misconfiguration in order 
to happen, but it still seems better to set up the system so that this 
misconfiguration is detected, rather than silently ignored.

It's true that the need for forwarding is "temporary" in some sense, since we 
only need it for older clients.  However, we will want to support these older 
clients for many years to come.

I agree that the usefulness of EnvelopeRequest is limited by it being a 
superuser-only request at the moment.  Perhaps there are some changes to how 
custom principals work that would allow us to get around this in the future.  
We should think about that so that we have this functionality in the future if 
it's needed.

best,
Colin


On Wed, Apr 22, 2020, at 11:21, Guozhang Wang wrote:
> Hello Gwen,
> 
> The purpose here is for maintaining compatibility old clients, who do not
> have functionality to do re-routing admin requests themselves. New clients
> can of course do this themselves by detecting who's the controller.
> 
> 
> Hello Colin / Boyang,
> 
> Regarding the usage of the envelope, I'm curious what are the potential
> future use cases that would require request forwarding and hence envelope
> would be useful? Originally I thought that the forwarding mechanism is only
> temporary as we need it for the bridge release, and moving forward we will
> get rid of this to simplify the code base. If we do have valid use cases in
> the future which makes us believe that request forwarding would actually be
> a permanent feature retained on the broker side, I'm on board with adding
> the envelope request protocol.
> 
> 
> 
> Guozhang
> 
> 
> 
> 
> On Wed, Apr 22, 2020 at 8:22 AM Gwen Shapira  wrote:
> 
> > Hey Boyang,
> >
> > Sorry if this was already discussed, but I didn't see this as rejected
> > alternative:
> >
> > Until now, we always did client side routing - the client itself found the
> > controller via metadata and directed requests accordingly. Brokers that
> > were not the controller, rejected those requests.
> >
> > Why did we decide to move to broker side routing? Was the client-side
> > option discussed and rejected somewhere and I missed it?
> >
> > Gwen
> >
> > On Fri, Apr 3, 2020, 4:45 PM Boyang Chen 
> > wrote:
> >
> > > Hey all,
> > >
> > > I would like to start off the discussion for KIP-590, a follow-up
> > > initiative after KIP-500:
> > >
> > >
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-590%3A+Redirect+Zookeeper+Mutation+Protocols+to+The+Controller
> > >
> > > This KIP proposes to migrate existing Zookeeper mutation paths, including
> > > configuration, security and quota changes, to controller-only by always
> > > routing these alterations to the controller.
> > >
> > > Let me know your thoughts!
> > >
> > > Best,
> > > Boyang
> > >
> >
> 
> 
> -- 
> -- Guozhang
>


Re: [DISCUSS] KIP-590: Redirect Zookeeper Mutation Protocols to The Controller

2020-04-22 Thread Guozhang Wang
Hello Gwen,

The purpose here is to maintain compatibility with old clients, which do not
have the functionality to re-route admin requests themselves. New clients
can of course do this themselves by detecting who the controller is.


Hello Colin / Boyang,

Regarding the usage of the envelope, I'm curious what the potential
future use cases are that would require request forwarding and hence make the
envelope useful. Originally I thought that the forwarding mechanism is only
temporary, as we need it for the bridge release, and moving forward we will
get rid of it to simplify the code base. If we do have valid use cases in
the future that make us believe request forwarding would actually be
a permanent feature retained on the broker side, I'm on board with adding
the envelope request protocol.



Guozhang




On Wed, Apr 22, 2020 at 8:22 AM Gwen Shapira  wrote:

> Hey Boyang,
>
> Sorry if this was already discussed, but I didn't see this as rejected
> alternative:
>
> Until now, we always did client side routing - the client itself found the
> controller via metadata and directed requests accordingly. Brokers that
> were not the controller, rejected those requests.
>
> Why did we decide to move to broker side routing? Was the client-side
> option discussed and rejected somewhere and I missed it?
>
> Gwen
>
> On Fri, Apr 3, 2020, 4:45 PM Boyang Chen 
> wrote:
>
> > Hey all,
> >
> > I would like to start off the discussion for KIP-590, a follow-up
> > initiative after KIP-500:
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-590%3A+Redirect+Zookeeper+Mutation+Protocols+to+The+Controller
> >
> > This KIP proposes to migrate existing Zookeeper mutation paths, including
> > configuration, security and quota changes, to controller-only by always
> > routing these alterations to the controller.
> >
> > Let me know your thoughts!
> >
> > Best,
> > Boyang
> >
>


-- 
-- Guozhang


Re: [Vote] KIP-588: Allow producers to recover gracefully from transaction timeouts

2020-04-22 Thread Boyang Chen
Hey Jason,

thanks for the suggestions! Addressed in the KIP.

On Wed, Apr 22, 2020 at 9:21 AM Jason Gustafson  wrote:

> +1 Just a couple small comments:
>
> 1. My comment about `initTransactions()` usage in the javadoc above appears
> not to have been addressed.
> 2. For the handling of INVALID_PRODUCER_EPOCH in the produce response,
> would we only try to abort if the broker supports the newer protocol
> version? I guess it would be simpler in the implementation if the producer
> did it in any case even if it might not be useful for older versions.
>
> -Jason
>
> On Fri, Apr 17, 2020 at 3:55 PM Guozhang Wang  wrote:
>
> > Sounds good to me. Thanks Boyang.
> >
> > On Fri, Apr 17, 2020 at 3:32 PM Boyang Chen 
> > wrote:
> >
> > > Thanks Guozhang,
> > >
> > > I think most of the complexity comes from our intention to benefit
> older
> > > clients. After a second thought, I think the add-on complexity
> > counteracts
> > > the gain here as only 2.5 client is getting a slice of the resilience
> > > improvement, not for many older versions.
> > >
> > > So I decide to drop the UNKNOWN_PRODUCER_ID path, by just claiming that
> > > this change would only benefit 2.6 Producer clients. So the only path
> > that
> > > needs version detection is the new transaction coordinator handling
> > > transactional requests. If the Producer is 2.6+, we pick
> > > PRODUCER_FENCED(new error code) or TRANSACTION_TIMED_OUT as the
> response;
> > > otherwise  we return INVALID_PRODUCE_EPOCH to be consistent with older
> > > clients.
> > >
> > > Does this sound like a better plan? I already updated the KIP with
> > > simplifications.
> > >
> > >
> > > On Fri, Apr 17, 2020 at 12:02 PM Guozhang Wang 
> > wrote:
> > >
> > > > Hi Boyang,
> > > >
> > > > Your reply to 3) seems conflicting with your other answers which is a
> > bit
> > > > confusing to me. Following your other answers, it seems you suggest
> > > > returning UNKNOWN_PRODUCER_ID so that 2.5 clients can trigger retry
> > logic
> > > > as well?
> > > >
> > > > To complete my reasoning here as a complete picture:
> > > >
> > > > a) post KIP-360 (2.5+) the partition leader broker does not return
> > > > UNKNOWN_PRODUCER_ID any more.
> > > > b) upon seeing an old epoch, partition leader cannot tell if it is
> due
> > to
> > > > fencing or timeout; so it could only return INVALID_PRODUCER_EPOCH.
> > > >
> > > > So the basic idea is to let the clients ask the transaction
> coordinator
> > > for
> > > > the source of truth:
> > > >
> > > > 1) 2.5+ client would handle UNKNOWN_PRODUCER_ID (which could only be
> > > > returned from old brokers) by trying to re-initialize with the
> > > transaction
> > > > coordinator; the coordinator would then tell it whether it is
> > > > PRODUCER_FENCED or TXN_TIMEOUT. And for old brokers, it would always
> > > return
> > > > PRODUCER_FENCED anyways.
> > > > 2) 2.6+ client would also handle INVALID_PRODUCER_EPOCH with the
> retry
> > > > initializing logic; and similarly the transaction coordinator would
> > > > return PRODUCER_FENCED or TXN_TIMEOUT if it is new or always
> > > > return PRODUCER_FENCED if it is old.
> > > >
> > > > The question open is, whether
> > > >
> > > > * 3) the new broker should return UNKNOWN_PRODUCER_ID now when it is
> > > > *supposed* to return INVALID_PRODUCER_EPOCH and it found the request
> is
> > > > from 2.5 client (note as mentioned in a) right now we do not
> > > > return UNKNOWN_PRODUCER_ID from brokers anymore).
> > > >
> > > > If it does, then 2.5 client could still do the retry logic to the
> > > > transaction coordinator, i.e. benefit from KIP-360; but the cost is
> > > complex
> > > > logic on the broker side as well as producer API version bump up.
> > > > If it does not, then when INVALID_PRODUCER_EPOCH is returned to the
> old
> > > > client it would treat it as fatal and not ask the txn coordinator;
> but
> > it
> > > > simplifies the broker logic and also do not require producer API
> > version
> > > > bump.
> > > >
> > > > Personally I'd suggest we do the latter, knowing that it would not
> > > benefit
> > > > 2.5 client when the partition leader gets an old epoch and does not
> > know
> > > > whether it is Fenced or Timed Out.
> > > >
> > > >
> > > > Guozhang
> > > >
> > > > On Thu, Apr 16, 2020 at 7:59 PM Boyang Chen <
> > reluctanthero...@gmail.com>
> > > > wrote:
> > > >
> > > > > Thanks Jason and Guozhang for the thoughts.
> > > > >
> > > > > On Thu, Apr 16, 2020 at 6:09 PM Guozhang Wang 
> > > > wrote:
> > > > >
> > > > > > For 2/3 above, originally I was not thinking that we will have a
> > > > > different
> > > > > > exception for INVALID_PRODUCER_EPOCH and hence was thinking that
> in
> > > > order
> > > > > > to leverage KIP-360 for it, we'd have to let the broker to
> > > > > > return UNKNOWN_PRODUCER_ID. I.e. we'd change the logic of
> partition
> > > > > leader
> > > > > > as well to return UNKNOWN_PRODUCER_ID to let the producer try
> > > > > > re-initializing the PID on the 

Clarification regarding multi topics implementation

2020-04-22 Thread Suresh Chidambaram
Hi Team,

Greetings.

I have a use case wherein I have to consume messages from multiple topics
using Kafka and process them using Kafka Streams, then publish the messages
to multiple target topics.

The example is below.

Source topic A - process A - target topic A
Source topic B - process B - target topic B

Could someone help me achieve this?

I have to use Spring Boot with Kafka Streams for this solution.

Thank you.
C Suresh
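
One common way to express this with the plain Kafka Streams DSL (the Spring
Boot wiring is omitted; topic names and the mapValues steps are placeholders
for the real processing) is to build one sub-topology per source topic:

import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.Topology;

// Sketch only: "process A" / "process B" are stand-ins for real logic.
public class MultiTopicTopology {
    public static Topology build() {
        StreamsBuilder builder = new StreamsBuilder();

        builder.stream("source-topic-a")   // Source topic A
               .mapValues(value -> value)  // process A (placeholder)
               .to("target-topic-a");      // target topic A

        builder.stream("source-topic-b")   // Source topic B
               .mapValues(value -> value)  // process B (placeholder)
               .to("target-topic-b");      // target topic B

        return builder.build();
    }
}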


Re: [Vote] KIP-588: Allow producers to recover gracefully from transaction timeouts

2020-04-22 Thread Jason Gustafson
+1 Just a couple small comments:

1. My comment about `initTransactions()` usage in the javadoc above appears
not to have been addressed.
2. For the handling of INVALID_PRODUCER_EPOCH in the produce response,
would we only try to abort if the broker supports the newer protocol
version? I guess it would be simpler in the implementation if the producer
did it in any case even if it might not be useful for older versions.

-Jason

On Fri, Apr 17, 2020 at 3:55 PM Guozhang Wang  wrote:

> Sounds good to me. Thanks Boyang.
>
> On Fri, Apr 17, 2020 at 3:32 PM Boyang Chen 
> wrote:
>
> > Thanks Guozhang,
> >
> > I think most of the complexity comes from our intention to benefit older
> > clients. After a second thought, I think the add-on complexity
> counteracts
> > the gain here as only 2.5 client is getting a slice of the resilience
> > improvement, not for many older versions.
> >
> > So I decide to drop the UNKNOWN_PRODUCER_ID path, by just claiming that
> > this change would only benefit 2.6 Producer clients. So the only path
> that
> > needs version detection is the new transaction coordinator handling
> > transactional requests. If the Producer is 2.6+, we pick
> > PRODUCER_FENCED(new error code) or TRANSACTION_TIMED_OUT as the response;
> > otherwise  we return INVALID_PRODUCE_EPOCH to be consistent with older
> > clients.
> >
> > Does this sound like a better plan? I already updated the KIP with
> > simplifications.
> >
> >
> > On Fri, Apr 17, 2020 at 12:02 PM Guozhang Wang 
> wrote:
> >
> > > Hi Boyang,
> > >
> > > Your reply to 3) seems conflicting with your other answers which is a
> bit
> > > confusing to me. Following your other answers, it seems you suggest
> > > returning UNKNOWN_PRODUCER_ID so that 2.5 clients can trigger retry
> logic
> > > as well?
> > >
> > > To complete my reasoning here as a complete picture:
> > >
> > > a) post KIP-360 (2.5+) the partition leader broker does not return
> > > UNKNOWN_PRODUCER_ID any more.
> > > b) upon seeing an old epoch, partition leader cannot tell if it is due
> to
> > > fencing or timeout; so it could only return INVALID_PRODUCER_EPOCH.
> > >
> > > So the basic idea is to let the clients ask the transaction coordinator
> > for
> > > the source of truth:
> > >
> > > 1) 2.5+ client would handle UNKNOWN_PRODUCER_ID (which could only be
> > > returned from old brokers) by trying to re-initialize with the
> > transaction
> > > coordinator; the coordinator would then tell it whether it is
> > > PRODUCER_FENCED or TXN_TIMEOUT. And for old brokers, it would always
> > return
> > > PRODUCER_FENCED anyways.
> > > 2) 2.6+ client would also handle INVALID_PRODUCER_EPOCH with the retry
> > > initializing logic; and similarly the transaction coordinator would
> > > return PRODUCER_FENCED or TXN_TIMEOUT if it is new or always
> > > return PRODUCER_FENCED if it is old.
> > >
> > > The question open is, whether
> > >
> > > * 3) the new broker should return UNKNOWN_PRODUCER_ID now when it is
> > > *supposed* to return INVALID_PRODUCER_EPOCH and it found the request is
> > > from 2.5 client (note as mentioned in a) right now we do not
> > > return UNKNOWN_PRODUCER_ID from brokers anymore).
> > >
> > > If it does, then 2.5 client could still do the retry logic to the
> > > transaction coordinator, i.e. benefit from KIP-360; but the cost is
> > complex
> > > logic on the broker side as well as producer API version bump up.
> > > If it does not, then when INVALID_PRODUCER_EPOCH is returned to the old
> > > client it would treat it as fatal and not ask the txn coordinator; but
> it
> > > simplifies the broker logic and also do not require producer API
> version
> > > bump.
> > >
> > > Personally I'd suggest we do the latter, knowing that it would not
> > benefit
> > > 2.5 client when the partition leader gets an old epoch and does not
> know
> > > whether it is Fenced or Timed Out.
> > >
> > >
> > > Guozhang
> > >
> > > On Thu, Apr 16, 2020 at 7:59 PM Boyang Chen <
> reluctanthero...@gmail.com>
> > > wrote:
> > >
> > > > Thanks Jason and Guozhang for the thoughts.
> > > >
> > > > On Thu, Apr 16, 2020 at 6:09 PM Guozhang Wang 
> > > wrote:
> > > >
> > > > > For 2/3 above, originally I was not thinking that we will have a
> > > > different
> > > > > exception for INVALID_PRODUCER_EPOCH and hence was thinking that in
> > > order
> > > > > to leverage KIP-360 for it, we'd have to let the broker to
> > > > > return UNKNOWN_PRODUCER_ID. I.e. we'd change the logic of partition
> > > > leader
> > > > > as well to return UNKNOWN_PRODUCER_ID to let the producer try
> > > > > re-initializing the PID on the coordinator, and if it is indeed due
> > to
> > > > > fencing, then coordinator can let the client know of the fatal
> error
> > > and
> > > > > then fail. In that case, then we do need to bump up the producer
> API
> > > > > version so that the partition leader knows if it is from older or
> > newer
> > > > > clients: if it is older client who do not have KIP-360, we'd return
> > > 

Re: Rocksdb Statistics

2020-04-22 Thread Nagendra Korrapati
Thanks Bruno. 

On 2020/04/22 15:58:50, Bruno Cadonna  wrote: 
> Hi Nagendra,
>
> What you describe is a known issue:
> https://issues.apache.org/jira/browse/KAFKA-9675
>
> And you also described the appropriate fix:
> https://github.com/apache/kafka/pull/8256
>
> The issue is fixed and will be included in 2.6.0, 2.4.2, 2.5.1.
>
> I am sorry for any inconveniences.
>
> Best,
> Bruno
>
> On Wed, Apr 22, 2020 at 5:48 PM Nagendra Korrapati
>  wrote:
> >
> > I am using kafka-streams-2.4.0. My issue details :
> >
> >  metrics.recording.level is set to DEBUG
> >  Still all the stream-state-metrics values recorded are all 0. (Ex:
> > bytes-written-total etc.,)
> >
> > Changed the code in RocksDBStore class openDB method. Moved the call to
> > maybeSetupMetricsRecorder  before the openRocksDB call in openDB method. It
> > started working and able to see the metrics values.
> >
> > Can anyone please comment on the above?
> >
> > thanks
> > Nagendra
> 

Re: Rocksdb Statistics

2020-04-22 Thread Bruno Cadonna
Hi Nagendra,

What you describe is a known issue:
https://issues.apache.org/jira/browse/KAFKA-9675

And you also described the appropriate fix:
https://github.com/apache/kafka/pull/8256

The issue is fixed and will be included in 2.6.0, 2.4.2, 2.5.1.

I am sorry for any inconveniences.

Best,
Bruno

On Wed, Apr 22, 2020 at 5:48 PM Nagendra Korrapati
 wrote:
>
> I am using kafka-streams-2.4.0. My issue details :
>
>  metrics.recording.level is set to DEBUG
>  Still all the stream-state-metrics values recorded are all 0. (Ex: 
> bytes-written-total etc.,)
>
> Changed the code in RocksDBStore class openDB method. Moved the call to 
> maybeSetupMetricsRecorder  before the openRocksDB call in openDB method. It 
> started working and able to see the metrics values.
>
> Can anyone please comment on the above?
>
> thanks
> Nagendra


Rocksdb Statistics

2020-04-22 Thread Nagendra Korrapati
I am using kafka-streams-2.4.0. My issue details:

 metrics.recording.level is set to DEBUG
 Still, all the stream-state-metrics values recorded are 0 (e.g.
bytes-written-total).

I changed the code in the RocksDBStore class's openDB method, moving the call
to maybeSetupMetricsRecorder before the openRocksDB call. It then started
working and I was able to see the metrics values.

Can anyone please comment on the above?

thanks
Nagendra
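
(For reference, the recording level mentioned above is the standard Streams
setting; a minimal sketch of a config that enables it, where the application
id and bootstrap servers are placeholders:)

import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class StreamsMetricsConfig {
    public static Properties build() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "example-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // metrics.recording.level must be DEBUG for per-store RocksDB metrics
        props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "DEBUG");
        return props;
    }
}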

[jira] [Created] (KAFKA-9904) Use ThreadLocalRandom to Replace Random

2020-04-22 Thread David Mollitor (Jira)
David Mollitor created KAFKA-9904:
-

 Summary: Use ThreadLocalRandom to Replace Random
 Key: KAFKA-9904
 URL: https://issues.apache.org/jira/browse/KAFKA-9904
 Project: Kafka
  Issue Type: Improvement
Reporter: David Mollitor


https://docs.oracle.com/javase/8/docs/api/java/util/concurrent/ThreadLocalRandom.html
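
A minimal sketch of the kind of change being suggested; this is an
illustrative call site, not code taken from the Kafka code base.

{code:java}
import java.util.Random;
import java.util.concurrent.ThreadLocalRandom;

public class BackoffJitter {
    // Before: a shared Random instance is contended across threads.
    private static final Random RANDOM = new Random();

    static long jitteredBefore(long baseMs) {
        return baseMs + RANDOM.nextInt(100);
    }

    // After: ThreadLocalRandom avoids the shared seed and the extra instance.
    static long jitteredAfter(long baseMs) {
        return baseMs + ThreadLocalRandom.current().nextInt(100);
    }
}
{code}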





Re: [DISCUSS] KIP-590: Redirect Zookeeper Mutation Protocols to The Controller

2020-04-22 Thread Gwen Shapira
Hey Boyang,

Sorry if this was already discussed, but I didn't see this as a rejected
alternative:

Until now, we have always done client-side routing - the client itself found
the controller via metadata and directed requests accordingly. Brokers that
were not the controller rejected those requests.

Why did we decide to move to broker-side routing? Was the client-side
option discussed and rejected somewhere and I missed it?

Gwen

On Fri, Apr 3, 2020, 4:45 PM Boyang Chen  wrote:

> Hey all,
>
> I would like to start off the discussion for KIP-590, a follow-up
> initiative after KIP-500:
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-590%3A+Redirect+Zookeeper+Mutation+Protocols+to+The+Controller
>
> This KIP proposes to migrate existing Zookeeper mutation paths, including
> configuration, security and quota changes, to controller-only by always
> routing these alterations to the controller.
>
> Let me know your thoughts!
>
> Best,
> Boyang
>


Re: Retention policies question

2020-04-22 Thread Gwen Shapira
Kafka purges old messages from both leaders and replicas.

If there was a mistake in the book, can you tell me which chapter and page?
We'll fix it.

Gwen

On Wed, Apr 22, 2020, 7:51 AM Alex Bull  wrote:

> Hi, Dear Kafka Developers,
>
> I've read 'Kafka: The Definitive Guide' by Narkhede and others and I have a
> following question.
> On what side topic retention policies (delete or compact) are performed?
> I have a guess that they work only on  brokers that hold leader replica of
> partition.
> Or am I  wrong ?
>
> With best regards,
> Alex.
>


Retention policies question

2020-04-22 Thread Alex Bull
Hi, Dear Kafka Developers,

I've read 'Kafka: The Definitive Guide' by Narkhede and others, and I have
the following question.
On which side are topic retention policies (delete or compact) performed?
My guess is that they only run on brokers that hold the leader replica of a
partition.
Or am I wrong?

With best regards,
Alex.


[jira] [Created] (KAFKA-9903) Kafka ShutdownableThread has a bug in judging whether the thread is running

2020-04-22 Thread shilin Lu (Jira)
shilin Lu created KAFKA-9903:


 Summary: Kafka ShutdownableThread has a bug in judging whether the thread is 
running
 Key: KAFKA-9903
 URL: https://issues.apache.org/jira/browse/KAFKA-9903
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 2.3.1
Reporter: shilin Lu
 Attachments: image-2020-04-22-21-28-03-154.png

h2. 1. Bug
{code:java}
override def run(): Unit = {
  isStarted = true
  info("Starting")
  try {
    while (isRunning)
      doWork()
  } catch {
    case e: FatalExitError =>
      shutdownInitiated.countDown()
      shutdownComplete.countDown()
      info("Stopped")
      Exit.exit(e.statusCode())
    case e: Throwable =>
      if (isRunning)
        error("Error due to", e)
  } finally {
    shutdownComplete.countDown()
  }
  info("Stopped")
}

def isRunning: Boolean = {
  shutdownInitiated.getCount() != 0
}
{code}
1. When the replica thread hits an exception that is not a FatalExitError, the thread 
exits and runs the finally block, which counts down the shutdownComplete 
CountDownLatch, but shutdownInitiated is never counted down.

2. Because of 1, shutdownInitiated stays at a count of 1. isRunning only checks that 
shutdownInitiated's count is not 0, so using this method to judge the thread's status 
gives the wrong answer.

3. The isRunning method is used in shutdownIdleFetcherThreads, processFetchRequest, 
controller request sending and elsewhere, so a dead thread may never be removed and 
follow-up work may never be done.
h2. 2. Bug fix

As in the following code, count down shutdownInitiated in the finally block as well:

 
{code:java}
override def run(): Unit = {
  isStarted = true
  info("Starting")
  try {
    while (isRunning)
      doWork()
  } catch {
    case e: FatalExitError =>
      shutdownInitiated.countDown()
      shutdownComplete.countDown()
      info("Stopped")
      Exit.exit(e.statusCode())
    case e: Throwable =>
      if (isRunning)
        error("Error due to", e)
  } finally {
    // proposed fix: also count down shutdownInitiated so that isRunning
    // returns false once the thread has exited for any reason
    shutdownInitiated.countDown()
    shutdownComplete.countDown()
  }
  info("Stopped")
}
{code}


--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] KIP-590: Redirect Zookeeper Mutation Protocols to The Controller

2020-04-22 Thread Rajini Sivaram
We do use custom principals that rely on information not contained in the
serialized principal and hence authorization based on serialized principals
can break existing production systems. Apart from the information contained
in the session principal, ACLs can also be based on host IP, but I guess
that can be added easily to the envelope.

Several requests like AlterConfigs are compound requests containing
multiple resources, some of which may be authorized and some not. Given
that the controller cannot perform authorization of forwarded requests, the
forwarding broker needs to not only perform authorization, but also filter
out unauthorized resources from the request.

So assuming that the forwarding broker authorizes request from User:Alice
and audit logs the entry for User:Alice, what is Controller going to do
with the principal in the envelope? I guess we will just let the request go
through the normal request handlers in KafkaApis. Which means the request
will be authorized and audit logged again. The question is whether we would
use 1) principal User:Alice contained in the envelope or 2) the broker
context:
1) User:Alice may not have permissions since its permissions were based on
other fields in its authenticated context not forwarded to the Controller
2) User:Broker will need additional permissions if we use that since
brokers never needed AlterConfigs permissions before.

I think we should use 2). Which basically means that the principal in the
envelope is not particularly useful and we could just as well put that into
client-id (insecure and loggable, but not used in a security-critical
context).

Regards,

Rajini


On Wed, Apr 22, 2020 at 8:39 AM Tom Bentley  wrote:

> Hi Boyang and Sönke,
>
> Regarding custom Principals, I don't think too many people do this in
> > practice, but in theory you can provide your own PrincipalBuilder and use
> > your own Principal objects that contain as much additional information as
> > you wish. And since these can basically be any Java object that makes
> them
> > very tough to serialize.
> > Currently these Principals don't need to be serialized, because they are
> > created and used within the same JVM, there is no need to forward them
> to a
> > different broker.
> >
>
> This is exactly the point I was trying to make (clearly not so
> successfully).
>
> You can't (in general) serialize the principal itself. (There's nothing
> stopping a custom principal builder from returning an instance of some
> subclass of KafkaPrincipal with extra fields which are then used by some
> custom authorizer). And I don't think it's possible to serialize the
> AuthenticationContext (which would allow you to obtain a principal instance
> on the controller using the principal builder). For example,
> SslAuthenticationContext contains an SSLSession.
>
> I think Sönke is correct that this isn't likely to be functionality which
> is used frequently, but these contracts existed already, so we can't just
> decide that only KafkaPrincipal can be used.
>
> Kind regards,
>
> Tom
>
> On Tue, Apr 21, 2020 at 10:43 PM Sönke Liebau
>  wrote:
>
> > Hi Boyang,
> >
> > I think what Tom is referring to is that it is very hard to forward
> enough
> > information to the controller to put it into a position to properly
> > authenticate any request.
> >
> > While the Default KafkaPrincipal can easily be serialized and sent to the
> > controller, as previously seen, those are just strings. For the
> Controller
> > to properly authenticate a request we'd need to forward the
> > AuthenticationContext (from which the Principal is built [1]) containing
> > the SSL/SASL details to the controller, in order for the controller to
> then
> > check certificates for validity etc.
> > And those checks will be very difficult, because what we are effectively
> > doing here is a man-in-the-middle attack (broadly speaking), as we are
> > forwarding a request "in the name of" someone else. And most
> authentication
> > methods have been built to prevent exactly that.
> > As soon as we have only the Principal we are trusting someone else to
> have
> > properly authenticated that principal, because we do not have all the
> > information to do that verification ourselves. And if we do that, then I
> > don't see why we should a
> >
> > Regarding custom Principals, I don't think too many people do this in
> > practice, but in theory you can provide your own PrincipalBuilder and use
> > your own Principal objects that contain as much additional information as
> > you wish. And since these can basically be any Java object that makes
> them
> > very tough to serialize.
> > Currently these Principals don't need to be serialized, because they are
> > created and used within the same JVM, there is no need to forward them
> to a
> > different broker.
> > I wrote a blog post [2] about a scenario that uses a custom Principal a
> > little while ago, that shows a possible scenario, maybe that helps a
> > little.
> >
> > Feel free to correct me if I misinterpreted your meaning Tom :)

[jira] [Created] (KAFKA-9902) Java client API cannot retrieve all of the information that kafka-consumer-groups.sh outputs

2020-04-22 Thread startjava (Jira)
startjava created KAFKA-9902:


 Summary: Java client API cannot retrieve all of the information that 
kafka-consumer-groups.sh outputs
 Key: KAFKA-9902
 URL: https://issues.apache.org/jira/browse/KAFKA-9902
 Project: Kafka
  Issue Type: Test
Reporter: startjava


Why does the Java client API not provide a method corresponding to the command:
kafka-consumer-groups.sh --bootstrap-server localhost:9081 --describe --group test
With the current API I cannot get GROUP, TOPIC, PARTITION, CURRENT-OFFSET, 
LOG-END-OFFSET, LAG, CONSUMER-ID, HOST and CLIENT-ID together in one call; from 
what I have read, each piece has to be fetched separately, which is very 
inconvenient for us as developers, even though this need is very common.
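
For what it's worth, the columns in that output can be assembled from the existing
Java clients, though not in a single call. A rough sketch (group id and bootstrap
address copied from the example command above, error handling omitted):

{code:java}
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ConsumerGroupDescription;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class DescribeGroupExample {
    public static void main(String[] args) throws Exception {
        String group = "test";
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9081");

        try (AdminClient admin = AdminClient.create(props)) {
            // CONSUMER-ID, HOST, CLIENT-ID and the member assignments
            ConsumerGroupDescription description =
                admin.describeConsumerGroups(Collections.singletonList(group)).all().get().get(group);
            description.members().forEach(m ->
                System.out.printf("%s %s %s %s%n",
                    m.consumerId(), m.host(), m.clientId(), m.assignment().topicPartitions()));

            // CURRENT-OFFSET per partition
            Map<TopicPartition, OffsetAndMetadata> committed =
                admin.listConsumerGroupOffsets(group).partitionsToOffsetAndMetadata().get();

            // LOG-END-OFFSET and LAG, via a throw-away consumer
            Properties cprops = new Properties();
            cprops.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9081");
            cprops.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
            cprops.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class);
            try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(cprops)) {
                Map<TopicPartition, Long> endOffsets = consumer.endOffsets(committed.keySet());
                committed.forEach((tp, om) -> System.out.printf("%s %s current=%d end=%d lag=%d%n",
                    group, tp, om.offset(), endOffsets.get(tp), endOffsets.get(tp) - om.offset()));
            }
        }
    }
}
{code}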

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] KIP-590: Redirect Zookeeper Mutation Protocols to The Controller

2020-04-22 Thread Tom Bentley
Hi Boyang and Sönke,

Regarding custom Principals, I don't think too many people do this in
> practice, but in theory you can provide your own PrincipalBuilder and use
> your own Principal objects that contain as much additional information as
> you wish. And since these can basically be any Java object that makes them
> very tough to serialize.
> Currently these Principals don't need to be serialized, because they are
> created and used within the same JVM, there is no need to forward them to a
> different broker.
>

This is exactly the point I was trying to make (clearly not so
successfully).

You can't (in general) serialize the principal itself. (There's nothing
stopping a custom principal builder from returning an instance of some
subclass of KafkaPrincipal with extra fields which are then used by some
custom authorizer). And I don't think it's possible to serialize the
AuthenticationContext (which would allow you to obtain a principal instance
on the controller using the principal builder). For example,
SslAuthenticationContext contains an SSLSession.

I think Sönke is correct that this isn't likely to be functionality which
is used frequently, but these contracts existed already, so we can't just
decide that only KafkaPrincipal can be used.
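
For illustration only, a hypothetical sketch of such a custom principal builder;
the class, field and DN-parsing logic below are invented and are not part of Kafka:

{code:java}
import java.security.cert.X509Certificate;
import javax.net.ssl.SSLPeerUnverifiedException;
import org.apache.kafka.common.security.auth.AuthenticationContext;
import org.apache.kafka.common.security.auth.KafkaPrincipal;
import org.apache.kafka.common.security.auth.KafkaPrincipalBuilder;
import org.apache.kafka.common.security.auth.SslAuthenticationContext;

/** Hypothetical principal carrying an extra field that a custom authorizer might use. */
class GroupAwarePrincipal extends KafkaPrincipal {
    private final String organisationalUnit;   // invented field, not part of KafkaPrincipal

    GroupAwarePrincipal(String name, String organisationalUnit) {
        super(KafkaPrincipal.USER_TYPE, name);
        this.organisationalUnit = organisationalUnit;
    }

    String organisationalUnit() {
        return organisationalUnit;
    }
}

/** Hypothetical builder: the extra field is derived from the live SSL session. */
public class GroupAwarePrincipalBuilder implements KafkaPrincipalBuilder {
    @Override
    public KafkaPrincipal build(AuthenticationContext context) {
        if (context instanceof SslAuthenticationContext) {
            try {
                SslAuthenticationContext ssl = (SslAuthenticationContext) context;
                X509Certificate cert = (X509Certificate) ssl.session().getPeerCertificates()[0];
                String dn = cert.getSubjectX500Principal().getName();
                // Toy parsing: pull CN and OU out of the subject DN (real code would use LdapName)
                String cn = dn.replaceAll(".*CN=([^,]+).*", "$1");
                String ou = dn.replaceAll(".*OU=([^,]+).*", "$1");
                return new GroupAwarePrincipal(cn, ou);
            } catch (SSLPeerUnverifiedException e) {
                return KafkaPrincipal.ANONYMOUS;
            }
        }
        return KafkaPrincipal.ANONYMOUS;
    }
}
{code}

The extra field above only exists in the JVM that performed the TLS handshake,
which is exactly the serialization problem being discussed.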

Kind regards,

Tom

On Tue, Apr 21, 2020 at 10:43 PM Sönke Liebau
 wrote:

> Hi Boyang,
>
> I think what Tom is referring to is that it is very hard to forward enough
> information to the controller to put it into a position to properly
> authenticate any request.
>
> While the Default KafkaPrincipal can easily be serialized and sent to the
> controller, as previously seen, those are just strings. For the Controller
> to properly authenticate a request we'd need to forward the
> AuthenticationContext (from which the Principal is built [1]) containing
> the SSL/SASL details to the controller, in order for the controller to then
> check certificates for validity etc.
> And those checks will be very difficult, because what we are effectively
> doing here is a man-in-the-middle attack (broadly speaking), as we are
> forwarding a request "in the name of" someone else. And most authentication
> methods have been built to prevent exactly that.
> As soon as we have only the Principal we are trusting someone else to have
> properly authenticated that principal, because we do not have all the
> information to do that verification ourselves. And if we do that, then I
> don't see why we should a
>
> Regarding custom Principals, I don't think too many people do this in
> practice, but in theory you can provide your own PrincipalBuilder and use
> your own Principal objects that contain as much additional information as
> you wish. And since these can basically be any Java object that makes them
> very tough to serialize.
> Currently these Principals don't need to be serialized, because they are
> created and used within the same JVM, there is no need to forward them to a
> different broker.
> I wrote a blog post [2] about a scenario that uses a custom Principal a
> little while ago, that shows a possible scenario, maybe that helps a
> little.
>
> Feel free to correct me if I misinterpreted your meaning Tom :)
>
> Best regards,
> Sönke
>
> [1] https://imgur.com/a/Gi0cFNH
> [2]
> https://www.opencore.com/de/blog/2018/3/group-based-authorization-in-kafka/
>
> On Tue, 21 Apr 2020 at 20:33, Boyang Chen 
> wrote:
>
> > Hey Tom,
> >
> > I agree with the claim here. All the brokers should have the same
> > authentication power, which means getting the forwarding broker verify
> the
> > client request first is more favorable. This approach avoids sending one
> > unnecessary forwarding request if it couldn't pass the authorization in
> the
> > first place.
> >
> > In the meantime, could you give more context on the custom Kafka
> principal
> > you are referring to? How does that get encoded today, and how server and
> > client could both agree on the serialization? As the plain principal is
> > only a String, I would like to know more about the security strategy
> people
> > are using, thanks!
> >
> > Boyang
> >
> > On Tue, Apr 21, 2020 at 2:24 AM Tom Bentley  wrote:
> >
> > > Hi Boyang,
> > >
> > > The answer to my original question about the request principal was that
> > the
> > > forwarding broker would authorize the request and the controller would
> > > trust the request since it was from another broker. AFAIU you added the
> > > principal purely for logging purposes. In the "EnvelopeRequest
> Handling"
> > > section the KIP now says "Once that part is done, we shall replace the
> > > request context with Principal information embedded inside the
> > > EnvelopeRequest to complete the inner request permission check.", which
> > > sounds to me like the controller is now authorizing the request (maybe
> in
> > > addition to the forwarding broker) using a principal it's deserialized
> > from
> > > the EnvelopeRequest. I don't think that works if a custom principal
> > builder
> > > is