Re: [DISCUSS] KIP-508: Make Suppression State Queriable - rebooted.

2020-09-16 Thread John Roesler
Hi Dongjin,

Thanks for the reply. Yes, that’s correct, we added that method to name the 
operation. But the operation seems synonymous with the view produced by the 
operation, right?

During KIP-307, I remember thinking that it’s unfortunate that we had to have 
two different “name” concepts for the same thing, just because setting the name 
on Materialized is equivalent both to making it queriable and to actually 
materializing it.

If we were to reconsider the API, it would be nice to treat these three as 
orthogonal:
* specify a name
* flag to make the view queriable
* flag to materialize the view 

That was the context behind my suggestion. Do you have a use case in mind for 
having two separate names for the operation and the view?
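
For illustration, a rough sketch of what the flag-based idea from the quoted
proposal below could look like from a user's perspective. Note that
`enableQuery()` is only a proposal in this thread, not an existing API, and
`input`/`streams` are assumed to be an existing KStream and a running
KafkaStreams instance:

// Sketch only: Suppressed#enableQuery() does not exist yet.
KTable<Windowed<String>, Long> counts = input
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofMinutes(5)))
    .count()
    .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded())
        .withName("suppressed-counts")  // existing: names the operation (KIP-307)
        .enableQuery());                // proposed: marks the view as queriable

// The suppressed view would then be queried like any other store:
ReadOnlyKeyValueStore<Windowed<String>, Long> view = streams.store(
    StoreQueryParameters.fromNameAndType("suppressed-counts",
        QueryableStoreTypes.keyValueStore()));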

Thanks,
John

On Wed, Sep 16, 2020, at 11:43, Dongjin Lee wrote:
> Hi John,
> 
> It seems like the available alternatives at this point are clear:
> 
> 1. Pass queriable name as a separate parameter (i.e.,
> `KTable#suppress(Suppressed, String)`)
> 2. Make use of the Suppression processor name as a queryable name by adding
> an optional `enableQuery` flag to `Suppressed`.
> 
> However, I have some doubts about the second approach. As far as I know, the
> processor name was introduced in KIP-307[^1] to make debugging the topology easy
> and understandable. Since the processor name is a concept independent of the
> materialization, I feel the first approach is more natural and
> consistent. Is there any specific reason that you prefer the second
> approach?
> 
> Thanks,
> Dongjin
> 
> [^1]:
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL
> 
> 
> 
> On Wed, Sep 16, 2020 at 11:48 PM John Roesler  wrote:
> 
> > Hi Dongjin,
> >
> > Yes, that's where I was leaning. Although, I'd prefer adding
> > the option to Suppressed instead of adding a new argument to
> > the method call.
> >
> > What do you think about:
> >
> > class Suppressed {
> > +  public Suppressed enableQuery();
> > }
> >
> > Since Suppressed already has `withName(String)`, it seems
> > like all we need to add is a boolean flag.
> >
> > Does that seem sensible to you?
> >
> > Thanks,
> > -John
> >
> > On Wed, 2020-09-16 at 21:50 +0900, Dongjin Lee wrote:
> > > Hi John,
> > >
> > > > Although it's not great to have "special snowflakes" in the API,
> > Choice B
> > > does seem safer in the short term. We would basically be proposing a
> > > temporary API to make the suppressed view queriable without a
> > Materialized
> > > argument.
> > >
> > > Then, it seems like you prefer `KTable#suppress(Suppressed, String)`
> > (i.e.,
> > > queriable name only as a parameter) for this time, and refine API with
> > the
> > > other related KIPs later.
> > >
> > > Do I understand correctly?
> > >
> > > Thanks,
> > > Dongjin
> > >
> > > On Wed, Sep 16, 2020 at 2:17 AM John Roesler 
> > wrote:
> > >
> > > > Hi Dongjin,
> > > >
> > > > Thanks for presenting these options. The concern that
> > > > Matthias brought up is a very deep problem that afflicts all
> > > > operations downstream of windowing operations. It's the same
> > > > thing that derailed KIP-300. For the larger context, I have
> > > > developed a couple of approaches to resolve this situation,
> > > > but I think it makes sense to finish up KIP-478 before
> > > > presenting them.
> > > >
> > > > However, I don't think that we need in particular to block
> > > > the current proposal on solving that long-running and deep
> > > > issue with the DSL. Instead, we should make a top-level
> > > > decision whether to:
> > > >
> > > > A: Make Suppress just like all the other KTable operations.
> > > > It will have the same pathological behavior that the keyset
> > > > is unbounded while the store implementation is only a
> > > > KeyValueStore. Again, this exact pathology currently affects
> > > > all KTable operations that follow from windowing operations.
> > > > For example, it applies to the current workaround that
> > > > Dongjin documented in the KIP:
> > > > suppress().filter(Materialized). This is
> > > > Option 2 that Dongjin presented.
> > > >
> > > > B: Do something different with Suppress in order to side-
> > > > step the problem. For example, Suppress does not _need_ to
> > > > have a separate state store at all. If we just give people a
> > > > switch to make the operation queriable, we can implement a
> > > > ReadOnlyKeyValueStore interface by querying the "priorValue"
> > > > of the buffer first and then querying the upstream
> > > > ValueGetter. This broad category of "do something different
> > > > with Suppress" encompasses Option 1 and Option 3 that Dongjin
> > > > presented.
> > > >
> > > >
> > > > Speaking personally, I think Choice A would be the most
> > > > obvious and least weird choice, but it presents a serious
> > > > risk of escalating the severity of the problem of unbounded
> > > > state. This is currently a risk that we're aware of, but has
> > > > not yet become a big problem in practice. As 

[jira] [Created] (KAFKA-10492) Core Raft implementation

2020-09-16 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-10492:
---

 Summary: Core Raft implementation
 Key: KAFKA-10492
 URL: https://issues.apache.org/jira/browse/KAFKA-10492
 Project: Kafka
  Issue Type: Sub-task
Reporter: Jason Gustafson
Assignee: Jason Gustafson


This issue tracks the core implementation of the Raft protocol specified in 
KIP-595: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-595%3A+A+Raft+Protocol+for+the+Metadata+Quorum.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-10476) ignore it

2020-09-16 Thread AbdulRahman Mahmoud (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10476?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

AbdulRahman Mahmoud resolved KAFKA-10476.
-
Resolution: Invalid

> ignore it
> -
>
> Key: KAFKA-10476
> URL: https://issues.apache.org/jira/browse/KAFKA-10476
> Project: Kafka
>  Issue Type: Bug
>  Components: config
>Affects Versions: 2.2.1
>Reporter: AbdulRahman Mahmoud
>Priority: Major
>
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10491) Check authorizations before other criteria in KafkaApis

2020-09-16 Thread David Arthur (Jira)
David Arthur created KAFKA-10491:


 Summary: Check authorizations before other criteria in KafkaApis
 Key: KAFKA-10491
 URL: https://issues.apache.org/jira/browse/KAFKA-10491
 Project: Kafka
  Issue Type: Improvement
Reporter: David Arthur


In KafkaApis#handleAlterUserScramCredentialsRequest we check if the current 
broker is the controller before checking if the request is authorized. This is 
a potential information leak about details of the system (i.e., who is the 
controller). We should fix this to check the authz first.
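
As an illustration of the intended ordering (sketch only; the real handler is 
Scala code in KafkaApis and all names below are made up):

// Illustrative sketch, not the actual KafkaApis code.
class ScramHandlerSketch {
    boolean authorizedToAlterCluster() { return false; } // stand-in for the ACL check
    boolean isController()             { return false; } // stand-in for the controller check

    String handleAlterUserScramCredentials() {
        // Check authorization first so an unauthorized client learns nothing
        // about the cluster, e.g. which broker is the controller.
        if (!authorizedToAlterCluster()) {
            return "CLUSTER_AUTHORIZATION_FAILED";
        }
        // Only after the caller is known to be authorized do we surface
        // controller-related errors.
        if (!isController()) {
            return "NOT_CONTROLLER";
        }
        return "NONE";
    }
}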

[~hachikuji] pointed this out during the review for AlterIsr since I had 
followed the pattern in handleAlterUserScramCredentialsRequest. 

We should fix handleAlterUserScramCredentialsRequest and audit the rest of 
KafkaApis for similar patterns.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Build failed in Jenkins: Kafka » kafka-trunk-jdk8 #67

2020-09-16 Thread Apache Jenkins Server
See 


Changes:

[github] KAFKA-10487; Fetch response should return diverging epoch and end 
offset (#9290)


--
[...truncated 6.53 MB...]

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowPatternNotValidForTopicNameException[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldEnqueueLaterOutputsAfterEarlierOnes[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldEnqueueLaterOutputsAfterEarlierOnes[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSourceSpecificDeserializersDeprecated[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSourceSpecificDeserializersDeprecated[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateIfEvenTimeAdvances[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateIfEvenTimeAdvances[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowNoSuchElementExceptionForUnusedOutputTopicWithDynamicRouting[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowNoSuchElementExceptionForUnusedOutputTopicWithDynamicRouting[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldInitProcessor[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldInitProcessor[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowForUnknownTopic[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowForUnknownTopic[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnStreamsTime[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldPunctuateOnStreamsTime[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCaptureGlobalTopicNameIfWrittenInto[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldCaptureGlobalTopicNameIfWrittenInto[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowIfInMemoryBuiltInStoreIsAccessedWithUntypedMethod[Eos enabled = 
false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowIfInMemoryBuiltInStoreIsAccessedWithUntypedMethod[Eos enabled = 
false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessFromSourcesThatMatchMultiplePattern[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldProcessFromSourcesThatMatchMultiplePattern[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldPopulateGlobalStore[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldPopulateGlobalStore[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowIfPersistentBuiltInStoreIsAccessedWithUntypedMethod[Eos enabled = 
false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldThrowIfPersistentBuiltInStoreIsAccessedWithUntypedMethod[Eos enabled = 
false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldAllowPrePopulatingStatesStoresWithCachingEnabled[Eos enabled = false] 
STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldAllowPrePopulatingStatesStoresWithCachingEnabled[Eos enabled = false] 
PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnCorrectPersistentStoreTypeOnly[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldReturnCorrectPersistentStoreTypeOnly[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldRespectTaskIdling[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldRespectTaskIdling[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSourceSpecificDeserializers[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldUseSourceSpecificDeserializers[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > shouldReturnAllStores[Eos 
enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > shouldReturnAllStores[Eos 
enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotCreateStateDirectoryForStatelessTopology[Eos enabled = false] STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldNotCreateStateDirectoryForStatelessTopology[Eos enabled = false] PASSED

org.apache.kafka.streams.TopologyTestDriverTest > 
shouldApplyGlobalUpdatesCorrectlyInRecursiveTopologies[Eos enabled = false] 
STARTED

org.apache.kafka.streams.TopologyTestDriverTest > 

Re: [DISCUSS] KIP-405: Kafka Tiered Storage

2020-09-16 Thread Dhruvil Shah
Hi Satish, Harsha,

Thanks for the KIP. Few questions below:

1. Could you describe how retention would work with this KIP and which
threads are responsible for driving this work? I believe there are 3 kinds
of retention processes we are looking at:
  (a) Regular retention for data in tiered storage as per configured `
retention.ms` / `retention.bytes`.
  (b) Local retention for data in local storage as per configured `
local.log.retention.ms` / `local.log.retention.bytes`
  (c) Possibly regular retention for data in local storage, if the tiering
task is lagging or for data that is below the log start offset.

2. When does a segment become eligible to be tiered? Is it as soon as the
segment is rolled and the end offset is less than the last stable offset as
mentioned in the KIP? I wonder if we need to consider other parameters too,
like the highwatermark so that we are guaranteed that what we are tiering
has been committed to the log and accepted by the ISR.

3. The section on "Follower Fetch Scenarios" is useful but is a bit
difficult to parse at the moment. It would be useful to summarize the
changes we need in the ReplicaFetcher.

4. Related to the above, it's a bit unclear how we are planning on
restoring the producer state for a new replica. Could you expand on that?

5. Similarly, it would be worth summarizing the behavior on unclean leader
election. There are several scenarios to consider here: data loss from
local log, data loss from remote log, data loss from metadata topic, etc.
It's worth describing these in detail.

6. It would be useful to add details about how we plan on using RocksDB in
the default implementation of `RemoteLogMetadataManager`.

7. For a READ_COMMITTED FetchRequest, how do we retrieve and return the
aborted transaction metadata?

8. The `LogSegmentData` class assumes that we have a log segment, offset
index, time index, transaction index, producer snapshot and leader epoch
index. How do we deal with cases where we do not have one or more of these?
For example, we may not have a transaction index or producer snapshot for a
particular segment. The former is optional, and the latter is only kept for
up to the 3 latest segments.

Thanks,
Dhruvil

On Mon, Sep 7, 2020 at 6:54 PM Harsha Ch  wrote:

> Hi All,
>
> We are all working through the last meeting feedback. I'll cancel the
> tomorrow 's meeting and we can meanwhile continue our discussion in mailing
> list. We can start the regular meeting from next week onwards.
>
> Thanks,
>
> Harsha
>
> On Fri, Sep 04, 2020 at 8:41 AM, Satish Duggana < satish.dugg...@gmail.com
> > wrote:
>
> >
> >
> >
> > Hi Jun,
> > Thanks for your thorough review and comments. Please find the inline
> > replies below.
> >
> >
> >
> > 600. The topic deletion logic needs more details.
> > 600.1 The KIP mentions "The controller considers the topic partition is
> > deleted only when it determines that there are no log segments for that
> > topic partition by using RLMM". How is this done?
> >
> >
> >
> > It uses RLMM#listSegments(), which returns all the segments for the given topic
> > partition.
> >
> >
> >
> > 600.2 "If the delete option is enabled then the leader will stop RLM task
> > and stop processing and it sets all the remote log segment metadata of
> > that partition with a delete marker and publishes them to RLMM." We
> > discussed this earlier. When a topic is being deleted, there may not be a
> > leader for the deleted partition.
> >
> >
> >
> > This is a good point. As suggested in the meeting, we will add a separate
> > section for topic/partition deletion lifecycle and this scenario will be
> > addressed.
> >
> >
> >
> > 601. Unclean leader election
> > 601.1 Scenario 1: new empty follower
> > After step 1, the follower restores up to offset 3. So why does it have
> > LE-2 at offset 5?
> >
> >
> >
> > Nice catch. It was showing the leader epoch fetched from the remote
> > storage. It should be shown truncated till offset 3. Updated the
> > KIP.
> >
> >
> >
> > 601.2 scenario 5: After Step 3, leader A has inconsistent data between its
> > local and the tiered data. For example. offset 3 has msg 3 LE-0 locally,
> > but msg 5 LE-1 in the remote store. While it's ok for the unclean leader
> > to lose data, it should still return consistent data, whether it's from
> > the local or the remote store.
> >
> >
> >
> > There is no inconsistency here as LE-0 offsets are [0, 4] and LE-2:
> > [5, ]. It will always get the right records for the given offset and
> > leader epoch. In case of remote, RSM is invoked to get the remote log
> > segment that contains the given offset with the leader epoch.
> >
> >
> >
> > 601.4 It seems that retention is based on
> > listRemoteLogSegments(TopicPartition topicPartition, long leaderEpoch).
> > When there is an unclean leader election, it's possible for the new
> > leader not to include certain epochs in its epoch cache. How are remote
> > segments associated with those epochs being cleaned?
> >
> >
> >
> > 

Re: [DISCUSS] KIP-671: Shutdown Streams Application when appropriate exception is thrown

2020-09-16 Thread Walker Carlson
Hello Guozhang,

As for the logging, I plan on having three logs: first, the client logs that
it is requesting an application shutdown; second, the leader logs the processId
of the invoker; third, the StreamRebalanceListener logs that it is
closing because of a `stream.appShutdown`. Hopefully this will be enough
to make the cause of the close clear.

I see what you mean about the name being dependent on the behavior of the
method so I will try to clarify.  This is how I currently envision the call
working.

It is not an option to directly initiate a shutdown through a StreamThread
object from a KafkaStreams object because "KafkaConsumer is not safe for
multi-threaded access". So how it works is that the method in KafkaStreams
finds the first alive thread and sets a flag on that StreamThread. The
StreamThread picks up the flag in its run loop, sets the error code,
and triggers a rebalance; afterwards it stops processing. After
KafkaStreams has set the flag, the method returns true and the instance keeps
running. If there are no alive threads, the shutdown fails and returns false.
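
To make that concrete, a rough sketch of the control flow (all names here are
purely illustrative, not the actual KafkaStreams/StreamThread internals):

class ShutdownSketch {
    static class StreamThreadSketch {
        volatile boolean shutdownApplicationRequested = false;
        volatile boolean alive = true;

        void runLoop() {
            while (alive) {
                // ... normal record processing would happen here ...
                if (shutdownApplicationRequested) {
                    // set the error code, trigger the rebalance, then stop processing
                    alive = false;
                }
            }
        }
    }

    final java.util.List<StreamThreadSketch> threads = new java.util.ArrayList<>();

    boolean shutdownApplication() {
        for (StreamThreadSketch thread : threads) {
            if (thread.alive) {
                thread.shutdownApplicationRequested = true; // picked up in the run loop
                return true;  // non-blocking: the calling instance keeps running
            }
        }
        return false; // no alive thread, so the request cannot be propagated
    }
}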

What do you think the blocking behavior should be? I think that the
StreamThread should definitely stop to prevent any of the corruption we are
trying to avoid by shutting down, but I don't see any advantage of the
KafkaStreams call blocking.

You are correct to be concerned about the uncaught exception handler. If
there are no live StreamThreads the rebalance will not be started at all
and this would be a problem. However, the user will be aware of this
because of the false return value and can react appropriately. This would also be
fixed if we implemented our own handler, so we could rebalance before the
StreamThread closes.

With that in mind I believe that `initiateClosingAllClients` would be an
appropriate name. WDYT?

Walker


On Wed, Sep 16, 2020 at 11:43 AM Guozhang Wang  wrote:

> Hello Walker,
>
> Thanks for the updated KIP. Previously I'm also a bit hesitant on the newly
> added public exception to communicate user-requested whole app shutdown,
> but the reason I did not bring this up is that I feel there's still a need
> from operational aspects that we can differentiate the scenario where an
> instance is closed because of a) local `streams.close()` triggered, or b) a
> remote instance's `stream.shutdownApp` triggered. So if we are going to
> remove that exception (which I'm also in favor), we should at least
> differentiate from the log4j levels.
>
> Regarding the semantics that "It should wait to receive the shutdown
> request in the rebalance it triggers." I'm not sure I fully understand,
> since this may be triggered from the stream thread's uncaught exception
> handler, if that thread is already dead then maybe a rebalance listener
> would not even be fired at all. Although I know this is some implementation
> details that you probably abstract away from the proposal, I'd like to make
> sure that we are on the same page regarding its blocking behavior since it
> is quite crucial to users as well. Could you elaborate a bit more?
>
> Regarding the function name, I guess my personal preference would depend on
> its actual blocking behavior as above :)
>
>
> Guozhang
>
>
>
>
> On Wed, Sep 16, 2020 at 10:52 AM Walker Carlson 
> wrote:
>
> > Hello all again,
> >
> > I have updated the kip to no longer use an exception and instead add a
> > method to the KafkaStreams class, this seems to satisfy everyone's
> concerns
> > about how and when the functionality will be invoked.
> >
> > There is still a question over the name. We must decide between
> > "shutdownApplication", "initateCloseAll", "closeAllInstaces" or some
> > variation.
> >
> > I am rather indifferent to the name. I think that they all get the point
> > across. The clearest to me would be shutdownApplication or
> > closeAllInstances, but WDYT?
> >
> > Walker
> >
> >
> >
> > On Wed, Sep 16, 2020 at 9:30 AM Walker Carlson 
> > wrote:
> >
> > > Hello Guozhang and Bruno,
> > >
> > > Thanks for the feedback.
> > >
> > > I will respond in two parts but I would like to clarify that I am not
> > tied
> > > down to any of these names, but since we are still deciding if we want
> to
> > > have an exception or not I would rather not get tripped up on choosing
> a
> > > name just yet.
> > >
> > > Guozhang:
> > > 1)  you are right about the INCOMPLETE_SOURCE_TOPIC_METADATA error. I
> am
> > > not planning on changing the behavior of handling source topic
> deletion.
> > > That is being done in KIP-662 by Bruno. He is enabling the user to
> create
> > > their own handler and shutdownApplication is giving them the option to
> > > shutdown.
> > >
> > > 2) It seems that we will remove the Exception entirely so this won't
> > > matter (below)
> > >
> > > 3) It should wait to receive the shutdown request in the rebalance it
> > > triggers. That might be a better name. I am torn between using
> > > "application" or "all Instances" in a couple places. I think we should
> > pick
> > > 

Build failed in Jenkins: Kafka » kafka-trunk-jdk15 #67

2020-09-16 Thread Apache Jenkins Server
See 


Changes:

[github] KAFKA-10487; Fetch response should return diverging epoch and end 
offset (#9290)


--
[...truncated 3.29 MB...]

org.apache.kafka.streams.test.wordcount.WindowedWordCountProcessorTest > 
shouldWorkWithInMemoryStore PASSED

org.apache.kafka.streams.test.wordcount.WindowedWordCountProcessorTest > 
shouldFailWithLogging STARTED

org.apache.kafka.streams.test.wordcount.WindowedWordCountProcessorTest > 
shouldFailWithLogging PASSED

org.apache.kafka.streams.test.wordcount.WindowedWordCountProcessorTest > 
shouldFailWithCaching STARTED

org.apache.kafka.streams.test.wordcount.WindowedWordCountProcessorTest > 
shouldFailWithCaching PASSED

org.apache.kafka.streams.test.wordcount.WindowedWordCountProcessorTest > 
shouldWorkWithPersistentStore STARTED

org.apache.kafka.streams.test.wordcount.WindowedWordCountProcessorTest > 
shouldWorkWithPersistentStore PASSED

org.apache.kafka.streams.test.TestRecordTest > testConsumerRecord STARTED

org.apache.kafka.streams.test.TestRecordTest > testConsumerRecord PASSED

org.apache.kafka.streams.test.TestRecordTest > testToString STARTED

org.apache.kafka.streams.test.TestRecordTest > testToString PASSED

org.apache.kafka.streams.test.TestRecordTest > testInvalidRecords STARTED

org.apache.kafka.streams.test.TestRecordTest > testInvalidRecords PASSED

org.apache.kafka.streams.test.TestRecordTest > testPartialConstructorEquals 
STARTED

org.apache.kafka.streams.test.TestRecordTest > testPartialConstructorEquals 
PASSED

org.apache.kafka.streams.test.TestRecordTest > testMultiFieldMatcher STARTED

org.apache.kafka.streams.test.TestRecordTest > testMultiFieldMatcher PASSED

org.apache.kafka.streams.test.TestRecordTest > testFields STARTED

org.apache.kafka.streams.test.TestRecordTest > testFields PASSED

org.apache.kafka.streams.test.TestRecordTest > testProducerRecord STARTED

org.apache.kafka.streams.test.TestRecordTest > testProducerRecord PASSED

org.apache.kafka.streams.test.TestRecordTest > testEqualsAndHashCode STARTED

org.apache.kafka.streams.test.TestRecordTest > testEqualsAndHashCode PASSED

org.apache.kafka.streams.internals.KeyValueStoreFacadeTest > shouldReturnIsOpen 
STARTED

org.apache.kafka.streams.internals.KeyValueStoreFacadeTest > shouldReturnIsOpen 
PASSED

org.apache.kafka.streams.internals.KeyValueStoreFacadeTest > 
shouldDeleteAndReturnPlainValue STARTED

org.apache.kafka.streams.internals.KeyValueStoreFacadeTest > 
shouldDeleteAndReturnPlainValue PASSED

org.apache.kafka.streams.internals.KeyValueStoreFacadeTest > shouldReturnName 
STARTED

org.apache.kafka.streams.internals.KeyValueStoreFacadeTest > shouldReturnName 
PASSED

org.apache.kafka.streams.internals.KeyValueStoreFacadeTest > 
shouldPutWithUnknownTimestamp STARTED

org.apache.kafka.streams.internals.KeyValueStoreFacadeTest > 
shouldPutWithUnknownTimestamp PASSED

org.apache.kafka.streams.internals.KeyValueStoreFacadeTest > 
shouldPutAllWithUnknownTimestamp STARTED

org.apache.kafka.streams.internals.KeyValueStoreFacadeTest > 
shouldPutAllWithUnknownTimestamp PASSED

org.apache.kafka.streams.internals.KeyValueStoreFacadeTest > 
shouldReturnIsPersistent STARTED

org.apache.kafka.streams.internals.KeyValueStoreFacadeTest > 
shouldReturnIsPersistent PASSED

org.apache.kafka.streams.internals.KeyValueStoreFacadeTest > 
shouldPutIfAbsentWithUnknownTimestamp STARTED

org.apache.kafka.streams.internals.KeyValueStoreFacadeTest > 
shouldPutIfAbsentWithUnknownTimestamp PASSED

org.apache.kafka.streams.internals.KeyValueStoreFacadeTest > shouldForwardClose 
STARTED

org.apache.kafka.streams.internals.KeyValueStoreFacadeTest > shouldForwardClose 
PASSED

org.apache.kafka.streams.internals.KeyValueStoreFacadeTest > shouldForwardFlush 
STARTED

org.apache.kafka.streams.internals.KeyValueStoreFacadeTest > shouldForwardFlush 
PASSED

org.apache.kafka.streams.internals.KeyValueStoreFacadeTest > shouldForwardInit 
STARTED

org.apache.kafka.streams.internals.KeyValueStoreFacadeTest > shouldForwardInit 
PASSED

org.apache.kafka.streams.MockTimeTest > shouldGetNanosAsMillis STARTED

org.apache.kafka.streams.MockTimeTest > shouldGetNanosAsMillis PASSED

org.apache.kafka.streams.MockTimeTest > shouldSetStartTime STARTED

org.apache.kafka.streams.MockTimeTest > shouldSetStartTime PASSED

org.apache.kafka.streams.MockTimeTest > shouldNotAllowNegativeSleep STARTED

org.apache.kafka.streams.MockTimeTest > shouldNotAllowNegativeSleep PASSED

org.apache.kafka.streams.MockTimeTest > shouldAdvanceTimeOnSleep STARTED

org.apache.kafka.streams.MockTimeTest > shouldAdvanceTimeOnSleep PASSED

> Task :streams:upgrade-system-tests-0100:processTestResources NO-SOURCE
> Task :streams:upgrade-system-tests-0100:testClasses
> Task :streams:upgrade-system-tests-0100:checkstyleTest
> Task :streams:upgrade-system-tests-0100:spotbugsMain NO-SOURCE
> Task 

Re: [DISCUSS] KIP-671: Shutdown Streams Application when appropriate exception is thrown

2020-09-16 Thread Guozhang Wang
Hello Walker,

Thanks for the updated KIP. Previously I was also a bit hesitant about the newly
added public exception to communicate a user-requested whole-app shutdown,
but the reason I did not bring this up is that I feel there's still a need,
from an operational aspect, to differentiate the scenario where an
instance is closed because of a) a local `streams.close()` trigger, or b) a
remote instance's `stream.shutdownApp` trigger. So if we are going to
remove that exception (which I'm also in favor of), we should at least
differentiate via the log4j levels.

Regarding the semantics that "It should wait to receive the shutdown
request in the rebalance it triggers." I'm not sure I fully understand,
since this may be triggered from the stream thread's uncaught exception
handler, if that thread is already dead then maybe a rebalance listener
would not even be fired at all. Although I know this is some implementation
details that you probably abstract away from the proposal, I'd like to make
sure that we are on the same page regarding its blocking behavior since it
is quite crucial to users as well. Could you elaborate a bit more?

Regarding the function name, I guess my personal preference would depend on
its actual blocking behavior as above :)


Guozhang




On Wed, Sep 16, 2020 at 10:52 AM Walker Carlson 
wrote:

> Hello all again,
>
> I have updated the kip to no longer use an exception and instead add a
> method to the KafkaStreams class, this seems to satisfy everyone's concerns
> about how and when the functionality will be invoked.
>
> There is still a question over the name. We must decide between
> "shutdownApplication", "initateCloseAll", "closeAllInstaces" or some
> variation.
>
> I am rather indifferent to the name. I think that they all get the point
> across. The clearest to me would be shutdownApplication or
> closeAllInstances, but WDYT?
>
> Walker
>
>
>
> On Wed, Sep 16, 2020 at 9:30 AM Walker Carlson 
> wrote:
>
> > Hello Guozhang and Bruno,
> >
> > Thanks for the feedback.
> >
> > I will respond in two parts but I would like to clarify that I am not
> tied
> > down to any of these names, but since we are still deciding if we want to
> > have an exception or not I would rather not get tripped up on choosing a
> > name just yet.
> >
> > Guozhang:
> > 1)  you are right about the INCOMPLETE_SOURCE_TOPIC_METADATA error. I am
> > not planning on changing the behavior of handling source topic deletion.
> > That is being done in KIP-662 by Bruno. He is enabling the user to create
> > their own handler and shutdownApplication is giving them the option to
> > shutdown.
> >
> > 2) It seems that we will remove the Exception entirely so this won't
> > matter (below)
> >
> > 3) It should wait to receive the shutdown request in the rebalance it
> > triggers. That might be a better name. I am torn between using
> > "application" or "all Instances" in a couple places. I think we should
> pick
> > one and be consistent but I am unsure which is more descriptive.
> >
> > Bruno:
> > I agree that in principle Exceptions should be used in exception cases.
> > And I have added a method in KafkaStreams to handle cases where an
> > Exception would not be appropriate. I guess you think that users should
> > never throw a Streams Exception then they could always throw and catch
> > their own exception and call shutdown Application from there. This would
> > allow them to exit a processor if they wanted to shutdown from there. I
> > will update the Kip to remove the exception.
> >
> > I would like to add that in the case of trying to shutdown from the
> > uncaught exception handler that we need at least one StreamThread to be
> > alive. So having our own handler instead of using the default one after
> the
> > thread has died would let us always close the application.
> >
> > Walker
> >
> > On Wed, Sep 16, 2020 at 5:02 AM Bruno Cadonna 
> wrote:
> >
> >> Hi Walker,
> >>
> >> Thank you for the KIP!
> >>
> >> I like the motivation of the KIP and the method to request a shutdown of
> >> all Kafka Streams clients of Kafka Streams application. I think we
> >> really need such functionality to react on errors. However, I am not
> >> convinced that throwing an exception to shutdown all clients is a good
> >> idea.
> >>
> >> An exception signals an exceptional situation to which we can react in
> >> multiple ways depending on the context. The exception that you propose
> >> seems rather a well defined user command than an exceptional situation to
> >> me. IMO, we should not use exceptions to control program flow because it
> >> mixes cause and effect. Hence, I would propose an invariant for public
> >> exceptions in Kafka Streams. The public exceptions in Kafka Streams
> >> should be caught by users, not thrown. But maybe I am missing the big
> >> advantage of using an exception here.
> >>
> >> I echo Guozhang's third point about clarifying the behavior of the
> >> method and the naming.
> >>
> >> Best,
> >> Bruno
> >>
> >> 

[jira] [Created] (KAFKA-10490) Make constructors public for Admin API value objects

2020-09-16 Thread Noa Resare (Jira)
Noa Resare created KAFKA-10490:
--

 Summary: Make constructors public for Admin API value objects
 Key: KAFKA-10490
 URL: https://issues.apache.org/jira/browse/KAFKA-10490
 Project: Kafka
  Issue Type: Improvement
  Components: clients
Affects Versions: 2.6.0
Reporter: Noa Resare


Developers writing automation that uses the {{Admin}} API will in many cases 
want to create a mock and configure that mock to return the expected value 
objects, in order to be able to test other pieces of functionality in a 
controlled way.

However, since the constructors of the value objects that the various API 
endpoints return have either {{protected}} or default access, instantiating 
such value objects requires some convoluted trick (either mock them with a 
mocking framework, use reflection magic, or create a helper method in the same 
package as they appear).
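
For illustration, the kind of workaround meant here typically looks like the 
following (sketch assuming Mockito; with public constructors the result object 
could simply be instantiated directly instead):

// Mocking-framework workaround for the protected/package-private constructors.
Admin admin = Mockito.mock(Admin.class);
CreateTopicsResult result = Mockito.mock(CreateTopicsResult.class);
Mockito.when(result.all()).thenReturn(KafkaFuture.completedFuture(null));
Mockito.when(admin.createTopics(Mockito.anyCollection())).thenReturn(result);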

Please consider updating the constructor signatures and make them public and in 
doing so encourage good testing practices everywhere.

Here are some examples of classes affected by this:
 * CreateTopicsResult
 * DeleteTopicsResult
 * ListTopicsResult
 * DescribeTopicsResult
 * DescribeClusterResult
 * DescribeAclsResult
 * CreateAclsResult
 * DeleteAclsResult
 * DescribeConfigsResult
 * AlterConfigsResult
 * AlterReplicaLogDirsResult
 * DescribeLogDirsResult
 * DescribeReplicaLogDirsResult
 * CreatePartitionsResult
 * CreateDelegationTokenResult
 * RenewDelegationTokenResult
 * ExpireDelegationTokenResult
 * DescribeDelegationTokenResult
 * ...and so on



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] KIP-671: Shutdown Streams Application when appropriate exception is thrown

2020-09-16 Thread Walker Carlson
Hello all again,

I have updated the KIP to no longer use an exception and instead add a
method to the KafkaStreams class; this seems to satisfy everyone's concerns
about how and when the functionality will be invoked.

There is still a question over the name. We must decide between
"shutdownApplication", "initateCloseAll", "closeAllInstaces" or some
variation.

I am rather indifferent to the name. I think that they all get the point
across. The clearest to me would be shutdownApplication or
closeAllInstances, but WDYT?

Walker



On Wed, Sep 16, 2020 at 9:30 AM Walker Carlson 
wrote:

> Hello Guozhang and Bruno,
>
> Thanks for the feedback.
>
> I will respond in two parts but I would like to clarify that I am not tied
> down to any of these names, but since we are still deciding if we want to
> have an exception or not I would rather not get tripped up on choosing a
> name just yet.
>
> Guozhang:
> 1)  you are right about the INCOMPLETE_SOURCE_TOPIC_METADATA error. I am
> not planning on changing the behavior of handling source topic deletion.
> That is being done in KIP-662 by Bruno. He is enabling the user to create
> their own handler and shutdownApplication is giving them the option to
> shutdown.
>
> 2) It seems that we will remove the Exception entirely so this won't
> matter (below)
>
> 3) It should wait to receive the shutdown request in the rebalance it
> triggers. That might be a better name. I am torn between using
> "application" or "all Instances" in a couple places. I think we should pick
> one and be consistent but I am unsure which is more descriptive.
>
> Bruno:
> I agree that in principle Exceptions should be used in exception cases.
> And I have added a method in KafkaStreams to handle cases where an
> Exception would not be appropriate. I guess you think that users should
> never throw a Streams Exception then they could always throw and catch
> their own exception and call shutdown Application from there. This would
> allow them to exit a processor if they wanted to shutdown from there. I
> will update the Kip to remove the exception.
>
> I would like to add that in the case of trying to shutdown from the
> uncaught exception handler that we need at least one StreamThread to be
> alive. So having our own handler instead of using the default one after the
> thread has died would let us always close the application.
>
> Walker
>
> On Wed, Sep 16, 2020 at 5:02 AM Bruno Cadonna  wrote:
>
>> Hi Walker,
>>
>> Thank you for the KIP!
>>
>> I like the motivation of the KIP and the method to request a shutdown of
>> all Kafka Streams clients of Kafka Streams application. I think we
>> really need such functionality to react on errors. However, I am not
>> convinced that throwing an exception to shutdown all clients is a good
>> idea.
>>
>> An exception signals an exceptional situation to which we can react in
>> multiple ways depending on the context. The exception that you propose
>> seems rather a well defined user command than an exceptional situation to
>> me. IMO, we should not use exceptions to control program flow because it
>> mixes cause and effect. Hence, I would propose an invariant for public
>> exceptions in Kafka Streams. The public exceptions in Kafka Streams
>> should be caught by users, not thrown. But maybe I am missing the big
>> advantage of using an exception here.
>>
>> I echo Guozhang's third point about clarifying the behavior of the
>> method and the naming.
>>
>> Best,
>> Bruno
>>
>> On 16.09.20 06:28, Guozhang Wang wrote:
>> > Hello Walker,
>> >
>> > Thanks for proposing the KIP! I have a couple more comments:
>> >
>> > 1. ShutdownRequestedException: my understanding is that this exception
> > is only used if the application-shutdown was initiated by the
> > user-triggered "shutdownApplication()"; otherwise, e.g. if it is due to source
>> > topic not found and Streams library decides to close the whole
>> application
>> > automatically, we would still throw the original exception
>> > a.k.a. MissingSourceTopicException to the uncaught exception handling.
>> Is
>> > that the case? Also for this exception, which package are you proposing
>> to
>> > add to?
>> >
>> > 2. ShutdownRequestedException: for its constructor, I'm wondering what
>> > Throwable "root cause" could it ever be? Since I'm guessing here that we
>> > would just use a single error code in the protocol still to tell other
>> > instances to shutdown, and that error code would not allow us to encode
>> any
>> > more information like root causes at all, it seems that parameter would
>> > always be null.
>> >
>> > 3. shutdownApplication: again I'd like to clarify, would this function
>> > block on the local instance to complete shutting down all its threads
>> like
> >> > `close()` as well, or would it just initiate the shutdown and not
>> wait
>> > for local threads at all? Also a nit suggestion regarding the name, if
>> it
>> > is only for initiating the shutdown, maybe naming as "initiateCloseAll"

Re: [DISCUSS] KIP-653: Upgrade log4j to log4j2

2020-09-16 Thread Dongjin Lee
Hi Ismael,

> Have we considered switching to the log4j2 logging config format by
default and providing a mechanism to use the old format?

At present, the proposal leaves switching the default config format to
sometime in the future. However, I think it is not a difficult task and is
up to the community's decision. The draft implementation already includes
log4j2 counterparts for all existing 1.x format (template) configs.
Although it still uses the traditional log4j format as the default for backward
compatibility, users who prefer the log4j2 configs can use them by
setting `export
KAFKA_LOG4J_OPTS="-Dlog4j.configurationFile={log4j2-config-file-path}"`.
Whenever we change the default logging format, we must not forget to
reverse this mechanism, i.e., make the log4j 1.x format
available as an opt-in.

I am very interested in the community's opinion on when it would be adequate to
make the log4j2 config the default.

Thanks,
Dongjin

+1. As a note, I have an almost-completed implementation of a log4j2
equivalent for the log4j-appender. I think it would be great if this
feature could be provided along with changing the default logging config format.

On Wed, Sep 16, 2020 at 11:49 PM Ismael Juma  wrote:

> Thanks for the KIP, Dongjin. Have we considered switching to the log4j2
> logging config format by default and providing a mechanism to use the old
> format? It is likely that we will release 3.0 as the release after 2.7, so
> it would provide a good opportunity to move on from the legacy config
> format. The other option is to stick with the old format for 3.0 and
> migrate to the new format in 4.0.
>
> Ismael
>
> On Wed, Aug 5, 2020 at 7:45 AM Dongjin Lee  wrote:
>
> > Hi, Kafka dev,
> >
> > I hope to initiate the discussion of KIP-653, upgrading log4j to log4j2.
> >
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-653%3A+Upgrade+log4j+to+log4j2
> >
> > All kinds of feedbacks are greatly appreciated!
> >
> > Best,
> > Dongjin
> >
> > --
> > *Dongjin Lee*
> >
> > *A hitchhiker in the mathematical world.*
> >
> >
> >
> >
> > *github:  github.com/dongjinleekr
> > keybase:
> https://keybase.io/dongjinleekr
> > linkedin:
> kr.linkedin.com/in/dongjinleekr
> > speakerdeck:
> > speakerdeck.com/dongjin
> > *
> >
>


-- 
*Dongjin Lee*

*A hitchhiker in the mathematical world.*




*github:  github.com/dongjinleekr
keybase: https://keybase.io/dongjinleekr
linkedin: kr.linkedin.com/in/dongjinleekr
speakerdeck: speakerdeck.com/dongjin
*


Re: [DISCUSS] KIP-663: API to Start and Shut Down Stream Threads and to Request Closing of Kafka Streams Clients

2020-09-16 Thread Sophie Blee-Goldman
>
> We guarantee that the metadata of the dead stream threads  will be
> returned by KafkaStreams#localThreadsMetadata() at least until the next
> call to KafkaStreams#addStreamThread() or
> KafkaStreams#removeStreamThread() after the stream thread transited to
> DEAD
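
(For context, a minimal sketch of the dead-thread logging that guarantee is
meant to enable, assuming the existing localThreadsMetadata() and
ThreadMetadata#threadState() APIs, a KafkaStreams instance `streams`, and an
slf4j `log`:)

// Log metadata of stream threads that have died, relying on the quoted
// guarantee that DEAD threads stay visible until the next
// addStreamThread()/removeStreamThread() call.
for (ThreadMetadata metadata : streams.localThreadsMetadata()) {
    if ("DEAD".equals(metadata.threadState())) {
        log.info("Dead stream thread: {}", metadata);
    }
}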


This seems kind of tricky...personally I would find it pretty odd if I queried
the local thread metadata and found two threads, A (alive) and B (dead), and
then called removeStreamThread() and now suddenly I have zero. Or if I call
addStreamThread and now I still have two threads.

Both of those results seem to indicate that only live threads "count" and are
returned by localThreadsMetadata(). But in reality we do temporarily keep the
dead thread, but only for the arbitrary amount of time until the next time you
want to add or remove some other stream thread? That seems like a weird side
effect of the add/removeStreamThread APIs.

If we really think users might want to log the metadata of dead threads, then
let's just do that for them or give them a way to do exactly that.

I'm not that concerned about the backwards compatibility of removing dead
threads from the localThreadsMetadata, because I find it hard to believe that
users do anything other than just skip over them in the list (set?) that gets
returned. But maybe someone can chime in with an example use case.

I'm actually even a little skeptical that any users might want to log the
metadata of a dead thread, since all of the metadata is only useful for IQ on
live threads or already covered by other easily discoverable logging elsewhere,
or both.

On Wed, Sep 16, 2020 at 2:07 AM Bruno Cadonna  wrote:

> Hi again,
>
> I just realized that if we filter out DEAD stream threads in
> localThreadsMetadata(), users cannot log the metadata of dying stream
> threads in the uncaught exception handler.
>
> I realized this thanks to the example Guozhang requested in the KIP.
> Thank you for that, Guozhang!
>
> Hence, I adapted the KIP as follows:
>
> - We do not filter out DEAD stream threads in
> KafkaStreams#localThreadsMetadata()
>
> - We guarantee that the metadata of the dead stream threads  will be
> returned by KafkaStreams#localThreadsMetadata() at least until the next
> call to KafkaStreams#addStreamThread() or
> KafkaStreams#removeStreamThread() after the stream thread transited to
> DEAD. Besides giving users the opportunity to log the metadata of a
> dying stream thread in its uncaught exception handler, this guarantee
> makes KafkaStreams#localThreadsMetadata() completely backward compatible
> to the current behavior, because if KafkaStreams#addStreamThread() and
> KafkaStreams#removeStreamThread() are never called,
> KafkaStreams#localThreadsMetadata() will also return the metadata of all
> streams threads that have ever died which corresponds to the current
> behavior.
>
> - We guarantee that dead stream threads are removed from a Kafka Streams
> client at latest after the next call to KafkaStreams#addStreamThread()
> or KafkaStreams#removeStreamThread() following the transition of the
> stream thread to DEAD. This guarantees that the number of maintained
> stream threads does not grow indefinitely.
>
>
> Best,
> Bruno
>
>
>
> On 16.09.20 09:23, Bruno Cadonna wrote:
> > Hi Guozhang,
> >
> > Good point! I would propose to filter out DEAD stream threads in
> > localThreadsMetadata() to get consistent results that do not depend on
> > timing. I will update the KIP accordingly.
> >
> > Best,
> > Bruno
> >
> > On 16.09.20 06:02, Guozhang Wang wrote:
> >> Thanks Bruno, your replies make sense to me. As for
> >> localThreadsMetadata() itself,
> >> I'd like to clarify if it would return any still-bookkept threads, or
> >> would
> >> it specifically filter out those DEAD threads even if they are not yet
> >> removed.
> >>
> >> Otherwise, the KIP LGTM.
> >>
> >> Guozhang
> >>
> >> On Tue, Sep 15, 2020 at 2:58 AM Bruno Cadonna 
> wrote:
> >>
> >>> Hi Guozhang,
> >>>
> >>> Thank you for your feedback. I replied inline.
> >>>
> >>> Best,
> >>> Bruno
> >>>
> >>> On 09.09.20 23:43, Guozhang Wang wrote:
>  Hello Bruno,
> 
>  Finally got some time to review your KIP and the discussion thread
>  now..
> >>> a
>  few comments below:
> 
>  1) I'm with Matthias about the newly added numberOfAliveStreamThreads
> >>> v.s.
>  existing localThreadsMetadata: to me it seems we can always achieve
> the
>  first based on the second. It seems not worthy to provide some "syntax
>  sugar" to the API but just let users do the filtering themselves.
> >>>
> >>> I am not married to that method. I removed it.
> >>>
>  Furthermore, I'm wondering what's the rationale behind removing the
>  DEAD
>  threads from localThreadsMetadata()? Personally I feel returning all
>  threads, including those who are ever closed, either due to
>  exception or
>  due to removeStreamThread, would be beneficial for debugging
>  purposes, as
>  long as within the 

Re: [DISCUSS] KIP-508: Make Suppression State Queriable - rebooted.

2020-09-16 Thread Dongjin Lee
Hi John,

It seems like the available alternatives at this point are clear:

1. Pass queriable name as a separate parameter (i.e.,
`KTable#suppress(Suppressed, String)`)
2. Make use of the Suppression processor name as a queryable name by adding
an optional `enableQuery` flag to `Suppressed`.
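
For concreteness, from the caller's side the two alternatives would look
roughly like this (sketches only; neither the extra overload nor
`enableQuery()` exists today, and `windowedCounts` is an assumed windowed
KTable):

// Option 1 (sketch): queriable name passed as a separate parameter
windowedCounts.suppress(
    Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()),
    "suppressed-view");

// Option 2 (sketch): reuse the processor name and add an opt-in flag
windowedCounts.suppress(
    Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded())
        .withName("suppressed-view")
        .enableQuery());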

However, I have some doubts about the second approach. As far as I know, the
processor name was introduced in KIP-307[^1] to make debugging the topology easy
and understandable. Since the processor name is a concept independent of the
materialization, I feel the first approach is more natural and
consistent. Is there any specific reason that you prefer the second
approach?

Thanks,
Dongjin

[^1]:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-307%3A+Allow+to+define+custom+processor+names+with+KStreams+DSL



On Wed, Sep 16, 2020 at 11:48 PM John Roesler  wrote:

> Hi Dongjin,
>
> Yes, that's where I was leaning. Although, I'd prefer adding
> the option to Suppressed instead of adding a new argument to
> the method call.
>
> What do you think about:
>
> class Suppressed {
> +  public Suppressed enableQuery();
> }
>
> Since Suppressed already has `withName(String)`, it seems
> like all we need to add is a boolean flag.
>
> Does that seem sensible to you?
>
> Thanks,
> -John
>
> On Wed, 2020-09-16 at 21:50 +0900, Dongjin Lee wrote:
> > Hi John,
> >
> > > Although it's not great to have "special snowflakes" in the API,
> Choice B
> > does seem safer in the short term. We would basically be proposing a
> > temporary API to make the suppressed view queriable without a
> Materialized
> > argument.
> >
> > Then, it seems like you prefer `KTable#suppress(Suppressed, String)`
> (i.e.,
> > queriable name only as a parameter) for this time, and refine API with
> the
> > other related KIPs later.
> >
> > Do I understand correctly?
> >
> > Thanks,
> > Dongjin
> >
> > On Wed, Sep 16, 2020 at 2:17 AM John Roesler 
> wrote:
> >
> > > Hi Dongjin,
> > >
> > > Thanks for presenting these options. The concern that
> > > Matthias brought up is a very deep problem that afflicts all
> > > operations downstream of windowing operations. It's the same
> > > thing that derailed KIP-300. For the larger context, I have
> > > developed a couple of approaches to resolve this situation,
> > > but I think it makes sense to finish up KIP-478 before
> > > presenting them.
> > >
> > > However, I don't think that we need in particular to block
> > > the current proposal on solving that long-running and deep
> > > issue with the DSL. Instead, we should make a top-level
> > > decision whether to:
> > >
> > > A: Make Suppress just like all the other KTable operations.
> > > It will have the same pathological behavior that the keyset
> > > is unbounded while the store implementation is only a
> > > KeyValueStore. Again, this exact pathology currently affects
> > > all KTable operations that follow from windowing operations.
> > > For example, it applies to the current workaround that
> > > Dongjin documented in the KIP:
> > > suppress().filter(Materialized). This is
> > > Option 2 that Dongjin presented.
> > >
> > > B: Do something different with Suppress in order to side-
> > > step the problem. For example, Suppress does not _need_ to
> > > have a separate state store at all. If we just give people a
> > > switch to make the operation queriable, we can implement a
> > > ReadOnlyKeyValueStore interface by querying the "priorValue"
> > > of the buffer first and then querying the upstream
> > > ValueGetter. This broad category of "do something different
> > > with Suppress" encompasses Option 1 and Option 3 that Dongjin
> > > presented.
> > >
> > >
> > > Speaking personally, I think Choice A would be the most
> > > obvious and least weird choice, but it presents a serious
> > > risk of escalating the severity of the problem of unbounded
> > > state. This is currently a risk that we're aware of, but has
> > > not yet become a big problem in practice. As Matthias
> > > pointed out, Suppress is far more likely to be used
> > > downstream of windowed tables than other operations, so
> > > having a Materialized overload has the significant
> > > risk of getting people into a bad state. Note, broadly
> > > advertising the workaround from the KIP would have the exact
> > > same impact, so we should be careful about recommending it.
> > >
> > > Although it's not great to have "special snowflakes" in the
> > > API, Choice B does seem safer in the short term. We would
> > > basically be proposing a temporary API to make the
> > > suppressed view queriable without a Materialized argument.
> > > Then, once we fix the main KIP-300 problem, we would look at
> > > converging Suppress with the rest of the KTable
> > > materialization APIs.
> > >
> > > WDYT?
> > > Thanks,
> > > -John
> > >
> > >
> > > On Wed, 2020-09-16 at 00:01 +0900, Dongjin Lee wrote:
> > > > Hi Matthias,
> > > >
> > > > Thank you very much for the detailed feedback. Here are my opinions:
> > > >
> > > > > 

Build failed in Jenkins: Kafka » kafka-trunk-jdk11 #68

2020-09-16 Thread Apache Jenkins Server
See 


Changes:

[github] KAFKA-10487; Fetch response should return diverging epoch and end 
offset (#9290)


--
Started by an SCM change
Running as SYSTEM
[EnvInject] - Loading node environment variables.
Building remotely on H22 (ubuntu) in workspace 

[WS-CLEANUP] Deleting project workspace...
[WS-CLEANUP] Deferred wipeout is used...
[WS-CLEANUP] Done
No credentials specified
Cloning the remote Git repository
Cloning repository https://github.com/apache/kafka.git
 > git init  
 > # timeout=10
Fetching upstream changes from https://github.com/apache/kafka.git
 > git --version # timeout=10
 > git fetch --tags --progress -- https://github.com/apache/kafka.git 
 > +refs/heads/*:refs/remotes/origin/* # timeout=10
 > git config remote.origin.url https://github.com/apache/kafka.git # timeout=10
 > git config --add remote.origin.fetch +refs/heads/*:refs/remotes/origin/* # 
 > timeout=10
 > git config remote.origin.url https://github.com/apache/kafka.git # timeout=10
Fetching upstream changes from https://github.com/apache/kafka.git
 > git fetch --tags --progress -- https://github.com/apache/kafka.git 
 > +refs/heads/*:refs/remotes/origin/* # timeout=10
 > git rev-parse refs/remotes/origin/trunk^{commit} # timeout=10
 > git rev-parse refs/remotes/origin/origin/trunk^{commit} # timeout=10
Checking out Revision aa5263fba903c85812c0c31443f7d49ee371e9db 
(refs/remotes/origin/trunk)
 > git config core.sparsecheckout # timeout=10
 > git checkout -f aa5263fba903c85812c0c31443f7d49ee371e9db # timeout=10
Commit message: "KAFKA-10487; Fetch response should return diverging epoch and 
end offset (#9290)"
 > git rev-list --no-walk f28713f92218f41d21d5149cdc6034fa374821ca # timeout=10
[kafka-trunk-jdk11] $ /bin/sh -xe /tmp/jenkins7339771745374921959.sh
+ rm -rf 

[kafka-trunk-jdk11] $ /bin/sh -xe /tmp/jenkins2676906301549258.sh
+ ./gradlew --no-daemon --continue -PmaxParallelForks=2 
-PtestLoggingEvents=started,passed,skipped,failed -PxmlFindBugsReport=true 
clean test -PscalaVersion=2.12
To honour the JVM settings for this build a new JVM will be forked. Please 
consider using the daemon: 
https://docs.gradle.org/6.6/userguide/gradle_daemon.html.
Daemon will be stopped at the end of the build stopping after processing

FAILURE: Build completed with 2 failures.

1: Task failed with an exception.
---
* What went wrong:
Timeout waiting to lock jars (/home/jenkins/.gradle/caches/jars-8). It is 
currently in use by another Gradle instance.
Owner PID: 9684
Our PID: 3400
Owner Operation: 
Our operation: 
Lock file: /home/jenkins/.gradle/caches/jars-8/jars-8.lock

* Try:
Run with --stacktrace option to get the stack trace. Run with --info or --debug 
option to get more log output. Run with --scan to get full insights.
==

2: Task failed with an exception.
---
* What went wrong:
Timeout waiting to lock artifact cache 
(/home/jenkins/.gradle/caches/modules-2). It is currently in use by another 
Gradle instance.
Owner PID: 9684
Our PID: 3400
Owner Operation: 
Our operation: 
Lock file: /home/jenkins/.gradle/caches/modules-2/modules-2.lock

* Try:
Run with --stacktrace option to get the stack trace. Run with --info or --debug 
option to get more log output. Run with --scan to get full insights.
==

* Get more help at https://help.gradle.org

BUILD FAILED in 2m 6s
Build step 'Execute shell' marked build as failure
Recording test results
ERROR: Step 'Publish JUnit test result report' failed: No test report files 
were found. Configuration error?
Not sending mail to unregistered user git...@hugo-hirsch.de


Re: [DISCUSS] KIP-671: Shutdown Streams Application when appropriate exception is thrown

2020-09-16 Thread Walker Carlson
Hello Guozhang and Bruno,

Thanks for the feedback.

I will respond in two parts but I would like to clarify that I am not tied
down to any of these names, but since we are still deciding if we want to
have an exception or not I would rather not get tripped up on choosing a
name just yet.

Guozhang:
1) You are right about the INCOMPLETE_SOURCE_TOPIC_METADATA error. I am
not planning on changing the behavior of handling source topic deletion.
That is being done in KIP-662 by Bruno. He is enabling the user to create
their own handler, and shutdownApplication gives them the option to
shut down.

2) It seems that we will remove the Exception entirely so this won't matter
(below)

3) It should wait to receive the shutdown request in the rebalance it
triggers. That might be a better name. I am torn between using
"application" or "all Instances" in a couple places. I think we should pick
one and be consistent but I am unsure which is more descriptive.

Bruno:
I agree that in principle exceptions should be used for exceptional cases, and
I have added a method in KafkaStreams to handle cases where an
exception would not be appropriate. I guess you think that users should
never throw a Streams exception; in that case, they could always throw and catch
their own exception and call shutdownApplication from there. This would
allow them to exit a processor if they wanted to shut down from there. I
will update the KIP to remove the exception.

I would like to add that, to shut down from the uncaught exception
handler, we need at least one StreamThread to be alive. So having our own
handler, instead of using the default one after the thread has died, would
let us always close the application.

Walker

On Wed, Sep 16, 2020 at 5:02 AM Bruno Cadonna  wrote:

> Hi Walker,
>
> Thank you for the KIP!
>
> I like the motivation of the KIP and the method to request a shutdown of
> all Kafka Streams clients of a Kafka Streams application. I think we
> really need such functionality to react to errors. However, I am not
> convinced that throwing an exception to shut down all clients is a good
> idea.
>
> An exception signals an exceptional situation to which we can react in
> multiple ways depending on the context. The exception that you propose
> seems more like a well-defined user command than an exceptional situation to
> me. IMO, we should not use exceptions to control program flow because it
> mixes cause and effect. Hence, I would propose an invariant for public
> exceptions in Kafka Streams. The public exceptions in Kafka Streams
> should be caught by users, not thrown. But maybe I am missing the big
> advantage of using an exception here.
>
> I echo Guozhang's third point about clarifying the behavior of the
> method and the naming.
>
> Best,
> Bruno
>
> On 16.09.20 06:28, Guozhang Wang wrote:
> > Hello Walker,
> >
> > Thanks for proposing the KIP! I have a couple more comments:
> >
> > 1. ShutdownRequestedException: my understanding is that this exception is
> > only used if the application-shutdown was initiated by the user
> > triggered "shutdownApplication()", otherwise e.g. if it is due to source
> > topic not found and Streams library decides to close the whole
> application
> > automatically, we would still throw the original exception
> > a.k.a. MissingSourceTopicException to the uncaught exception handling. Is
> > that the case? Also for this exception, which package are you proposing
> to
> > add to?
> >
> > 2. ShutdownRequestedException: for its constructor, I'm wondering what
> > Throwable "root cause" could it ever be? Since I'm guessing here that we
> > would just use a single error code in the protocol still to tell other
> > instances to shutdown, and that error code would not allow us to encode
> any
> > more information like root causes at all, it seems that parameter would
> > always be null.
> >
> > 3. shutdownApplication: again I'd like to clarify, would this function
> > block on the local instance to complete shutting down all its threads
> like
> > `close()` as well, or would it just initiate the shutdown and not wait
> > for local threads at all? Also a nit suggestion regarding the name, if it
> > is only for initiating the shutdown, maybe naming as "initiateCloseAll"
> > would be more specific?
> >
> >
> > Guozhang
> >
> >
> >
> >
> > On Mon, Sep 14, 2020 at 10:13 AM Walker Carlson 
> > wrote:
> >
> >> Hello Matthias and Sophie,
> >>
> >> You both make good points. I will respond to the separately below.
> >>
> >>
> >> Matthias:
> >> That is a fair point. KIP-662
> >> <https://cwiki.apache.org/confluence/display/KAFKA/KIP-662%3A+Throw+Exception+when+Source+Topics+of+a+Streams+App+are+Deleted>,
> >> which is accepted, will make it so source topic deletion reaches the
> >> uncaught exception handler. Shutdown can be initiated from there. However,
> >> this would mean that the stream thread is already dead. So I would have to
> >> rethink the exception for this 

[jira] [Resolved] (KAFKA-10487) Fix edge case in Raft truncation protocol

2020-09-16 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10487?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-10487.
-
Resolution: Fixed

> Fix edge case in Raft truncation protocol
> -
>
> Key: KAFKA-10487
> URL: https://issues.apache.org/jira/browse/KAFKA-10487
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>
> Consider the following scenario:
> Three replicas: A, B, and C. In epoch=1, replica A is the leader and writes 
> up to offset 10. The leader then fails with the high watermark at offset 8. 
> Replica B had caught up to offset 10 while replica C was at offset 8. Suppose 
> that C is elected with epoch=2 and immediately writes records up to offset 
> 10. However, it also fails before these records become committed and replica 
> B gets elected and writes records
> up to offset 12. The epoch cache on each replica will look like the following:
> Replica A:
> (epoch=1, start_offset=0)
> Replica B:
> (epoch=1, start_offset=0)
> (epoch=3, start_offset=10)
> Replica C:
> (epoch=1, start_offset=0)
> (epoch=2, start_offset=8)
> Suppose C comes back online. It will attempt to fetch at offset=10 with 
> last_fetched_epoch=2. The leader B will detect log divergence and will return 
> truncation_offset=10. Replica C will truncate to offset 10 (a no-op) and 
> retry the same fetch and will be stuck.
> To fix this, I see two options:
> Option 1: In the case that the truncation offset equals the fetch offset, we 
> can instead return the previous epoch end offset. In this example, we would 
> return truncation_offset=0. The downside is that this causes unnecessary 
> truncation.
> Option 2: Rather than returning only the truncation offset, we can have the 
> leader return both the previous "diverging" epoch and its end offset. In this 
> example, B would return diverging_epoch=1, end_offset=10. Replica C would 
> then know
> to truncate to offset 8.
> The second option is what was initially specified in the Raft proposal, but 
> we changed it during the discussion because we were not thinking of this case 
> and thought the response could be simplified. My inclination is to restore 
> the originally specified truncation logic.
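
A rough sketch of what option 2 could look like, using the epoch caches from the
example above (illustrative only: this is not the actual Kafka Raft code, and all
names below are made up):

    import java.util.AbstractMap.SimpleEntry;
    import java.util.Map;
    import java.util.NavigableMap;
    import java.util.TreeMap;

    public class DivergenceSketch {
        // Leader side: find the largest local epoch <= the follower's last fetched
        // epoch and its end offset; report it if the follower's position diverges.
        static Map.Entry<Integer, Long> divergingEpochAndEndOffset(
                NavigableMap<Integer, Long> epochStarts, long logEndOffset,
                int lastFetchedEpoch, long fetchOffset) {
            int epoch = epochStarts.floorKey(lastFetchedEpoch);
            long endOffset = epochStarts.higherKey(epoch) == null
                    ? logEndOffset : epochStarts.get(epochStarts.higherKey(epoch));
            boolean diverging = epoch != lastFetchedEpoch || endOffset < fetchOffset;
            return diverging ? new SimpleEntry<>(epoch, endOffset) : null;
        }

        // Follower side: truncate to min(leader's end offset, own end offset of the
        // largest local epoch <= the diverging epoch).
        static long truncateTo(NavigableMap<Integer, Long> epochStarts, long logEndOffset,
                               int divergingEpoch, long leaderEndOffset) {
            int epoch = epochStarts.floorKey(divergingEpoch);
            long localEnd = epochStarts.higherKey(epoch) == null
                    ? logEndOffset : epochStarts.get(epochStarts.higherKey(epoch));
            return Math.min(leaderEndOffset, localEnd);
        }

        public static void main(String[] args) {
            NavigableMap<Integer, Long> leaderB  = new TreeMap<>(Map.of(1, 0L, 3, 10L));
            NavigableMap<Integer, Long> replicaC = new TreeMap<>(Map.of(1, 0L, 2, 8L));
            // C fetches at offset 10 with last_fetched_epoch=2.
            Map.Entry<Integer, Long> d = divergingEpochAndEndOffset(leaderB, 12L, 2, 10L);
            System.out.println(d);                                                    // 1=10
            System.out.println(truncateTo(replicaC, 10L, d.getKey(), d.getValue()));  // 8
        }
    }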



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] KIP-567: Kafka Cluster Audit (new discussion)

2020-09-16 Thread Viktor Somogyi-Vass
One more after-thought on your second point (AbstractRequest): the reason I
introduced it in the first place was that this way implementers can access
request data. A use case would be if they want to audit a change in
configuration or client quotas and not just acknowledge the fact that such
an event happened, but also capture the change itself by peeking into the
request. This can be useful, especially when people want to trace back the
order of events and what happened when, rather than just acknowledging that
there was an event of a certain kind. I also recognize that this might be a
very loose interpretation of auditing, as it's not strictly related to
authorization but rather a way of tracing the admin actions within the
cluster. It could therefore even be a different API, but because of the
variety of the Kafka APIs it's very hard to give a single method that fits
all, so it's easier to pass down the AbstractRequest and let the
implementation extract the valuable info. So that's why I added this in the
first place, and I'm interested in your thoughts.
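
To make the idea concrete, a hypothetical sketch (the Auditor signature below only
paraphrases the proposal in this thread, the map key/value types are placeholders,
and TracingAuditor is made up):

    import java.util.Map;
    import org.apache.kafka.common.acl.AclOperation;
    import org.apache.kafka.common.requests.AbstractRequest;
    import org.apache.kafka.server.authorizer.AuthorizableRequestContext;

    // Paraphrased from the proposed interface; generic types are placeholders.
    interface Auditor {
        void audit(AbstractRequest request,
                   AuthorizableRequestContext context,
                   AclOperation operation,
                   Map<String, Boolean> isAllowed,
                   Map<String, Exception> errors);
    }

    // Because the full AbstractRequest is passed in, an implementation can capture
    // the content of the change itself (e.g. the body of an ALTER_CONFIGS call),
    // not only the fact that an authorized event of that type happened.
    class TracingAuditor implements Auditor {
        @Override
        public void audit(AbstractRequest request, AuthorizableRequestContext context,
                          AclOperation operation, Map<String, Boolean> isAllowed,
                          Map<String, Exception> errors) {
            System.out.printf("principal=%s operation=%s allowed=%s payload=%s%n",
                    context.principal(), operation, isAllowed, request);
        }
    }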

On Wed, Sep 16, 2020 at 4:41 PM Viktor Somogyi-Vass 
wrote:

> Hi Mickael,
>
> Thanks for reviewing the KIP.
>
> 1.) I just wanted to follow the conventions used with the Authorizer as it
> is built in a similar fashion, although it's true that in KafkaServer we
> call the configure() method and the start() in the next line. This would be
> the same in Auditor and even simpler as there aren't any parameters to
> start(), so I can remove it. If it turns out there is a need for it, we can
> add it later.
>
> 2.) Yes, this is a very good point, I will remove it, however in this case
> I don't think we need to add the ApiKey as it is already available in
> AuthorizableRequestContext.requestType(). One less parameter :).
>
> 3.) I'll add it. It will simply log important changes in the cluster like
> topic events (create, update, delete, partition or replication factor
> change), ACL events, config changes, reassignment, altering log dirs,
> offset delete, group delete with the authorization info like who initiated
> the call, was it authorized, were there any errors. Let me know if you
> think there are other APIs I should include.
>
> 4.) The builder is there mostly for easier usability, but thinking about
> it, it doesn't help much, so I removed it. The AuditInfo is also a helper
> class so I don't see any value in transforming it into an interface but if
> I simplify it (by removing the builder) it will be cleaner. Would that work?
>
> I'll update the KIP to reflect my answers.
>
> Viktor
>
>
> On Mon, Sep 14, 2020 at 6:02 PM Mickael Maison 
> wrote:
>
>> Hi Viktor,
>>
>> Thanks for restarting the discussion on this KIP. Being able to easily
>> audit usage of a Kafka cluster is a very valuable feature.
>>
>> Regarding the API, I have a few questions:
>> 1) You introduced a start() method. I don't think any other interfaces
>> have such a method. Users can do any setup they want in configure()
>>
>> 2) The first argument of audit is an AbstractRequest. Unfortunately
>> this type is not part of the public API. But actually I'm not sure
>> having the full request is really needed here. Maybe just passing the
>> Apikey would be enough as we already have all the resources from the
>> auditInfos field.
>>
>> 3) The KIP mentions a "LoggingAuditor" default implementation. What is
>> it doing? Can you add more details about it?
>>
>> 4) Can fields of AuditInfo be null? I can see there's a constructor
>> without an Errors and that sets the error field to None. However, with
>> the builder pattern, if error is not set it's null.
>>
>> 5) Should AuditInfo be an interface?
>>
>> On Mon, Sep 14, 2020 at 3:26 PM Viktor Somogyi-Vass
>>  wrote:
>> >
>> > Hi everyone,
>> >
>> > Changed the interface a little bit to accommodate methods better where
>> > authorization happens for multiple operations so the implementer of the
>> > audit interface will receive all authorizations together.
>> > I'll wait a few more days to allow people to react or give feedback but
>> if
>> > there are no objections until then, I'll start a vote.
>> >
>> > Viktor
>> >
>> > On Tue, Sep 8, 2020 at 9:49 AM Viktor Somogyi-Vass <
>> viktorsomo...@gmail.com>
>> > wrote:
>> >
>> > > Hi Everyone,
>> > >
>> > > I'd like to restart the discussion on this. Since the KIP has been
>> > > revamped I thought I'd start a new discussion thread.
>> > >
>> > > Link:
>> > >
>> https://cwiki.apache.org/confluence/display/KAFKA/KIP-567%3A+Kafka+Cluster+Audit
>> > >
>> > > Short summary:
>> > > - Would like to introduce a new interface similar to the Authorizer
>> called
>> > > Auditor as follows:
>> > > public interface Auditor {
>> > > audit(Request r, AuthorizableRequestContext c, AclOperation
>> > > o, Map isAllowed, Map> Errors>
>> > > errors);
>> > > }
>> > > - Basically it would pass down the request and the authorization
>> > > information to the auditor implementation where various kind 

Re: [DISCUSS] KIP-653: Upgrade log4j to log4j2

2020-09-16 Thread Ismael Juma
Thanks for the KIP, Dongjin. Have we considered switching to the log4j2
logging config format by default and providing a mechanism to use the old
format? It is likely that we will release 3.0 as the release after 2.7, so
it would provide a good opportunity to move on from the legacy config
format. The other option is to stick with the old format for 3.0 and
migrate to the new format in 4.0.
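
For readers unfamiliar with the two formats, the difference looks roughly like this
for a simple console logger (a sketch only, not the actual Kafka config files):

    # log4j 1.x style (legacy log4j.properties)
    log4j.rootLogger=INFO, stdout
    log4j.appender.stdout=org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
    log4j.appender.stdout.layout.ConversionPattern=[%d] %p %m (%c)%n

    # roughly equivalent log4j2 properties style
    appender.stdout.type=Console
    appender.stdout.name=STDOUT
    appender.stdout.layout.type=PatternLayout
    appender.stdout.layout.pattern=[%d] %p %m (%c)%n
    rootLogger.level=INFO
    rootLogger.appenderRef.stdout.ref=STDOUT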

Ismael

On Wed, Aug 5, 2020 at 7:45 AM Dongjin Lee  wrote:

> Hi, Kafka dev,
>
> I hope to initiate the discussion of KIP-653, upgrading log4j to log4j2.
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-653%3A+Upgrade+log4j+to+log4j2
>
> All kinds of feedbacks are greatly appreciated!
>
> Best,
> Dongjin
>
> --
> *Dongjin Lee*
>
> *A hitchhiker in the mathematical world.*
>
>
>
>
> *github:  github.com/dongjinleekr
> keybase: https://keybase.io/dongjinleekr
> linkedin: kr.linkedin.com/in/dongjinleekr
> speakerdeck:
> speakerdeck.com/dongjin
> *
>


Re: [DISCUSS] KIP-508: Make Suppression State Queriable - rebooted.

2020-09-16 Thread John Roesler
Hi Dongjin,

Yes, that's where I was leaning. Although, I'd prefer adding
the option to Suppressed instead of adding a new argument to
the method call.

What do you think about:

class Suppressed {
+  public Suppressed enableQuery();
}

Since Suppressed already has `withName(String)`, it seems
like all we need to add is a boolean flag.

Does that seem sensible to you?
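
For illustration only (enableQuery() is just the proposed flag, and the query side is
how I'd imagine it being used; `counts` is assumed to be a windowed count KTable and
`streams` the KafkaStreams instance):

    KTable<Windowed<String>, Long> finalCounts = counts.suppress(
        Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded())
                  .withName("final-counts")   // existing withName(String)
                  .enableQuery());            // proposed flag

    // the view would then be queriable by that name, e.g.:
    ReadOnlyKeyValueStore<Windowed<String>, Long> view = streams.store(
        StoreQueryParameters.fromNameAndType("final-counts",
            QueryableStoreTypes.keyValueStore()));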

Thanks,
-John

On Wed, 2020-09-16 at 21:50 +0900, Dongjin Lee wrote:
> Hi John,
> 
> > Although it's not great to have "special snowflakes" in the API, Choice B
> does seem safer in the short term. We would basically be proposing a
> temporary API to make the suppressed view queriable without a Materialized
> argument.
> 
> Then, it seems like you prefer `KTable#suppress(Suppressed, String)` (i.e.,
> queriable name only as a parameter) for this time, and refine API with the
> other related KIPs later.
> 
> Do I understand correctly?
> 
> Thanks,
> Dongjin
> 
> On Wed, Sep 16, 2020 at 2:17 AM John Roesler  wrote:
> 
> > Hi Dongjin,
> > 
> > Thanks for presenting these options. The concern that
> > Matthias brought up is a very deep problem that afflicts all
> > operations downstream of windowing operations. It's the same
> > thing that derailed KIP-300. For the larger context, I have
> > developed a couple of approaches to resolve this situation,
> > but I think it makes sense to finish up KIP-478 before
> > presenting them.
> > 
> > However, I don't think that we need in particular to block
> > the current proposal on solving that long-running and deep
> > issue with the DSL. Instead, we should make a top-level
> > decision whether to:
> > 
> > A: Make Suppress just like all the other KTable operations.
> > It will have the same pathological behavior that the keyset
> > is unbounded while the store implementation is only a
> > KeyValueStore. Again, this exact pathology currently affects
> > all KTable operations that follow from windowing operations.
> > For example, it applies to the current workaround that
> > Dongjin documented in the KIP:
> > suppress().filter(Materialized). This is
> > Option 2 that Dongjin presented.
> > 
> > B: Do something different with Suppress in order to side-
> > step the problem. For example, Suppress does not _need_ to
> > have a separate state store at all. If we just give people a
> > switch to make the operation queriable, we can implement a
> > ReadOnlyKeyValueStore interface by querying the "priorValue"
> > of the buffer first and then querying the upstream
> > ValueGetter. This broad category of "do something different
> > with Suppress" encompases Option 1 and Option 3 that Dongjin
> > presented.
> > 
> > 
> > Speaking personally, I think Choice A would be the most
> > obvious and least weird choice, but it presents a serious
> > risk of escalating the severity of the problem of unbounded
> > state. This is currently a risk that we're aware of, but has
> > not yet become a big problem in practice. As Matthias
> > pointed out, Suppress is far more likely to be used
> > downstream of windowed tables than other operations, so
> > having a Materialized overload has the significant
> > risk of getting people into a bad state. Note, broadly
> > advertising the workaround from the KIP would have the exact
> > same impact, so we should be careful about recommending it.
> > 
> > Although it's not great to have "special snowflakes" in the
> > API, Choice B does seem safer in the short term. We would
> > basically be proposing a temporary API to make the
> > suppressed view queriable without a Materialized argument.
> > Then, once we fix the main KIP-300 problem, we would look at
> > converging Suppress with the rest of the KTable
> > materialization APIs.
> > 
> > WDYT?
> > Thanks,
> > -John
> > 
> > 
> > On Wed, 2020-09-16 at 00:01 +0900, Dongjin Lee wrote:
> > > Hi Matthias,
> > > 
> > > Thank you very much for the detailed feedback. Here are my opinions:
> > > 
> > > > Because there is no final result for non-windowed KTables, it seems
> > that
> > > this new feature only make sense for the windowed-aggregation case?
> > > 
> > > I think about it a little differently. Of course, for windowed KTables, this
> > > feature provides the final state; for non-windowed KTables, it provides a
> > > view of the records received more than the predefined waiting time ago -
> > > excluding the records that are waiting for more events.
> > > 
> > > > Thus, the signature of `Materialized` should take a `WindowStore`
> > instead
> > > of a `KeyValueStore`?
> > > 
> > > I reviewed the implementation following your comments and found the
> > > following:
> > > 
> > > 1. `Materialized` instance includes the following: KeySerde, ValueSerde,
> > > StoreSupplier, and Queriable Name.
> > > 2. The other `Materialized` method variants in `KTable` are making use of
> > > KeySerde, ValueSerde, and Queriable Name only. (That is, StoreSupplier is
> > > ignored.)
> > > 3. `KTable#suppress(Suppressed, Materialized)` uses the Queriable Name
> > > 

Re: [DISCUSS] KIP-567: Kafka Cluster Audit (new discussion)

2020-09-16 Thread Viktor Somogyi-Vass
Hi Mickael,

Thanks for reviewing the KIP.

1.) I just wanted to follow the conventions used with the Authorizer as it
is built in a similar fashion, although it's true that in KafkaServer we
call the configure() method and the start() in the next line. This would be
the same in Auditor and even simpler as there aren't any parameters to
start(), so I can remove it. If it turns out there is a need for it, we can
add it later.

2.) Yes, this is a very good point, I will remove it, however in this case
I don't think we need to add the ApiKey as it is already available in
AuthorizableRequestContext.requestType(). One less parameter :).

3.) I'll add it. It will simply log important changes in the cluster like
topic events (create, update, delete, partition or replication factor
change), ACL events, config changes, reassignment, altering log dirs,
offset delete, group delete with the authorization info like who initiated
the call, was it authorized, were there any errors. Let me know if you
think there are other APIs I should include.

> 4.) The builder is there mostly for easier usability, but thinking about
> it, it doesn't help much, so I removed it. The AuditInfo is also a helper
class so I don't see any value in transforming it into an interface but if
I simplify it (by removing the builder) it will be cleaner. Would that work?

I'll update the KIP to reflect my answers.

Viktor


On Mon, Sep 14, 2020 at 6:02 PM Mickael Maison 
wrote:

> Hi Viktor,
>
> Thanks for restarting the discussion on this KIP. Being able to easily
> audit usage of a Kafka cluster is a very valuable feature.
>
> Regarding the API, I have a few questions:
> 1) You introduced a start() method. I don't think any other interfaces
> have such a method. Users can do any setup they want in configure()
>
> 2) The first argument of audit is an AbstractRequest. Unfortunately
> this type is not part of the public API. But actually I'm not sure
> having the full request is really needed here. Maybe just passing the
> Apikey would be enough as we already have all the resources from the
> auditInfos field.
>
> 3) The KIP mentions a "LoggingAuditor" default implementation. What is
> it doing? Can you add more details about it?
>
> 4) Can fields of AuditInfo be null? I can see there's a constructor
> without an Errors and that sets the error field to None. However, with
> the builder pattern, if error is not set it's null.
>
> 5) Should AuditInfo be an interface?
>
> On Mon, Sep 14, 2020 at 3:26 PM Viktor Somogyi-Vass
>  wrote:
> >
> > Hi everyone,
> >
> > Changed the interface a little bit to accommodate methods better where
> > authorization happens for multiple operations so the implementer of the
> > audit interface will receive all authorizations together.
> > I'll wait a few more days to allow people to react or give feedback but
> if
> > there are no objections until then, I'll start a vote.
> >
> > Viktor
> >
> > On Tue, Sep 8, 2020 at 9:49 AM Viktor Somogyi-Vass <
> viktorsomo...@gmail.com>
> > wrote:
> >
> > > Hi Everyone,
> > >
> > > I'd like to restart the discussion on this. Since the KIP has been
> > > revamped I thought I'd start a new discussion thread.
> > >
> > > Link:
> > >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-567%3A+Kafka+Cluster+Audit
> > >
> > > Short summary:
> > > - Would like to introduce a new interface similar to the Authorizer
> called
> > > Auditor as follows:
> > > public interface Auditor {
> > > audit(Request r, AuthorizableRequestContext c, AclOperation
> > > o, Map isAllowed, Map Errors>
> > > errors);
> > > }
> > > - Basically it would pass down the request and the authorization
> > > information to the auditor implementation where various kind of
> reporting
> > > can be done based on the request.
> > > - A new config would be added called "auditor" which is similar to the
> > > "authorizer" config, but users can pass a list of auditor class names.
> > > - The implementation is expected to be low latency similarly to the
> > > Authorizer.
> > > - A default implementation will be added that logs into a file.
> > >
> > > I appreciate any feedback on this.
> > >
> > > Best,
> > > Viktor
> > >
>


Re: [DISCUSS] KIP-631: The Quorum-based Kafka Controller

2020-09-16 Thread Unmesh Joshi
Thanks Colin, the changes look good to me. One small thing:
registration.lease.timeout.ms is a configuration on the controller side.
It would be good to comment on how brokers know about it, so that they can
send LeaseDurationMs in the heartbeat request; alternatively, it could be
added to the heartbeat response for brokers to know about it.

Thanks,
Unmesh

On Fri, Sep 11, 2020 at 10:32 PM Colin McCabe  wrote:

> Hi Unmesh,
>
> I think you're right that we should use a duration here rather than a
> time.  As you said, the clock on the controller will probably not match the
> one on the broker.  I have updated the KIP.
>
> > > It's important to keep in mind that messages may be delayed in the
> > > network, or arrive out of order.  When this happens, we will use the
> start
> > > time specified in the request to determine if the request is stale.
> > I am assuming that there will be a single TCP connection maintained
> between
> > broker and active controller. So, there won't be any out of order
> requests?
> > There will be a scenario of broker GC pause, which might cause connection
> > timeout and broker might need to reestablish the connection. If the pause
> > is too long, lease will expire and the heartbeat sent after the pause
> will
> > be treated as a new registration (similar to restart case), and a new
> epoch
> > number will be assigned to the broker.
>
> I agree with the end of this paragraph, but not with the start :)
>
> There can be out-of-order requests, since the broker will simply use a new
> TCP connection if the old one has problems.  This can happen for a variety
> of reasons.  I don't think GC pauses are the most common reason for this to
> happen.  It's more common to see issues in the network itself that
> result in connections getting dropped from time to time.
>
> So we have to assume that messages may arrive out of order, and possibly
> be delayed.  I added a note that heartbeat requests should be ignored if
> the metadata log offset they contain is smaller than a previous heartbeat.
>
> > When the active controller fails, the new active controller needs to be
> > sure that it considers all the known brokers as alive till the lease
> > expiration interval.  Because registration.lease.timeout.ms, is
> configured
> > on the controller, the new active controller will extend all the leases
> by
> > registration.lease.timeout.ms. I see that it won't need last heartbeat
> > time.
>
> Agreed.
>
> best,
> Colin
>
> >
> > Thanks,
> > Unmesh
> >
> > On Sat, Sep 5, 2020 at 1:28 AM Colin McCabe  wrote:
> >
> > > > Colin wrote:
> > > > > The reason for including LeaseStartTimeMs in the request is to
> ensure
> > > > > that the time required to communicate with the controller gets
> > > included in
> > > > > the lease time.  Since requests can potentially be delayed in the
> > > network
> > > > > for a long time, this is important.
> > >
> > > On Mon, Aug 31, 2020, at 05:58, Unmesh Joshi wrote:
> > > > The network time will be added anyway, because the lease timer on the
> > > > active controller will start only after the heartbeat request
> reaches the
> > > > server.
> > >
> > > Hi Unmesh,
> > >
> > > If the start time is not specified in the request, then the network
> time
> > > is excluded from the heartbeat time.
> > >
> > > Here's an example:
> > > Let's say broker A sends a heartbeat at time 100, and it arrives on the
> > > controller at time 200, and the lease duration is 1000.
> > >
> > > The controller looks at the start time in the request, which is 100,
> and
> > > adds 1000 to it, getting 1100.  On the other hand, if start time is not
> > > specified in the request, then the expiration will be at time 1200.
> > > That is what I mean by "the network time is included in the expiration
> > > time."
> > >
> > > > And I think, some assumption about network round trip time is
> > > > needed anyway to decide on the frequency of the heartbeat (
> > > > registration.heartbeat.interval.ms), and lease timeout (
> > > > registration.lease.timeout.ms). So I think just having a leaseTTL
> in the
> > > > request is easier to understand and implement.
> > >
> > > It's important to keep in mind that messages may be delayed in the
> > > network, or arrive out of order.  When this happens, we will use the
> start
> > > time specified in the request to determine if the request is stale.
> > >
> > > > > Yes, I agree that the lease timeout on the controller side should
> be
> > > > > reset in the case of controller failover.  The alternative would
> be to
> > > > > track the lease as hard state rather than soft state, but I think
> that
> > > > > is not really needed, and would result in more log entries.
> > > > My interpretation of the mention of BrokerRecord in the KIP was that
> this
> > > > record exists in the Raft log.
> > >
> > > BrokerRecord does exist in the Raft log, but does not include the last
> > > heartbeat time.
> > >
> > > > By soft state, do you mean the broker
> > > > records exist only 

Re: [DISCUSS] KIP-653: Upgrade log4j to log4j2

2020-09-16 Thread Dongjin Lee
Is there anyone who has suggestions or comments on this feature?

Thanks,
Dongjin

On Wed, Aug 5, 2020 at 11:37 PM Dongjin Lee  wrote:

> Hi, Kafka dev,
>
> I hope to initiate the discussion of KIP-653, upgrading log4j to log4j2.
>
>
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-653%3A+Upgrade+log4j+to+log4j2
>
> All kinds of feedbacks are greatly appreciated!
>
> Best,
> Dongjin
>
> --
> *Dongjin Lee*
>
> *A hitchhiker in the mathematical world.*
>
>
>
>
> *github:  github.com/dongjinleekr
> keybase: https://keybase.io/dongjinleekr
> linkedin: kr.linkedin.com/in/dongjinleekr
> speakerdeck: speakerdeck.com/dongjin
> *
>


-- 
*Dongjin Lee*

*A hitchhiker in the mathematical world.*




*github:  github.com/dongjinleekr
keybase: https://keybase.io/dongjinleekr
linkedin: kr.linkedin.com/in/dongjinleekr
speakerdeck: speakerdeck.com/dongjin
*


Re: [DISCUSS] KIP-508: Make Suppression State Queriable - rebooted.

2020-09-16 Thread Dongjin Lee
Hi John,

> Although it's not great to have "special snowflakes" in the API, Choice B
does seem safer in the short term. We would basically be proposing a
temporary API to make the suppressed view queriable without a Materialized
argument.

Then, it seems like you prefer `KTable#suppress(Suppressed, String)` (i.e.,
queriable name only as a parameter) for this time, and refine API with the
other related KIPs later.

Do I understand correctly?

Thanks,
Dongjin

On Wed, Sep 16, 2020 at 2:17 AM John Roesler  wrote:

> Hi Dongjin,
>
> Thanks for presenting these options. The concern that
> Matthias brought up is a very deep problem that afflicts all
> operations downstream of windowing operations. It's the same
> thing that derailed KIP-300. For the larger context, I have
> developed a couple of approaches to resolve this situation,
> but I think it makes sense to finish up KIP-478 before
> presenting them.
>
> However, I don't think that we need in particular to block
> the current proposal on solving that long-running and deep
> issue with the DSL. Instead, we should make a top-level
> decision whether to:
>
> A: Make Suppress just like all the other KTable operations.
> It will have the same pathological behavior that the keyset
> is unbounded while the store implementation is only a
> KeyValueStore. Again, this exact pathology currently affects
> all KTable operations that follow from windowing operations.
> For example, it applies to the current workaround that
> Dongjin documented in the KIP:
> suppress().filter(Materialized). This is
> Option 2 that Dongjin presented.
>
> B: Do something different with Suppress in order to side-
> step the problem. For example, Suppress does not _need_ to
> have a separate state store at all. If we just give people a
> switch to make the operation queriable, we can implement a
> ReadOnlyKeyValueStore interface by querying the "priorValue"
> of the buffer first and then querying the upstream
> ValueGetter. This broad category of "do something different
> with Suppress" encompases Option 1 and Option 3 that Dongjin
> presented.
>
>
> Speaking personally, I think Choice A would be the most
> obvious and least weird choice, but it presents a serious
> risk of escalating the severity of the problem of unbounded
> state. This is currently a risk that we're aware of, but has
> not yet become a big problem in practice. As Matthias
> pointed out, Suppress is far more likely to be used
> downstream of windowed tables than other operations, so
> having a Materialized overload has the significant
> risk of getting people into a bad state. Note, broadly
> advertising the workaround from the KIP would have the exact
> same impact, so we should be careful about recommending it.
>
> Although it's not great to have "special snowflakes" in the
> API, Choice B does seem safer in the short term. We would
> basically be proposing a temporary API to make the
> suppressed view queriable without a Materialized argument.
> Then, once we fix the main KIP-300 problem, we would look at
> converging Suppress with the rest of the KTable
> materialization APIs.
>
> WDYT?
> Thanks,
> -John
>
>
> On Wed, 2020-09-16 at 00:01 +0900, Dongjin Lee wrote:
> > Hi Matthias,
> >
> > Thank you very much for the detailed feedback. Here are my opinions:
> >
> > > Because there is no final result for non-windowed KTables, it seems
> that
> > this new feature only make sense for the windowed-aggregation case?
> >
> > I think about it a little differently. Of course, for windowed KTables, this
> > feature provides the final state; for non-windowed KTables, it provides a
> > view of the records received more than the predefined waiting time ago -
> > excluding the records that are waiting for more events.
> >
> > > Thus, the signature of `Materialized` should take a `WindowStore`
> instead
> > of a `KeyValueStore`?
> >
> > I reviewed the implementation following your comments and found the
> > following:
> >
> > 1. `Materialized` instance includes the following: KeySerde, ValueSerde,
> > StoreSupplier, and Queriable Name.
> > 2. The other `Materialized` method variants in `KTable` are making use of
> > KeySerde, ValueSerde, and Queriable Name only. (That is, StoreSupplier is
> > ignored.)
> > 3. `KTable#suppress(Suppressed, Materialized)` uses the Queriable Name
> > only. StoreSupplier is also ignored.
> >
> > So, we have three choices for the method signature:
> >
> > 1. `KTable#suppress(Suppressed, String)` (i.e., passing the Queriable
> Name
> > only):
> >
> >   This is the simplest; however, it is inconsistent with the other
> > Materialized variant methods.
> >
> > 2. `KTable#suppress(Suppressed, Materialized)`
> (i.e.,
> > current proposal)
> >
> >   This approach is harmless at this point, as StoreSupplier is ignored;
> > however, since suppression can be used with both `KeyValueStore` and
> > `WindowStore`, this approach is not only weird but also leaves some
> > potential risk for the future. (On second thoughts, I 

Re: [DISCUSS] KIP-671: Shutdown Streams Application when appropriate exception is thrown

2020-09-16 Thread Bruno Cadonna

Hi Walker,

Thank you for the KIP!

I like the motivation of the KIP and the method to request a shutdown of 
all Kafka Streams clients of a Kafka Streams application. I think we 
really need such functionality to react to errors. However, I am not 
convinced that throwing an exception to shut down all clients is a good idea.


An exception signals an exceptional situation to which we can react in 
multiple ways depending on the context. The exception that you propose 
seems more like a well-defined user command than an exceptional situation to 
me. IMO, we should not use exceptions to control program flow because it 
mixes cause and effect. Hence, I would propose an invariant for public 
exceptions in Kafka Streams. The public exceptions in Kafka Streams 
should be caught by users, not thrown. But maybe I am missing the big 
advantage of using an exception here.


I echo Guozhang's third point about clarifying the behavior of the 
method and the naming.


Best,
Bruno

On 16.09.20 06:28, Guozhang Wang wrote:

Hello Walker,

Thanks for proposing the KIP! I have a couple more comments:

1. ShutdownRequestedException: my understanding is that this exception is
only used if the application-shutdown was initiated by the user
triggered "shutdownApplication()", otherwise e.g. if it is due to source
topic not found and Streams library decides to close the whole application
automatically, we would still throw the original exception
a.k.a. MissingSourceTopicException to the uncaught exception handling. Is
that the case? Also for this exception, which package are you proposing to
add to?

2. ShutdownRequestedException: for its constructor, I'm wondering what
Throwable "root cause" could it ever be? Since I'm guessing here that we
would just use a single error code in the protocol still to tell other
instances to shutdown, and that error code would not allow us to encode any
more information like root causes at all, it seems that parameter would
always be null.

3. shutdownApplication: again I'd like to clarify, would this function
block on the local instance to complete shutting down all its threads like
`close()` as well, or would it just initiate the shutdown and not wait
for local threads at all? Also a nit suggestion regarding the name, if it
is only for initiating the shutdown, maybe naming as "initiateCloseAll"
would be more specific?


Guozhang




On Mon, Sep 14, 2020 at 10:13 AM Walker Carlson 
wrote:


Hello Matthias and Sophie,

You both make good points. I will respond to the separately below.


Matthias:
That is a fair point. KIP-662
<https://cwiki.apache.org/confluence/display/KAFKA/KIP-662%3A+Throw+Exception+when+Source+Topics+of+a+Streams+App+are+Deleted>,
which is accepted, will make it so source topic deletion reaches the
uncaught exception handler. Shutdown can be initiated from there. However,
this would mean that the stream thread is already dead. So I would have to
rethink the exception for this use case; perhaps it would be needed in the
KafkaStreams object. But this still leaves the case where there is only one
stream thread. I will think about it.

Maybe the source topics are a bad example, as it makes this KIP dependent on
KIP-662 getting implemented in a certain way. However, this is not the only
reason this could be useful; here
 is a Jira ticket asking
for the same functionality. I have added a few other use cases to the KIP,
although I will still be rethinking where I want to add this functionality
and whether it should be an exception or not.

Sophie:
I agree that shutting down an instance could also be useful. There was some
discussion about this on KIP-663. It seems that we came to the conclusion
that close(Duration.ZERO) would be sufficient. Link
<https://mail-archives.apache.org/mod_mbox/kafka-dev/202008.mbox/%3c95f95168-2811-e57e-96e2-fb5e71d92...@confluent.io%3e>
to the thread.
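
Roughly, that workaround would look like this from user code (sketch only; the
handler is the existing KafkaStreams#setUncaughtExceptionHandler, `streams` is the
client instance, and Duration is java.time.Duration):

    streams.setUncaughtExceptionHandler((thread, error) ->
        // a zero timeout requests shutdown without blocking the handler on the
        // termination of its own, already dying, stream thread
        streams.close(Duration.ZERO));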

Also, I am not set on the name ShutdownRequested. If we decide to keep it as
an exception, your idea is probably a better name.


Thanks for the feedback,
Walker


On Fri, Sep 11, 2020 at 11:08 AM Matthias J. Sax  wrote:


Thanks for the KIP.

It seems that the new exception would need to be thrown by user code?
However, in the motivation you mention the scenario of a missing source
topic that a user cannot detect, but KafkaStreams runtime would be
responsible to handle.

How do both things go together?


-Matthias

On 9/11/20 10:31 AM, Walker Carlson wrote:

Hello all,

I have created KIP-671 to give the option to shutdown a streams
application in response to an error.




https://cwiki.apache.org/confluence/display/KAFKA/KIP-671%3A+Shutdown+Streams+Application+when+appropriate+exception+is+thrown


This is because of the Jira ticket


Please give it a look and let me know if you have any feedback.

Thanks,
Walker











Re: [DISCUSSION] Upgrade system tests to python 3

2020-09-16 Thread Nikolay Izhikov
Hello, Guozhang.

> I can help run the test suite once your PR is cleanly rebased to verify the 
> whole suite works

Thank you for joining to the review.

1. PR rebased on the current trunk.

2. I triggered all tests in my private environment to verify them after rebase.
Will inform you once the tests pass in my environment.

3. We need a new ducktape release [1] to be able to merge PR [2].
For now, the PR is based on the ducktape trunk branch [3], not on a specific
release.
If the ducktape team needs any help with the release, please let me know.

[1] https://github.com/confluentinc/ducktape/issues/245
[2] https://github.com/apache/kafka/pull/9196
[3] 
https://github.com/apache/kafka/pull/9196/files#diff-9235a7bdb1ca9268681c0e56f3f3609bR39

> On 16 Sept 2020, at 07:32, Guozhang Wang wrote:
> 
> Hello Nikolay,
> 
> I can help run the test suite once your PR is cleanly rebased to verify the
> whole suite works and then I can merge (I'm trusting Ivan and Magnus here
> for their reviews :)
> 
> Guozhang
> 
> On Mon, Sep 14, 2020 at 3:56 AM Nikolay Izhikov  wrote:
> 
>> Hello!
>> 
>> I got 2 approvals from Ivan Daschinskiy and Magnus Edenhill.
>> Committers, please, join the review.
>> 
>>> On 3 Sept 2020, at 11:06, Nikolay Izhikov wrote:
>>> 
>>> Hello!
>>> 
>>> Just a friendly reminder.
>>> 
>>> Patch to resolve some kind of technical debt - python2 in system tests
>> is ready!
>>> Can someone, please, take a look?
>>> 
>>> https://github.com/apache/kafka/pull/9196
>>> 
 On 28 Aug 2020, at 11:19, Nikolay Izhikov wrote:
 
 Hello!
 
 Any feedback on this?
 What I should additionally do to prepare system tests migration?
 
> On 24 Aug 2020, at 11:17, Nikolay Izhikov wrote:
> 
> Hello.
> 
> PR [1] is ready.
> Please, review.
> 
> But, I need help with the two following questions:
> 
> 1. We need a new release of ducktape which includes fixes [2], [3] for
>> python3.
> I created the issue in ducktape repo [4].
> Can someone help me with the release?
> 
> 2. I know that some companies run system tests for the trunk on a
>> regular basis.
> Can someone show me some results of these runs?
> So, I can compare failures in my PR and in the trunk.
> 
> Results [5] of run all for my PR available in the ticket [6]
> 
> ```
> SESSION REPORT (ALL TESTS)
> ducktape version: 0.8.0
> session_id:   2020-08-23--002
> run time: 1010 minutes 46.483 seconds
> tests run:684
> passed:   505
> failed:   9
> ignored:  170
> ```
> 
> [1] https://github.com/apache/kafka/pull/9196
> [2]
>> https://github.com/confluentinc/ducktape/commit/23bd5ab53802e3a1e1da1ddf3630934f33b02305
> [3]
>> https://github.com/confluentinc/ducktape/commit/bfe53712f83b025832d29a43cde3de3d7803106f
> [4] https://github.com/confluentinc/ducktape/issues/245
> [5]
>> https://issues.apache.org/jira/secure/attachment/13010366/report.txt
> [6] https://issues.apache.org/jira/browse/KAFKA-10402
> 
>> On 14 Aug 2020, at 21:26, Ismael Juma wrote:
>> 
>> +1
>> 
>> On Fri, Aug 14, 2020 at 7:42 AM John Roesler 
>> wrote:
>> 
>>> Thanks Nikolay,
>>> 
>>> No objection. This would be very nice to have.
>>> 
>>> Thanks,
>>> John
>>> 
>>> On Fri, Aug 14, 2020, at 09:18, Nikolay Izhikov wrote:
 Hello.
 
> If anyone's interested in porting it to Python 3 it would be a good
>>> change.
 
 I’ve created a ticket [1] to upgrade system tests to python3.
 Does someone have any additional inputs or objections for this
>> change?
 
 [1] https://issues.apache.org/jira/browse/KAFKA-10402
 
 
> On 1 July 2020, at 00:26, Gokul Ramanan Subramanian <gokul24...@gmail.com> wrote:
> 
> Thanks Colin.
> 
> While at the subject of system tests, there are a few times I see
>> tests
> timed out (even on a large machine such as m5.4xlarge EC2 with
>> Linux).
>>> Are
> there any knobs that system tests provide to control timeouts /
>>> throughputs
> across all tests?
> Thanks.
> 
> On Tue, Jun 30, 2020 at 6:32 PM Colin McCabe 
>>> wrote:
> 
>> Ducktape runs on Python 2.  You can't use it with Python 3, as
>> you are
>> trying to do here.
>> 
>> If anyone's interested in porting it to Python 3 it would be a
>> good
>>> change.
>> 
>> Otherwise, using docker as suggested here seems to be the best
>> way to
>>> go.
>> 
>> best,
>> Colin
>> 
>> On Mon, Jun 29, 2020, at 02:14, Gokul Ramanan Subramanian wrote:
>>> Hi.
>>> 
>>> Has anyone had luck running Kafka system 

[jira] [Created] (KAFKA-10489) Committed consumer offsets not sent to consumer on rebalance

2020-09-16 Thread AD (Jira)
AD created KAFKA-10489:
--

 Summary: Committed consumer offsets not sent to consumer on 
rebalance
 Key: KAFKA-10489
 URL: https://issues.apache.org/jira/browse/KAFKA-10489
 Project: Kafka
  Issue Type: Bug
  Components: consumer, offset manager
Affects Versions: 2.1.0
Reporter: AD


Committed consumer offsets not sent to consumer on rebalance

Hi, I recently ran into an issue where my Kafka cluster did not report committed 
consumer offsets to a client even though they are available in the 
__consumer_offsets topic.
h2. Preparation
h3. Setup
 * Kafka 2.1.0 (Scala 2.12)
 * Three node cluster (one kafka & one zookeeper each)
 * Consumer Application (springboot 2.2.6 (w/ spring-kafka-2.3.7 & 
kafka-clients-2.3.1))
 * all timestamps are in UTC

h3. Scenario:
 * Topic: topic-0 (one partition, timebased retention: -1)
 * Group: group-0 (one group member only)

h2. Issue

After one node of the cluster was rebooted, a rebalance led to the consumer not 
getting the offsets stored in the {{__consumer_offsets}} topic but falling back to 
the default (which in our case is {{latest}}).
 This led to the consumer missing some data that got produced in the meantime.
h2. Expectation

I would have expected that the consumer receives the last committed offset, in 
this case {{334389}}.
h2. Logs
h3. Kafka
h4. Server Log
{noformat}
2020-09-10 10:12:11,349 INFO [KafkaServer id=0] Starting controlled shutdown 
(kafka.server.KafkaServer)
...
2020-09-10 10:12:12,299 INFO [ProducerStateManager partition=consumer-topic-0] 
Writing producer snapshot at offset 334408 (kafka.log.ProducerStateManager)
...
2020-09-10 10:12:12,601 INFO Shutdown complete. (kafka.log.LogManager)
...
2020-09-10 10:13:31,095 INFO starting (kafka.server.KafkaServer)
{noformat}
h4. Group coordinator
{noformat}
[consumer,topic,0]::OffsetAndMetadata(offset=334389, 
leaderEpoch=Optional.empty, metadata=, commitTimestamp=1599732710804, 
expireTimestamp=None)  // timestamp:  2020-09-10 10:11:50.804
[consumer,topic,0]::OffsetAndMetadata(offset=334408, 
leaderEpoch=Optional.empty, metadata=, commitTimestamp=1599732737661, 
expireTimestamp=None)  // timestamp:  2020-09-10 10:12:17.661
{noformat}
h3. Consumer
h4. Log
{noformat}
Sep 10, 2020 @ 10:12:17.541 [Consumer clientId=consumer-topic-0, 
groupId=group-0] Successfully joined group with generation 1
Sep 10, 2020 @ 10:12:17.549 [Consumer clientId=consumer-topic-0, 
groupId=group-0] Setting newly assigned partitions: topic-0
Sep 10, 2020 @ 10:12:17.553 [Consumer clientId=consumer-topic-0, 
groupId=group-0] Found no committed offset for partition topic-0
Sep 10, 2020 @ 10:12:17.661 [Consumer clientId=consumer-topic-0, 
groupId=group-0] Resetting offset for partition topic-0 to offset 334408.
{noformat}
h4. Stacktrace
{noformat}
 Consumer exception
java.lang.IllegalStateException: This error handler cannot process 
'org.apache.kafka.clients.consumer.CommitFailedException's; no record 
information is available
at 
org.springframework.kafka.listener.SeekToCurrentErrorHandler.handle(SeekToCurrentErrorHandler.java:193)
at 
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1173)
at 
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:955)
at 
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Commit 
cannot be completed since the group has already rebalanced and assigned the 
partitions to another member. This means that the time between subsequent calls 
to poll() was longer than the configured max.poll.interval.ms, which typically 
implies that the poll loop is spending too much time message processing. You 
can address this either by increasing max.poll.interval.ms or by reducing the 
maximum size of batches returned in poll() with max.poll.records.
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.sendOffsetCommitRequest(ConsumerCoordinator.java:820)
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:692)
at 
org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1454)
at 
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:2039)
at 
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:1862)
at 
org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:983)
at 

Re: [DISCUSS] KIP-663: API to Start and Shut Down Stream Threads and to Request Closing of Kafka Streams Clients

2020-09-16 Thread Bruno Cadonna

Hi again,

I just realized that if we filter out DEAD stream threads in 
localThreadsMetadata(), users cannot log the metadata of dying stream 
threads in the uncaught exception handler.


I realized this thanks to the example Guozhang requested in the KIP. 
Thank you for that, Guozhang!


Hence, I adapted the KIP as follows:

- We do not filter out DEAD stream threads in 
KafkaStreams#localThreadsMetadata()


- We guarantee that the metadata of dead stream threads will be 
returned by KafkaStreams#localThreadsMetadata() at least until the next 
call to KafkaStreams#addStreamThread() or 
KafkaStreams#removeStreamThread() after the stream thread transitioned to 
DEAD. Besides giving users the opportunity to log the metadata of a 
dying stream thread in its uncaught exception handler, this guarantee 
makes KafkaStreams#localThreadsMetadata() completely backward compatible 
with the current behavior, because if KafkaStreams#addStreamThread() and 
KafkaStreams#removeStreamThread() are never called, 
KafkaStreams#localThreadsMetadata() will also return the metadata of all 
stream threads that have ever died, which corresponds to the current 
behavior.


- We guarantee that dead stream threads are removed from a Kafka Streams 
client at the latest after the next call to KafkaStreams#addStreamThread() 
or KafkaStreams#removeStreamThread() following the transition of the 
stream thread to DEAD. This guarantees that the number of maintained 
stream threads does not grow indefinitely.



Best,
Bruno



On 16.09.20 09:23, Bruno Cadonna wrote:

Hi Guozhang,

Good point! I would propose to filter out DEAD stream threads in 
localThreadsMetadata() to get consistent results that do not depend on 
timing. I will update the KIP accordingly.


Best,
Bruno

On 16.09.20 06:02, Guozhang Wang wrote:

Thanks Bruno, your replies make sense to me. As for
localThreadsMetadata() itself,
I'd like to clarify if it would return any still-bookkept threads, or 
would

it specifically filter out those DEAD threads even if they are not yet
removed.

Otherwise, the KIP LGTM.

Guozhang

On Tue, Sep 15, 2020 at 2:58 AM Bruno Cadonna  wrote:


Hi Guozhang,

Thank you for your feedback. I replied inline.

Best,
Bruno

On 09.09.20 23:43, Guozhang Wang wrote:

Hello Bruno,

Finally got some time to review your KIP and the discussion thread 
now..

a

few comments below:

1) I'm with Matthias about the newly added numberOfAliveStreamThreads

v.s.

existing localThreadsMetadata: to me it seems we can always achieve the
first based on the second. It seems not worthy to provide some "syntax
sugar" to the API but just let users do the filtering themselves.


I am not married to that method. I removed it.

Furthermore, I'm wondering what's the rationale behind removing the 
DEAD

threads from localThreadsMetadata()? Personally I feel returning all
threads, including those who are ever closed, either due to 
exception or
due to removeStreamThread, would be beneficial for debugging 
purposes, as

long as within the lifetime of an instance we expect the amount of such
dead threads will not increase linearly --- and if we agree with that,
maybe we can rename "removeStreamThread" to sth. like
"terminateStreamThread" indicating it is only terminated but not 
removed
--- and of course if users do not want to see those DEAD threads 
they can
always filter them out. I'm just proposing that we should still 
leave the

door open for those who want to check those ever terminated threads.



I actually think the number of dead stream threads might increase
linearly. Assume users have a systematic error that continuously kills a
stream thread and they blindly start a new stream thread in the uncaught
exception handler. This scenario might be a mistake but if the
systematic error does not occur at a high rate, it could also be a
strategy to keep the application running during the investigation of the
systematic error.

IMO, removing dead stream threads makes Kafka Streams more robust
because it prevents a possibly unbounded increase of memory usage. If
users want to debug the dead stream threads they can monitor the number
of dead threads with the metric proposed in the KIP and they could
additionally log the metadata of the dying stream thread in the uncaught
exception handler. I do not think that there is need to keep dead stream
threads around.


2) I think it would help to write down some example user code in

exception
handler e.g. to illustrate how this would be implemented -- e.g. we 
know

that practically the handler need to maintain a "this" reference of the
instance anyways in order to shutdown the whole instance or,

add/terminate

threads dynamically, but I want to see if we have listed all possible

call
paths like: a) a thread's handler logic to terminate another thread, 
b) a

thread handler to add new threads, etc are all appropriately supported
without deadlocks.



I added an example for an uncaught exception handler that adds a stream
thread to the 

Re: [DISCUSS] KIP-663: API to Start and Shut Down Stream Threads and to Request Closing of Kafka Streams Clients

2020-09-16 Thread Bruno Cadonna

Hi Guozhang,

Good point! I would propose to filter out DEAD stream threads in 
localThreadsMetadata() to get consistent results that do not depend on 
timing. I will update the KIP accordingly.


Best,
Bruno

On 16.09.20 06:02, Guozhang Wang wrote:

Thanks Bruno, your replies make sense to me. As for
localThreadsMetadata() itself,
I'd like to clarify if it would return any still-bookkept threads, or would
it specifically filter out those DEAD threads even if they are not yet
removed.

Otherwise, the KIP LGTM.

Guozhang

On Tue, Sep 15, 2020 at 2:58 AM Bruno Cadonna  wrote:


Hi Guozhang,

Thank you for your feedback. I replied inline.

Best,
Bruno

On 09.09.20 23:43, Guozhang Wang wrote:

Hello Bruno,

Finally got some time to review your KIP and the discussion thread now..

a

few comments below:

1) I'm with Matthias about the newly added numberOfAliveStreamThreads

v.s.

existing localThreadsMetadata: to me it seems we can always achieve the
first based on the second. It seems not worthy to provide some "syntax
sugar" to the API but just let users do the filtering themselves.


I am not married to that method. I removed it.


Furthermore, I'm wondering what's the rationale behind removing the DEAD
threads from localThreadsMetadata()? Personally I feel returning all
threads, including those who are ever closed, either due to exception or
due to removeStreamThread, would be beneficial for debugging purposes, as
long as within the lifetime of an instance we expect the amount of such
dead threads will not increase linearly --- and if we agree with that,
maybe we can rename "removeStreamThread" to sth. like
"terminateStreamThread" indicating it is only terminated but not removed
--- and of course if users do not want to see those DEAD threads they can
always filter them out. I'm just proposing that we should still leave the
door open for those who want to check those ever terminated threads.



I actually think the number of dead stream threads might increase
linearly. Assume users have a systematic error that continuously kills a
stream thread and they blindly start a new stream thread in the uncaught
exception handler. This scenario might be a mistake but if the
systematic error does not occur at a high rate, it could also be a
strategy to keep the application running during the investigation of the
systematic error.

IMO, removing dead stream threads makes Kafka Streams more robust
because it prevents a possibly unbounded increase of memory usage. If
users want to debug the dead stream threads they can monitor the number
of dead threads with the metric proposed in the KIP and they could
additionally log the metadata of the dying stream thread in the uncaught
exception handler. I do not think that there is need to keep dead stream
threads around.


2) I think it would help to write down some example user code in

exception

handler e.g. to illustrate how this would be implemented -- e.g. we know
that practically the handler need to maintain a "this" reference of the
instance anyways in order to shutdown the whole instance or,

add/terminate

threads dynamically, but I want to see if we have listed all possible

call

paths like: a) a thread's handler logic to terminate another thread, b) a
thread handler to add new threads, etc are all appropriately supported
without deadlocks.



I added an example of an uncaught exception handler that adds a stream
thread to the KIP. Removing a stream thread in an uncaught exception
handler doesn't seem like a common use case to me. Nevertheless, we need to
make sure that we do not run into a deadlock in that case. I will consider
that during the implementation and write tests to check for deadlocks.
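
As a rough sketch only, using the method names proposed in this KIP
(addStreamThread() does not exist yet, and `streams` is the KafkaStreams instance):

    streams.setUncaughtExceptionHandler((thread, error) -> {
        // log the metadata of the dying thread while it is still returned by
        // localThreadsMetadata(), then replace it with a fresh thread
        streams.localThreadsMetadata().stream()
               .filter(meta -> meta.threadName().equals(thread.getName()))
               .forEach(meta -> System.err.println("died: " + meta + " due to " + error));
        streams.addStreamThread();
    });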

Shutting down the Kafka Streams client from inside an uncaught exception
handler is outside the scope of this KIP. In the beginning it was part
of the KIP, but during the discussion it turned out that we can fix our
existing close() method to accomplish the shutdown from inside an
uncaught exception handler. But I completely agree with you that we need
to ensure that we do not run into a deadlock in this case.




Guozhang


On Wed, Sep 9, 2020 at 11:35 AM Matthias J. Sax 

wrote:



I would prefer to not add a new method. It seems unnecessary.
`localThreadMetadata` does return all threads in all states(*) and thus
provides full insight.

(*) A thread in state DEAD could be returned as long as it's not removed
yet.

I don't see any advantage to pre-filter threads and to exclude threads
in state CREATE or PENDING_SHUTDOWN. Even if a CREATED thread is not
started yet, it is still "alive" in a broader sense. For example, if a
user wants to scale out to 10 thread, and 8 are RUNNING and 2 are in
state CREATED, a user won't need to add 2 more threads -- there are
already 10 threads.

For PENDING_SHUTDOWN and scale in it would be different I guess, as the
proposal would be to filter them out right away. However, filtering them
seems actually not to be "correct", as a thread in