Build failed in Jenkins: Kafka » kafka-trunk-jdk11 #47

2020-09-04 Thread Apache Jenkins Server
See 


Changes:

[github] KAFKA-10133: MM2 readme update on config (#9215)

[github] KAFKA-10259: KIP-554 Broker-side SCRAM Config API (#9032)


--
[...truncated 6.57 MB...]

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUpdateStoreForNewKey[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldSendRecordViaCorrectSourceTopicDeprecated[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldSendRecordViaCorrectSourceTopicDeprecated[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnWallClockTime[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnWallClockTime[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldSetRecordMetadata[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldSetRecordMetadata[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotUpdateStoreForLargerValue[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotUpdateStoreForLargerValue[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnCorrectInMemoryStoreTypeOnly[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnCorrectInMemoryStoreTypeOnly[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldThrowForMissingTime[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldThrowForMissingTime[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCaptureInternalTopicNamesIfWrittenInto[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCaptureInternalTopicNamesIfWrittenInto[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnWallClockTimeDeprecated[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnWallClockTimeDeprecated[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessRecordForTopic[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessRecordForTopic[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldForwardRecordsFromSubtopologyToSubtopology[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldForwardRecordsFromSubtopologyToSubtopology[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotUpdateStoreForSmallerValue[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotUpdateStoreForSmallerValue[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCreateStateDirectoryForStatefulTopology[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCreateStateDirectoryForStatefulTopology[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateIfWallClockTimeAdvances[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateIfWallClockTimeAdvances[Eos enabled = false] PASSED

org.apache.kafka.streams.test.TestRecordTest > testConsumerRecord STARTED

org.apache.kafka.streams.test.TestRecordTest > testConsumerRecord PASSED

org.apache.kafka.streams.test.TestRecordTest > testToString STARTED

org.apache.kafka.streams.test.TestRecordTest > testToString PASSED

org.apache.kafka.streams.test.TestRecordTest > testInvalidRecords STARTED

org.apache.kafka.streams.test.TestRecordTest > testInvalidRecords PASSED

org.apache.kafka.streams.test.TestRecordTest > testPartialConstructorEquals 
STARTED

org.apache.kafka.streams.test.TestRecordTest > testPartialConstructorEquals 
PASSED

org.apache.kafka.streams.test.TestRecordTest > testMultiFieldMatcher STARTED

org.apache.kafka.streams.test.TestRecordTest > testMultiFieldMatcher PASSED

org.apache.kafka.streams.test.TestRecordTest > testFields STARTED

org.apache.kafka.streams.test.TestRecordTest > testFields PASSED

org.apache.kafka.streams.test.TestRecordTest > testProducerRecord STARTED

org.apache.kafka.streams.test.TestRecordTest > testProducerRecord PASSED

org.apache.kafka.streams.test.TestRecordTest > testEqualsAndHashCode STARTED

org.apache.kafka.streams.test.TestRecordTest > testEqualsAndHashCode PASSED

org.apache.kafka.streams.test.wordcount.WindowedWordCountProcessorTest > 
shouldWorkWithInMemoryStore STARTED

org.apache.kafka.streams.test.wordcount.WindowedWordCountProcessorTest > 
shouldWorkWithInMemoryStore PASSED

org.apache.kafka.streams.test.wordcount.WindowedWordCountProcessorTest > 

Re: [DISCUSS] KIP-631: The Quorum-based Kafka Controller

2020-09-04 Thread Unmesh Joshi
Hi Colin,
Thanks for the response.
>>If the start time is not specified in the request, then the network time
is excluded from the heartbeat time.
The most common implementation pattern I see (looking at Google Chubby
sessions, Zookeeper sessions, etcd lease implementation, LogCabin session
and Docker swarm session) is that only durations are sent in requests and
responses, and not the actual time values. The actual time values for the
lease expiration, as well as for sending the next heartbeat, are derived. For
example, ZooKeeper calculates the heartbeat time on the client as (2/3 / 2 ~ 1/3)
of the session expiration interval that it gets in the session response.
This makes sure that even if actual time values on different servers have
some differences, it won't impact the lease expiration mechanism.

>>>It's important to keep in mind that messages may be delayed in the
network, or arrive out of order.  When this happens, we will use the start
time specified in the request to determine if the request is stale.
I am assuming that there will be a single TCP connection maintained between
the broker and the active controller. So, there won't be any out-of-order
requests? There is a scenario of a broker GC pause, which might cause a
connection timeout, and the broker might need to re-establish the connection.
If the pause is too long, the lease will expire and the heartbeat sent after
the pause will be treated as a new registration (similar to the restart case),
and a new epoch number will be assigned to the broker.

>>>so the need to re-establish them after a controller failover doesn't
seem like a big problem
When the active controller fails, the new active controller needs to be
sure that it considers all the known brokers as alive until the lease
expiration interval. Because registration.lease.timeout.ms is configured
on the controller, the new active controller will extend all the leases by
registration.lease.timeout.ms. So I see that it won't need the last heartbeat
time.
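
To make the duration-only approach above concrete, here is a minimal, purely
illustrative sketch (not from the KIP; the class and method names are made up):
only a TTL travels in the request/response, the controller tracks expiration on
its own monotonic clock, and the broker derives its heartbeat interval from the
TTL it receives.

import java.util.concurrent.TimeUnit;

// Hypothetical sketch: only a TTL is exchanged; each side uses its own
// monotonic clock, so wall-clock differences between nodes do not matter.
class BrokerLease {
    private final long ttlNanos;
    private long expirationNanos;   // controller-side, System.nanoTime() domain

    BrokerLease(long ttlMs) {
        this.ttlNanos = TimeUnit.MILLISECONDS.toNanos(ttlMs);
        renew();
    }

    // Called whenever a heartbeat arrives; the timer starts on the controller,
    // so network delay only ever extends the broker's effective lease.
    void renew() {
        expirationNanos = System.nanoTime() + ttlNanos;
    }

    boolean isExpired() {
        return System.nanoTime() - expirationNanos > 0;
    }

    // Broker-side: send the next heartbeat at a fraction (here 1/3) of the TTL
    // returned by the controller, similar to the ZooKeeper-style derivation.
    static long heartbeatIntervalMs(long leaseTtlMs) {
        return Math.max(1, leaseTtlMs / 3);
    }
}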

Thanks,
Unmesh

On Sat, Sep 5, 2020 at 1:28 AM Colin McCabe  wrote:

> > Colin wrote:
> > > The reason for including LeaseStartTimeMs in the request is to ensure
> > > that the time required to communicate with the controller gets
> included in
> > > the lease time.  Since requests can potentially be delayed in the
> network
> > > for a long time, this is important.
>
> On Mon, Aug 31, 2020, at 05:58, Unmesh Joshi wrote:
> > The network time will be added anyway, because the lease timer on the
> > active controller will start only after the heartbeat request reaches the
> > server.
>
> Hi Unmesh,
>
> If the start time is not specified in the request, then the network time
> is excluded from the heartbeat time.
>
> Here's an example:
> Let's say broker A sends a heartbeat at time 100, and it arrives on the
> controller at time 200, and the lease duration is 1000.
>
> The controller looks at the start time in the request, which is 100, and
> adds 1000 to it, getting 1100.  On the other hand, if start time is not
> specified in the request, then the expiration will be at time 1200.
> That is what I mean by "the network time is included in the expiration
> time."
>
> > And I think, some assumption about network round trip time is
> > needed anyway to decide on the frequency of the heartbeat (
> > registration.heartbeat.interval.ms), and lease timeout (
> > registration.lease.timeout.ms). So I think just having a leaseTTL in the
> > request is easier to understand and implement.
>
> It's important to keep in mind that messages may be delayed in the
> network, or arrive out of order.  When this happens, we will use the start
> time specified in the request to determine if the request is stale.
>
> > > Yes, I agree that the lease timeout on the controller side should be
> > > reset in the case of controller failover.  The alternative would be to
> > > track the lease as hard state rather than soft state, but I think that
> > > is not really needed, and would result in more log entries.
> > My interpretation of the mention of BrokerRecord in the KIP was that this
> > record exists in the Raft log.
>
> BrokerRecord does exist in the Raft log, but does not include the last
> heartbeat time.
>
> > By soft state, do you mean the broker
> > records exist only on the active leader and will not be replicated in the
> > raft log? If the live brokers list is maintained only on the active
> > controller (raft leader), then, in case of leader failure, there will be
> a
> > window where the new leader does not know about the live brokers, till
> the
> > brokers establish the leases again.
> > I think it will be safer to have leases as a hard state managed by
> standard
> > Raft replication.
>
> Leases are short, so the need to re-establish them after a controller
> failover doesn't seem like a big problem.  But this is something we can
> tweak if it becomes an issue.  One option would be to have a separate log
> which is only used by the controller nodes for this (since, after all,
> brokers 

Build failed in Jenkins: Kafka » kafka-trunk-jdk8 #46

2020-09-04 Thread Apache Jenkins Server
See 


Changes:

[github] KAFKA-10259: KIP-554 Broker-side SCRAM Config API (#9032)


--
[...truncated 6.52 MB...]

org.apache.kafka.streams.TopologyTestDriverTest > shouldRespectTaskIdling[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSourceSpecificDeserializers[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSourceSpecificDeserializers[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldReturnAllStores[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldReturnAllStores[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotCreateStateDirectoryForStatelessTopology[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotCreateStateDirectoryForStatelessTopology[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldApplyGlobalUpdatesCorrectlyInRecursiveTopologies[Eos enabled = false] 
STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldApplyGlobalUpdatesCorrectlyInRecursiveTopologies[Eos enabled = false] 
PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnAllStoresNames[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnAllStoresNames[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPassRecordHeadersIntoSerializersAndDeserializers[Eos enabled = false] 
STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPassRecordHeadersIntoSerializersAndDeserializers[Eos enabled = false] 
PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessConsumerRecordList[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessConsumerRecordList[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSinkSpecificSerializers[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSinkSpecificSerializers[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldFlushStoreForFirstInput[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldFlushStoreForFirstInput[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessFromSourceThatMatchPattern[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessFromSourceThatMatchPattern[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCaptureSinkTopicNamesIfWrittenInto[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCaptureSinkTopicNamesIfWrittenInto[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUpdateStoreForNewKey[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUpdateStoreForNewKey[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldSendRecordViaCorrectSourceTopicDeprecated[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldSendRecordViaCorrectSourceTopicDeprecated[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnWallClockTime[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnWallClockTime[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldSetRecordMetadata[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldSetRecordMetadata[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotUpdateStoreForLargerValue[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotUpdateStoreForLargerValue[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnCorrectInMemoryStoreTypeOnly[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnCorrectInMemoryStoreTypeOnly[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldThrowForMissingTime[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldThrowForMissingTime[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCaptureInternalTopicNamesIfWrittenInto[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCaptureInternalTopicNamesIfWrittenInto[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnWallClockTimeDeprecated[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 

Jenkins build is back to normal : Kafka » kafka-trunk-jdk15 #47

2020-09-04 Thread Apache Jenkins Server
See 




Jenkins build is back to normal : Kafka » kafka-trunk-jdk8 #45

2020-09-04 Thread Apache Jenkins Server
See 




Re: [VOTE] KIP-663: API to Start and Shut Down Stream Threads and to Request Closing of Kafka Streams Clients

2020-09-04 Thread John Roesler
Hi Sophie,

Uh, oh, it's never a good sign when the discussion moves
into the vote thread :)

I agree with you, it seems like a good touch for
removeStreamThread() to return the name of the thread that
got removed, rather than a boolean flag. Maybe the return
value would be `null` if there is no thread to remove.

If we go that way, I'd suggest that addStreamThread() also
return the name of the newly created thread, or null if no
thread can be created right now.

I'm not completely sure if I think that callers of this
method would know exactly how many threads there are. Sure,
if a human being is sitting there looking at the metrics or
logs and decides to call the method, it would work out, but
I'd expect this kind of method to find its way into
automated tooling that reacts to things like current system
load or resource saturation. Those kinds of toolchains often
are part of a distributed system, and it's probably not that
easy to guarantee that the thread count they observe is
fully consistent with the number of threads that are
actually running. Therefore, an in-situ `int
numStreamThreads()` method might not be a bad idea. Then
again, it seems sort of optional. A caller can catch an
exception or react to a `null` return value just the same
either way. Having both add/remove methods behave similarly
is probably more valuable.
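
For illustration only, the shape we are discussing would be roughly the
following (the interface name and the use of null returns are my assumptions,
not the final KIP):

public interface StreamThreadManagement {
    // Starts a new stream thread and returns its name, or null if no thread
    // could be created right now.
    String addStreamThread();

    // Removes one stream thread and returns the name of the removed thread,
    // or null if there was no thread left to remove.
    String removeStreamThread();
}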

Thanks,
-John


On Thu, 2020-09-03 at 12:15 -0700, Sophie Blee-Goldman
wrote:
> Hey, sorry for the late reply, I just have one minor suggestion. Since we
> don't
> make any guarantees about which thread gets removed or allow the user to
> specify, I think we should return either the index or full name of the
> thread
> that does get removed by removeThread().
> 
> I know you just updated the KIP to return true/false if there are/aren't any
> threads to be removed, but I think this would be more appropriate as an
> exception than as a return type. I think it's reasonable to expect users to
> have some sense of how many threads are remaining, and not try to remove
> a thread when there is none left. To me, that indicates something wrong
> with the user application code and should be treated as an exceptional case.
> I don't think the same code clarity argument applies here as to the
> addStreamThread() case, as there's no reason for an application to be
> looping and retrying removeStreamThread(), since if that fails, it's because
> there are no threads left and thus it will continue to always fail. And if
> the
> user actually wants to shut down all threads, they should just close the
> whole application rather than call removeStreamThread() in a loop.
> 
> While I generally think it should be straightforward for users to track how
> many stream threads they have running, maybe it would be nice to add
> a small utility method that does this for them. Something like
> 
> // Returns the number of currently alive threads
> int runningStreamThreads();
> 
> On Thu, Sep 3, 2020 at 7:41 AM Matthias J. Sax  wrote:
> 
> > +1 (binding)
> > 
> > On 9/3/20 6:16 AM, Bruno Cadonna wrote:
> > > Hi,
> > > 
> > > I would like to start the voting on KIP-663 that proposes to add methods
> > > to the Kafka Streams client to add and remove stream threads during
> > > execution.
> > > 
> > > 
> > https://cwiki.apache.org/confluence/display/KAFKA/KIP-663%3A+API+to+Start+and+Shut+Down+Stream+Threads
> > > 
> > > Best,
> > > Bruno



Re: [DISCUSS] KIP-406: GlobalStreamThread should honor custom reset policy

2020-09-04 Thread John Roesler
Hi all,

This conversation sounds good to me so far.

Sophie raised a concern before that changing the state
machine would break state restore listeners. This is true,
and we actually have not changed the main state machine in a
long time. The last change I remember was that we used to go
"CREATED -> RUNNING -> REBALANCING -> RUNNING", and now we
just go "CREATED -> REBALANCING -> RUNNING". This is
actually the reason why many state listeners check for
"REBALANCING -> RUNNING", to filter out the old "phantom
running" transition from "CREATED -> RUNNING".

Anyway, the observation is that dropping the "phantom
running" state didn't break any real use case we were aware
of. But adding RESTORING between REBALANCING and RUNNING
certainly would break the common pattern that we're aware
of. This would indeed be the first time we introduce a
practically breaking change to the state machine at least
since 2.0, and maybe since 1.0 too. We should probably
consider the impact.

One alternative is to consider the state machine and the
state listener to be coupled APIs. We can deprecate and
replace the current state listener, and also introduce a new
state machine enum with our desired new state and
transitions, while leaving the existing one alone and
deprecating it. Then, no existing code would break, only get
deprecation warnings.



Matthias gave me an idea a few messages back with his note
about setting/checking "flags". What if we flip it around,
and set the flags on the global stores themselves. Then, we
throw an exception when a processor queries the store while
it's restoring. When they get that exception, they just put
that task on the back burner for a while and try again later
(similar to Matthias's timeout handling KIP). The global
thread sets the flag on a particular store when it realizes
it needs to be re-created and unsets it when the restore
completes.

Then:
1. Only the global stores that actually need to be restored
block anything
2. Only the tasks that access the stores get blocked
3. No new states need to be introduced
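
A rough, purely illustrative sketch of that idea (the class, methods, and the
exception used here are hypothetical, not proposed API):

// Hypothetical wrapper around a global state store: the global thread flips
// the flag while re-creating the store; processors that query it meanwhile
// get an exception and can put the task on the back burner and retry later.
class GuardedGlobalStore<K, V> {
    private final java.util.Map<K, V> delegate = new java.util.concurrent.ConcurrentHashMap<>();
    private volatile boolean restoring = false;

    void beginRestore() { restoring = true; }
    void endRestore()   { restoring = false; }

    V get(K key) {
        if (restoring) {
            // In a real proposal this would be a dedicated retriable exception type.
            throw new IllegalStateException("Global store is restoring; retry the task later");
        }
        return delegate.get(key);
    }

    void put(K key, V value) {
        delegate.put(key, value);
    }
}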

WDYT?
-John

On Fri, 2020-09-04 at 13:18 +, Navinder Brar wrote:
> Hi Sophie,
> 
> Thanks for the detailed explanation. I agree from a user standpoint, I don't 
> think there is any use-case to take any separate action in case of 
> GLOBAL_RESTORING and RESTORING phase. 
> 
> So, internally in the code we can handle the cases as Matthias explained above 
> and we can discuss those in the PR. I will update the KIP based on what all 
> we have converged towards, including having an uber RESTORING (rather than 
> GLOBAL_RESTORING) state which takes stream and global threads into 
> consideration.
> 
> I will update the KIP soon and share it again as a lot has changed from where 
> we started this KIP from.
> 
> Regards,Navinder
> 
> On Friday, 4 September, 2020, 04:19:20 am IST, Sophie Blee-Goldman 
>  wrote:  
>  
>  Thanks Matthias, that sounds like what I was thinking. I think we should
> always be
> able to figure out what to do in various scenarios as outlined in the
> previous email.
> 
> >   For the same reason, I wouldn't want to combine global restoring and
> normal restoring
> > because then it would make all the restorings independent but we don't
> want that. We
> > want global stores to be available before any processing starts on the
> active tasks.
> 
> I'm not sure I follow this specific point, but I don't think I did a good
> job of explaining my
> proposal so it's probably my own fault. When I say that we should merge
> RESTORING
> and GLOBAL_RESTORING, I just mean that we should provide a single
> user-facing
> state to encompass any ongoing restoration. The point of the KafkaStreams
> RESTORING
> state is to alert users that their state may be unavailable for IQ, and
> active tasks may be
> idle. This is true for both global and non-global restoration. I think the
> ultimate question
> is whether as a user, I would react any differently to a GLOBAL_RESTORING
> state vs
> the regular RESTORING. My take is "no", in which case we should just
> provide a single
> unified state for the minimal public API. But if anyone can think of a
> reason for the user
> to need to distinguish between different types of restoration, that would
> be a good
> argument to keep them separate.
> 
> Internally, we do need to keep track of a "global restore" flag to
> determine the course
> of action -- for example if a StreamThread transitions to RUNNING but sees
> that the
> KafkaStreams state is RESTORING, should it start processing or not? The
> answer
> depends on whether the state is RESTORING due to any global stores. But the
> KafkaStreams state is a public interface, not an internal bookkeeper, so we
> shouldn't
> try to push our internal logic into the user-facing API.
> 
> 
> On Thu, Sep 3, 2020 at 7:36 AM Matthias J. Sax  wrote:
> 
> > I think this issue can actually be resolved.
> > 
> >   - We need a flag on the stream-threads if global-restore is in
> > progress; for this 

[jira] [Resolved] (KAFKA-10259) KIP-554: Add Broker-side SCRAM Config API

2020-09-04 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10259?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe resolved KAFKA-10259.
--
Fix Version/s: 2.7.0
 Reviewer: Colin McCabe
   Resolution: Fixed

> KIP-554: Add Broker-side SCRAM Config API
> -
>
> Key: KAFKA-10259
> URL: https://issues.apache.org/jira/browse/KAFKA-10259
> Project: Kafka
>  Issue Type: New Feature
>Reporter: Ron Dagostino
>Assignee: Ron Dagostino
>Priority: Major
> Fix For: 2.7.0
>
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] KIP-631: The Quorum-based Kafka Controller

2020-09-04 Thread Colin McCabe
> Colin wrote:
> > The reason for including LeaseStartTimeMs in the request is to ensure
> > that the time required to communicate with the controller gets included in
> > the lease time.  Since requests can potentially be delayed in the network
> > for a long time, this is important.

On Mon, Aug 31, 2020, at 05:58, Unmesh Joshi wrote:
> The network time will be added anyway, because the lease timer on the
> active controller will start only after the heartbeat request reaches the
> server.

Hi Unmesh,

If the start time is not specified in the request, then the network time is 
excluded from the heartbeat time.

Here's an example:
Let's say broker A sends a heartbeat at time 100, and it arrives on the 
controller at time 200, and the lease duration is 1000.

The controller looks at the start time in the request, which is 100, and adds 
1000 to it, getting 1100.  On the other hand, if start time is not specified in 
the request, then the expiration will be at time 1200.
That is what I mean by "the network time is included in the expiration time."
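
Written out as a tiny illustrative snippet (numbers taken from the example
above; this is not actual controller code):

public class LeaseExpirationExample {
    public static void main(String[] args) {
        long leaseDurationMs = 1000;
        long leaseStartTimeMs = 100;      // broker-supplied start time
        long controllerArrivalMs = 200;   // when the heartbeat reaches the controller

        // Start time present: the 100 ms of network delay counts against the lease.
        System.out.println(leaseStartTimeMs + leaseDurationMs);      // 1100
        // Start time absent: the lease is measured from arrival instead.
        System.out.println(controllerArrivalMs + leaseDurationMs);   // 1200
    }
}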

> And I think, some assumption about network round trip time is
> needed anyway to decide on the frequency of the heartbeat (
> registration.heartbeat.interval.ms), and lease timeout (
> registration.lease.timeout.ms). So I think just having a leaseTTL in the
> request is easier to understand and implement.

It's important to keep in mind that messages may be delayed in the network, or 
arrive out of order.  When this happens, we will use the start time specified 
in the request to determine if the request is stale.

> > Yes, I agree that the lease timeout on the controller side should be
> > reset in the case of controller failover.  The alternative would be to
> > track the lease as hard state rather than soft state, but I think that
> > is not really needed, and would result in more log entries.
> My interpretation of the mention of BrokerRecord in the KIP was that this
> record exists in the Raft log.

BrokerRecord does exist in the Raft log, but does not include the last 
heartbeat time.

> By soft state, do you mean the broker
> records exist only on the active leader and will not be replicated in the
> raft log? If the live brokers list is maintained only on the active
> controller (raft leader), then, in case of leader failure, there will be a
> window where the new leader does not know about the live brokers, till the
> brokers establish the leases again.
> I think it will be safer to have leases as a hard state managed by standard
> Raft replication.

Leases are short, so the need to re-establish them after a controller failover 
doesn't seem like a big problem.  But this is something we can tweak if it 
becomes an issue.  One option would be to have a separate log which is only 
used by the controller nodes for this (since, after all, brokers don't care 
about registration renewals).

> Or am I misunderstanding something? (I assume that with soft state, you
> mean something like zookeeper local sessions
> https://issues.apache.org/jira/browse/ZOOKEEPER-1147.)
> 
> > Our code is single threaded as well.  I think it makes sense for the
> > controller, since otherwise locking becomes very messy.  I'm not sure I
> > understand your question about duplicate broker ID detection, though.
> There's a section in the KIP about this -- is there a detail we should add
> there?

This is an implementation detail that doesn't need to be in the KIP.

best,
Colin

> I assumed broker leases are implemented as a hard state. In that case, to
> check for broker id conflict, we need to check the broker ids at two places
> 1. Pending broker registrations (which are yet to be committed) 2. Already
> committed broker registrations.
> 
> Thanks,
> Unmesh
> 
> 
> 
> On Mon, Aug 31, 2020 at 5:42 PM Colin McCabe  wrote:
> 
> > On Sat, Aug 29, 2020, at 01:12, Unmesh Joshi wrote:
> > > >>>Can you repeat your questions about broker leases?
> > >
> > > The LeaseStartTimeMs is expected to be the broker's
> > > 'System.currentTimeMillis()' at the point of the request. The active
> > > controller will add its lease period to this in order to compute the
> > > LeaseEndTimeMs.
> > >
> > > I think the use of LeaseStartTimeMs and LeaseEndTimeMs in the KIP is a
> > > bit
> > > confusing.  Monotonic Clock (System.nanoTime) on the active controller
> > > should be used to track leases.
> > > (For example,
> > >
> > https://issues.apache.org/jira/browse/ZOOKEEPER-1616
> > https://github.com/etcd-io/etcd/pull/6888/commits/e7f4010ccaf28b6ce64fe514d25a4b2fa459d114
> > > )
> > >
> > > Then we will not need LeaseStartTimeMs?
> > > Instead of LeaseStartTimeMs, can we call it LeaseTTL? The active
> > controller
> > > can then calculate LeaseEndTime = System.nanoTime() + LeaseTTL.
> > > In this case we might just drop LeaseEndTimeMs from the response, as the
> > > broker already knows about the TTL and can send heartbeats at some
> > fraction
> > > of TTL, say every TTL/4 milliseconds.(elapsed time 

Build failed in Jenkins: Kafka » kafka-trunk-jdk15 #46

2020-09-04 Thread Apache Jenkins Server
See 


Changes:

[github] KAFKA-10133: MM2 readme update on config (#9215)


--
[...truncated 3.28 MB...]
org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueAndTimestampIsEqualForCompareValueTimestampWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueAndTimestampIsEqualForCompareValueTimestampWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReverseForCompareValueTimestampWithProducerRecord
 STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReverseForCompareValueTimestampWithProducerRecord
 PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecordsFromKeyValuePairs STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecordsFromKeyValuePairs PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithNullKeyAndDefaultTimestamp 
STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithNullKeyAndDefaultTimestamp 
PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicNameWithDefaultTimestamp 
STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicNameWithDefaultTimestamp 
PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithNullKey STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithNullKey PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateNullKeyConsumerRecordWithOtherTopicNameAndTimestampWithTimetamp 
STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateNullKeyConsumerRecordWithOtherTopicNameAndTimestampWithTimetamp 
PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecordWithTimestamp STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecordWithTimestamp PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullHeaders STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullHeaders PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithDefaultTimestamp STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithDefaultTimestamp PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicName STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicName PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicNameWithNullKey STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicNameWithNullKey PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecord STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecord PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateNullKeyConsumerRecord STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateNullKeyConsumerRecord PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecordWithOtherTopicName STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldCreateConsumerRecordWithOtherTopicName PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > shouldAdvanceTime 
STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > shouldAdvanceTime 
PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithKeyValuePairs STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldNotAllowToCreateTopicWithNullTopicNameWithKeyValuePairs PASSED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicNameWithKeyValuePairsAndCustomTimestamps
 STARTED

org.apache.kafka.streams.test.ConsumerRecordFactoryTest > 
shouldRequireCustomTopicNameIfNotDefaultFactoryTopicNameWithKeyValuePairsAndCustomTimestamps

Build failed in Jenkins: Kafka » kafka-trunk-jdk8 #44

2020-09-04 Thread Apache Jenkins Server
See 


Changes:

[github] KAFKA-10314: KafkaStorageException on reassignment when offline log 
directories exist (#9122)


--
[...truncated 3.25 MB...]

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotUpdateStoreForSmallerValue[Eos enabled = true] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotUpdateStoreForSmallerValue[Eos enabled = true] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCreateStateDirectoryForStatefulTopology[Eos enabled = true] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCreateStateDirectoryForStatefulTopology[Eos enabled = true] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateIfWallClockTimeAdvances[Eos enabled = true] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateIfWallClockTimeAdvances[Eos enabled = true] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldCloseProcessor[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldCloseProcessor[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldFeedStoreFromGlobalKTable[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldFeedStoreFromGlobalKTable[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCleanUpPersistentStateStoresOnClose[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCleanUpPersistentStateStoresOnClose[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowForUnknownTopicDeprecated[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowForUnknownTopicDeprecated[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowPatternNotValidForTopicNameException[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowPatternNotValidForTopicNameException[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldEnqueueLaterOutputsAfterEarlierOnes[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldEnqueueLaterOutputsAfterEarlierOnes[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSourceSpecificDeserializersDeprecated[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSourceSpecificDeserializersDeprecated[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateIfEvenTimeAdvances[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateIfEvenTimeAdvances[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowNoSuchElementExceptionForUnusedOutputTopicWithDynamicRouting[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowNoSuchElementExceptionForUnusedOutputTopicWithDynamicRouting[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldInitProcessor[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldInitProcessor[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowForUnknownTopic[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowForUnknownTopic[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnStreamsTime[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnStreamsTime[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCaptureGlobalTopicNameIfWrittenInto[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCaptureGlobalTopicNameIfWrittenInto[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowIfInMemoryBuiltInStoreIsAccessedWithUntypedMethod[Eos enabled = 
false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowIfInMemoryBuiltInStoreIsAccessedWithUntypedMethod[Eos enabled = 
false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessFromSourcesThatMatchMultiplePattern[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessFromSourcesThatMatchMultiplePattern[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldPopulateGlobalStore[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldPopulateGlobalStore[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowIfPersistentBuiltInStoreIsAccessedWithUntypedMethod[Eos enabled = 
false] STARTED


Build failed in Jenkins: Kafka » kafka-trunk-jdk15 #45

2020-09-04 Thread Apache Jenkins Server
See 


Changes:

[github] KAFKA-10314: KafkaStorageException on reassignment when offline log 
directories exist (#9122)


--
[...truncated 3.28 MB...]
org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareKeyValueTimestampWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareKeyValueTimestampWithProducerRecord PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareValueTimestamp 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordWithExpectedRecordForCompareValueTimestamp 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullExpectedRecordForCompareValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullExpectedRecordForCompareValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueTimestampWithProducerRecord
 STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueTimestampWithProducerRecord
 PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordForCompareKeyValue STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldNotAllowNullProducerRecordForCompareKeyValue PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueAndTimestampIsEqualForCompareKeyValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueAndTimestampIsEqualForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareKeyValueTimestampWithProducerRecord
 STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullForCompareKeyValueTimestampWithProducerRecord
 PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullReversForCompareKeyValueWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullReversForCompareKeyValueWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueIsEqualWithNullForCompareKeyValueWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfKeyAndValueIsEqualWithNullForCompareKeyValueWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullForCompareKeyValueTimestampWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfKeyIsDifferentWithNullForCompareKeyValueTimestampWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareKeyValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueAndTimestampIsEqualForCompareValueTimestampWithProducerRecord 
STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldPassIfValueAndTimestampIsEqualForCompareValueTimestampWithProducerRecord 
PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueTimestamp STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReversForCompareKeyValueTimestamp PASSED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReverseForCompareValueTimestampWithProducerRecord
 STARTED

org.apache.kafka.streams.test.OutputVerifierTest > 
shouldFailIfValueIsDifferentWithNullReverseForCompareValueTimestampWithProducerRecord
 PASSED

org.apache.kafka.streams.MockProcessorContextTest > 
shouldStoreAndReturnStateStores STARTED

org.apache.kafka.streams.MockProcessorContextTest > 
shouldStoreAndReturnStateStores PASSED

org.apache.kafka.streams.MockProcessorContextTest > 
shouldCaptureOutputRecordsUsingTo STARTED

org.apache.kafka.streams.MockProcessorContextTest > 
shouldCaptureOutputRecordsUsingTo PASSED

org.apache.kafka.streams.MockProcessorContextTest > shouldCaptureOutputRecords 
STARTED

org.apache.kafka.streams.MockProcessorContextTest > shouldCaptureOutputRecords 
PASSED

org.apache.kafka.streams.MockProcessorContextTest > 
fullConstructorShouldSetAllExpectedAttributes STARTED


Re: [DISCUSS] KIP-405: Kafka Tiered Storage

2020-09-04 Thread Satish Duggana
Hi Jun,
Thanks for your thorough review and comments. Please find the inline
replies below.

600. The topic deletion logic needs more details.
600.1 The KIP mentions "The controller considers the topic partition is
deleted only when it determines that there are no log segments for that
topic partition by using RLMM". How is this done?

It uses RLMM#listSegments(), which returns all the segments for the given
topic partition.

600.2 "If the delete option is enabled then the leader will stop RLM task
and stop processing and it sets all the remote log segment metadata of that
partition with a delete marker and publishes them to RLMM." We discussed
this earlier. When a topic is being deleted, there may not be a leader for
the deleted partition.

This is a good point. As suggested in the meeting, we will add a
separate section for topic/partition deletion lifecycle and this
scenario will be addressed.

601. Unclean leader election
601.1 Scenario 1: new empty follower
After step 1, the follower restores up to offset 3. So why does it have
LE-2 at offset 5?

Nice catch. It was showing the leader epoch fetched from the remote
storage. It should be shown as truncated till offset 3. Updated
the KIP.

601.2 scenario 5: After Step 3, leader A has inconsistent data between its
local and the tiered data. For example. offset 3 has msg 3 LE-0 locally,
but msg 5 LE-1 in the remote store. While it's ok for the unclean leader to
lose data, it should still return consistent data, whether it's from the
local or the remote store.

There is no inconsistency here as LE-0 offsets are [0, 4] and LE-2:
[5, ]. It will always get the right records for the given offset and
leader epoch. In case of remote, RSM is invoked to get the remote log
segment that contains the given offset with the leader epoch.

601.4 It seems that retention is based on
listRemoteLogSegments(TopicPartition topicPartition, long leaderEpoch).
When there is an unclean leader election, it's possible for the new leader
to not to include certain epochs in its epoch cache. How are remote
segments associated with those epochs being cleaned?

That is a good point. This leader will also clean up the epochs earlier
than its start leader epoch and delete those segments. It gets the
earliest epoch for a partition and starts deleting segments from that
leader epoch. We need one more API in RLMM to get the earliest leader
epoch.
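
As a simplified stand-in sketch of that cleanup flow (the interfaces and
method names below are illustrative placeholders, not the actual KIP-405 API;
earliestLeaderEpoch stands for the extra RLMM API mentioned above):

import org.apache.kafka.common.TopicPartition;
import java.util.List;

// Placeholder types, only to make the sketch self-contained.
interface RemoteLogSegmentMetadata { }

interface RemoteLogMetadataManager {
    long earliestLeaderEpoch(TopicPartition tp);
    List<RemoteLogSegmentMetadata> listRemoteLogSegments(TopicPartition tp, long leaderEpoch);
    void deleteRemoteLogSegment(RemoteLogSegmentMetadata segment);
}

class RemoteEpochCleanup {
    // Delete remote segments belonging to epochs earlier than this leader's
    // start epoch, as described above.
    static void cleanupEpochsBeforeLeaderStart(RemoteLogMetadataManager rlmm,
                                               TopicPartition tp,
                                               long leaderStartEpoch) {
        for (long epoch = rlmm.earliestLeaderEpoch(tp); epoch < leaderStartEpoch; epoch++) {
            for (RemoteLogSegmentMetadata segment : rlmm.listRemoteLogSegments(tp, epoch)) {
                rlmm.deleteRemoteLogSegment(segment);
            }
        }
    }
}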

601.5 The KIP discusses the handling of unclean leader elections for user
topics. What about unclean leader elections on
__remote_log_segment_metadata?
This is the same as other system topics like consumer_offsets,
__transaction_state topics. As discussed in the meeting, we will add
the behavior of __remote_log_segment_metadata topic’s unclean leader
truncation.

602. It would be useful to clarify the limitations in the initial release.
The KIP mentions not supporting compacted topics. What about JBOD and
changing the configuration of a topic from delete to compact
after remote.log.storage.enable is enabled?

This was updated in the KIP earlier.

603. RLM leader tasks:
603.1"It checks for rolled over LogSegments (which have the last message
offset less than last stable offset of that topic partition) and copies
them along with their offset/time/transaction indexes and leader epoch
cache to the remote tier." It needs to copy the producer snapshot too.

Right. It copies producer snapshots too as mentioned in LogSegmentData.

603.2 "Local logs are not cleaned up till those segments are copied
successfully to remote even though their retention time/size is reached"
This seems weird. If the tiering stops because the remote store is not
available, we don't want the local data to grow forever.

It was clarified in the discussion that the concern was more about the
local storage growing beyond log.retention. The above statement is
about local.log.retention, not the complete log.retention.
When log.retention is reached, the local logs will be deleted
even if they have not been copied to remote storage.


604. "RLM maintains a bounded cache(possibly LRU) of the index files of
remote log segments to avoid multiple index fetches from the remote
storage. These indexes can be used in the same way as local segment indexes
are used." Could you provide more details on this? Are the indexes cached
in memory or on disk? If on disk, where are they stored? Are the cached
indexes bound by a certain size?

These are cached on disk and stored in log.dir under the name
“__remote_log_index_cache”. They are bound by the total size. This
will be exposed as a user configuration.
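
Purely as an illustration of the "bounded by total size" idea (this is not the
actual RLM implementation), an access-ordered LRU map is one way to do it:

import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.Map;

// Illustrative only: values are index file sizes in bytes.
class BoundedIndexCache {
    private final LinkedHashMap<String, Long> files = new LinkedHashMap<>(16, 0.75f, true);
    private final long maxTotalBytes;
    private long totalBytes = 0;

    BoundedIndexCache(long maxTotalBytes) {
        this.maxTotalBytes = maxTotalBytes;
    }

    void addIndex(String remoteSegmentId, long sizeBytes) {
        Long previous = files.put(remoteSegmentId, sizeBytes);
        totalBytes += sizeBytes - (previous == null ? 0 : previous);
        // Evict least-recently-used entries until the total size fits the bound.
        Iterator<Map.Entry<String, Long>> it = files.entrySet().iterator();
        while (totalBytes > maxTotalBytes && it.hasNext()) {
            totalBytes -= it.next().getValue();
            it.remove();   // a real cache would also delete the on-disk index file here
        }
    }

    Long lookup(String remoteSegmentId) {
        return files.get(remoteSegmentId);   // get() refreshes the LRU order
    }
}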

605. BuildingRemoteLogAux
605.1 In this section, two options are listed. Which one is chosen?
Option-2, updated the KIP.

605.2 In option 2, it says  "Build the local leader epoch cache by cutting
the leader epoch sequence received from remote storage to [LSO, ELO]. (LSO
= log start offset)." We need to do the same thing for the producer
snapshot. However, it's hard to cut the producer snapshot 

Re: Apply for JIRA contributor authority

2020-09-04 Thread Mickael Maison
Done. Thanks for your interest in Apache Kafka!

On Fri, Sep 4, 2020 at 4:54 PM 季伟  wrote:
>
> Hi,
> I want to contribute to Apache Kafka.
> Would you please offer me the contributor permission?
> My JIRA ID is jiweiautohome.
>
> --
>


[jira] [Resolved] (KAFKA-10314) KafkaStorageException on reassignment when offline log directories exist

2020-09-04 Thread Mickael Maison (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10314?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Mickael Maison resolved KAFKA-10314.

Fix Version/s: 2.7.0
   Resolution: Fixed

> KafkaStorageException on reassignment when offline log directories exist
> 
>
> Key: KAFKA-10314
> URL: https://issues.apache.org/jira/browse/KAFKA-10314
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.5.0
>Reporter: Noa Resare
>Assignee: Noa Resare
>Priority: Minor
> Fix For: 2.7.0
>
>
> If a reassignment of a partition is triggered to a broker with an offline 
> directory, the new broker will fail to follow, instead raising a 
> KafkaStorageException which causes the reassignment to stall indefinitely. 
> The error message we see is the following:
> {{[2020-07-23 13:11:08,727] ERROR [Broker id=1] Skipped the become-follower 
> state change with correlation id 14 from controller 1 epoch 1 for partition 
> t2-0 (last update controller epoch 1) with leader 2 since the replica for the 
> partition is offline due to disk error 
> org.apache.kafka.common.errors.KafkaStorageException: Can not create log for 
> t2-0 because log directories /tmp/kafka/d1 are offline (state.change.logger)}}
> It seems to me that unless the partition in question already existed on the 
> offline log directory, a better behaviour would simply be to assign the 
> partition to one of the available log directories.
> The conditional in 
> [LogManager.scala:769|https://github.com/apache/kafka/blob/11f75691b87fcecc8b29bfd25c7067e054e408ea/core/src/main/scala/kafka/log/LogManager.scala#L769]
>  was introduced to prevent the issue in 
> [KAFKA-4763|https://issues.apache.org/jira/browse/KAFKA-4763] where 
> partitions in offline logdirs would be re-created in an online directory as 
> soon as a LeaderAndISR message gets processed. However, the semantics of 
> isNew seems different in LogManager (the replica is new on this broker) 
> compared to when isNew is set in 
> [KafkaController.scala|https://github.com/apache/kafka/blob/11f75691b87fcecc8b29bfd25c7067e054e408ea/core/src/main/scala/kafka/controller/KafkaController.scala#L879]
>  (where it seems to refer to whether the topic partition in itself is new, 
> all followers get {{isNew=false}})



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Apply for JIRA contributor authority

2020-09-04 Thread 季伟
Hi,
I want to contribute to Apache Kafka.
Would you please offer me the contributor permission?
My JIRA ID is jiweiautohome.

--



Re: [DISCUSS] KIP-406: GlobalStreamThread should honor custom reset policy

2020-09-04 Thread Navinder Brar
Hi Sophie,

Thanks for the detailed explanation. I agree from a user standpoint, I don't 
think there is any use-case to take any separate action in case of 
GLOBAL_RESTORING and RESTORING phase. 

So, internally in the code we can handle the cases as Matthias explained above 
and we can discuss those in the PR. I will update the KIP based on what all we 
have converged towards, including having an uber RESTORING (rather than 
GLOBAL_RESTORING) state which takes stream and global threads into consideration.

I will update the KIP soon and share it again as a lot has changed from where 
we started this KIP from.

Regards,Navinder

On Friday, 4 September, 2020, 04:19:20 am IST, Sophie Blee-Goldman 
 wrote:  
 
 Thanks Matthias, that sounds like what I was thinking. I think we should
always be
able to figure out what to do in various scenarios as outlined in the
previous email.

>  For the same reason, I wouldn't want to combine global restoring and
normal restoring
> because then it would make all the restorings independent but we don't
want that. We
> want global stores to be available before any processing starts on the
active tasks.

I'm not sure I follow this specific point, but I don't think I did a good
job of explaining my
proposal so it's probably my own fault. When I say that we should merge
RESTORING
and GLOBAL_RESTORING, I just mean that we should provide a single
user-facing
state to encompass any ongoing restoration. The point of the KafkaStreams
RESTORING
state is to alert users that their state may be unavailable for IQ, and
active tasks may be
idle. This is true for both global and non-global restoration. I think the
ultimate question
is whether as a user, I would react any differently to a GLOBAL_RESTORING
state vs
the regular RESTORING. My take is "no", in which case we should just
provide a single
unified state for the minimal public API. But if anyone can think of a
reason for the user
to need to distinguish between different types of restoration, that would
be a good
argument to keep them separate.

Internally, we do need to keep track of a "global restore" flag to
determine the course
of action -- for example if a StreamThread transitions to RUNNING but sees
that the
KafkaStreams state is RESTORING, should it start processing or not? The
answer
depends on whether the state is RESTORING due to any global stores. But the
KafkaStreams state is a public interface, not an internal bookkeeper, so we
shouldn't
try to push our internal logic into the user-facing API.
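
As a tiny illustrative sketch of that separation (a hypothetical class, not
proposed API), a stream thread would consult an internal flag rather than the
public KafkaStreams state:

// Illustrative only: the internal "global restore" bookkeeping, kept separate
// from the user-facing client state.
class StreamThreadGate {
    private volatile boolean globalRestoreInProgress = false;   // set by the global thread

    void setGlobalRestoreInProgress(boolean inProgress) {
        globalRestoreInProgress = inProgress;
    }

    // A stream thread that has reached RUNNING checks this flag, not the
    // public KafkaStreams state, before it starts processing active tasks.
    boolean mayProcessActiveTasks() {
        return !globalRestoreInProgress;
    }
}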


On Thu, Sep 3, 2020 at 7:36 AM Matthias J. Sax  wrote:

> I think this issue can actually be resolved.
>
>  - We need a flag on the stream-threads if global-restore is in
> progress; for this case, the stream-thread may go into RUNNING state,
> but it's not allowed to actually process data -- it will be allowed to
> update standby-task though.
>
>  - If a stream-thread restores, its own state is RESTORING and it does
> not need to care about the "global restore flag".
>
>  - The global-thread just does what we discussed, including using state
> RESTORING.
>
>  - The KafkaStreams client state is in RESTORING, if at least one thread
> (stream-thread or global-thread) is in state RESTORING.
>
>  - On startup, if there is a global-thread, then just set the
> global-restore flag upfront before we start the stream-threads (we can
> actually still do the rebalance and potential restore in stream-thread
> in parallel to global restore) and rely on the global-thread to unset
> the flag.
>
>  - The tricky thing is, to "stop" processing in stream-threads if we
> need to wipe the global-store and rebuilt it. For this, we should set
> the "global restore flag" on the stream-threads, but we also need to
> "lock down" the global store in question and throw an exception if the
> stream-thread tries to access it; if the stream-thread gets this
> exception, it needs to clean up itself, and wait until the "global restore
> flag" is unset before it can continue.
>
>
> Do we think this would work? -- Of course, the devil is in the details
> but it seems to become a PR discussion, and there is no reason to make
> it part of the KIP.
>
>
> -Matthias
>
> On 9/3/20 3:41 AM, Navinder Brar wrote:
> > Hi,
> >
> > Thanks, John, Matthias and Sophie for great feedback.
> >
> > On the point raised by Sophie that maybe we should allow normal
> restoring during GLOBAL_RESTORING, I think it makes sense but the challenge
> would be what happens when normal restoring (on actives) has finished but
> GLOBAL_RESTORING is still going on. Currently, all restorings are
> independent of each other i.e. restoring happening on one task/thread
> doesn't affect another. But if we do go ahead with allowing normal
> restoring during GLOBAL_RESTORING then we will still have to pause the
> active tasks from going to RUNNING if GLOBAL_RESTORING has not finished and
> normal restorings have finished. For the same reason, I wouldn't want to
> combine global restoring and normal restoring 

[DISCUSS] KIP idea: Support of multipart messages

2020-09-04 Thread Alexander Sibiryakov
Hello,

I would like to get your opinions on this KIP idea.

In short, it will allow transferring messages bigger than the maximum size
allowed by the broker.

https://docs.google.com/document/d/1cKBNxPkVVENly9YczYXsVDVWwrCdRq3G_cja5s2QDQg/edit?usp=sharing

If all that makes sense, I'll create a full-fledged KIP document and expand
the idea.

Thanks,
A.