[jira] [Created] (KAFKA-13184) KafkaProducerException

2021-08-09 Thread rajendra (Jira)
rajendra created KAFKA-13184:


 Summary: KafkaProducerException
 Key: KAFKA-13184
 URL: https://issues.apache.org/jira/browse/KAFKA-13184
 Project: Kafka
  Issue Type: Bug
  Components: config
Reporter: rajendra
 Attachments: image-2021-08-10-10-09-16-938.png

 We are receiving a timeout error. Please help with this issue.

!image-2021-08-10-10-09-16-938.png!

org.springframework.kafka.core.KafkaProducerException: Failed to send; nested 
exception is org.apache.kafka.common.errors.TimeoutException: Expiring 36 
record(s) for test-9:442705 ms has passed since batch creation
 at 
org.springframework.kafka.core.KafkaTemplate.lambda$buildCallback$4(KafkaTemplate.java:424)
 at 
org.springframework.kafka.core.KafkaTemplate$$Lambda$1362/0x.onCompletion(Unknown
 Source)
 at 
org.apache.kafka.clients.producer.KafkaProducer$InterceptorCallback.onCompletion(KafkaProducer.java:131
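
For reference, a minimal sketch of the producer settings usually involved when batches expire before being sent (the values below are placeholders, not a recommended fix):

import java.util.Properties;
import org.apache.kafka.clients.producer.ProducerConfig;

public class ProducerTimeoutTuning {
    public static Properties producerProps() {
        Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092"); // placeholder address
        // Total time a record may spend between send() and success/failure,
        // including the time it waits in the accumulator ("since batch creation").
        props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, 120000);
        // Timeout for an individual request once the batch is actually on the wire.
        props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, 30000);
        // A lower linger reduces how long records sit waiting to be batched.
        props.put(ProducerConfig.LINGER_MS_CONFIG, 5);
        return props;
    }
}

The expiration in the stack trace happens when a batch sits in the accumulator longer than delivery.timeout.ms, which usually points at broker connectivity or an unreachable/overloaded partition leader rather than at the client code itself.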



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Jenkins build is still unstable: Kafka » Kafka Branch Builder » 3.0 #89

2021-08-09 Thread Apache Jenkins Server
See 




Jenkins build is unstable: Kafka » Kafka Branch Builder » trunk #401

2021-08-09 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-764 Configurable backlog size for creating Acceptor

2021-08-09 Thread Haruki Okada
Hi, Mao.

> we should name the property socket.listen.backlog.size for better clarity

Sounds good to me. I updated the KIP to name the property as
`socket.listen.backlog.size` .
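
For background, the backlog ultimately ends up as the second argument of the JDK's bind call; a minimal sketch (not the actual Acceptor code) of where the proposed value would apply:

import java.net.InetSocketAddress;
import java.nio.channels.ServerSocketChannel;

public class ListenBacklogSketch {
    public static ServerSocketChannel open(final int port, final int backlog) throws Exception {
        final ServerSocketChannel channel = ServerSocketChannel.open();
        // Today the broker binds with the JDK default backlog of 50;
        // socket.listen.backlog.size would make this second argument configurable.
        channel.bind(new InetSocketAddress(port), backlog);
        return channel;
    }
}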

On Mon, Aug 9, 2021 at 23:30 David Mao  wrote:

> Hi Haruki,
>
> I think it makes sense to have this as a configurable property. I think we
> should name the property socket.listen.backlog.size for better clarity on
> what the property configures. Besides that, the proposal looks good to me.
>
> David
>
> On Wed, Jul 28, 2021 at 8:09 AM Haruki Okada  wrote:
>
> > Hi, Kafka.
> >
> > Does anyone have any thoughts or suggestions about the KIP?
> > If there seems to be no, I would like to start a VOTE later.
> >
> >
> > Thanks,
> >
> >
> > On Thu, Jul 22, 2021 at 16:17 Haruki Okada  wrote:
> >
> > > Hi, Kafka.
> > >
> > > I proposed KIP-764, which tries to add new KafkaConfig to adjust
> > > Acceptor's backlog size.
> > > As described in the KIP and the ticket KAFKA-9648, currently backlog
> size
> > > is fixed value (50) and it may not be enough to handle incoming
> > connections
> > > from massive clients.
> > >
> > > So we would like to make it configurable.
> > >
> > >
> > >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-764%3A+Configurable+backlog+size+for+creating+Acceptor
> > >
> > > --
> > > 
> > > Okada Haruki
> > > ocadar...@gmail.com
> > > 
> > >
> >
> >
> > --
> > 
> > Okada Haruki
> > ocadar...@gmail.com
> > 
> >
>


-- 

Okada Haruki
ocadar...@gmail.com



Build failed in Jenkins: Kafka » Kafka Branch Builder » trunk #400

2021-08-09 Thread Apache Jenkins Server
See 


Changes:


--
[...truncated 489671 lines...]
[2021-08-09T23:43:09.188Z] 
[2021-08-09T23:43:09.188Z] ApiVersionsRequestTest > 
testApiVersionsRequestWithUnsupportedVersion() > 
kafka.server.ApiVersionsRequestTest.testApiVersionsRequestWithUnsupportedVersion()[1]
 PASSED
[2021-08-09T23:43:09.188Z] 
[2021-08-09T23:43:09.188Z] ApiVersionsRequestTest > 
testApiVersionsRequestValidationV0() > 
kafka.server.ApiVersionsRequestTest.testApiVersionsRequestValidationV0()[1] 
STARTED
[2021-08-09T23:43:12.440Z] 
[2021-08-09T23:43:12.440Z] ApiVersionsRequestTest > 
testApiVersionsRequestValidationV0() > 
kafka.server.ApiVersionsRequestTest.testApiVersionsRequestValidationV0()[1] 
PASSED
[2021-08-09T23:43:12.440Z] 
[2021-08-09T23:43:12.440Z] ApiVersionsRequestTest > 
testApiVersionsRequestValidationV3() > 
kafka.server.ApiVersionsRequestTest.testApiVersionsRequestValidationV3()[1] 
STARTED
[2021-08-09T23:43:15.521Z] 
[2021-08-09T23:43:15.521Z] ApiVersionsRequestTest > 
testApiVersionsRequestValidationV3() > 
kafka.server.ApiVersionsRequestTest.testApiVersionsRequestValidationV3()[1] 
PASSED
[2021-08-09T23:43:15.521Z] 
[2021-08-09T23:43:15.521Z] ApiVersionsRequestTest > 
testApiVersionsRequestThroughControlPlaneListener() > 
kafka.server.ApiVersionsRequestTest.testApiVersionsRequestThroughControlPlaneListener()[1]
 STARTED
[2021-08-09T23:43:18.719Z] 
[2021-08-09T23:43:18.719Z] ApiVersionsRequestTest > 
testApiVersionsRequestThroughControlPlaneListener() > 
kafka.server.ApiVersionsRequestTest.testApiVersionsRequestThroughControlPlaneListener()[1]
 PASSED
[2021-08-09T23:43:18.719Z] 
[2021-08-09T23:43:18.719Z] ApiVersionsRequestTest > testApiVersionsRequest() > 
kafka.server.ApiVersionsRequestTest.testApiVersionsRequest()[1] STARTED
[2021-08-09T23:43:21.079Z] 
[2021-08-09T23:43:21.080Z] ApiVersionsRequestTest > testApiVersionsRequest() > 
kafka.server.ApiVersionsRequestTest.testApiVersionsRequest()[1] PASSED
[2021-08-09T23:43:21.080Z] 
[2021-08-09T23:43:21.080Z] ApiVersionsRequestTest > 
testApiVersionsRequestValidationV0ThroughControlPlaneListener() > 
kafka.server.ApiVersionsRequestTest.testApiVersionsRequestValidationV0ThroughControlPlaneListener()[1]
 STARTED
[2021-08-09T23:43:25.510Z] 
[2021-08-09T23:43:25.510Z] ApiVersionsRequestTest > 
testApiVersionsRequestValidationV0ThroughControlPlaneListener() > 
kafka.server.ApiVersionsRequestTest.testApiVersionsRequestValidationV0ThroughControlPlaneListener()[1]
 PASSED
[2021-08-09T23:43:25.510Z] 
[2021-08-09T23:43:25.510Z] LogDirFailureTest > testIOExceptionDuringLogRoll() 
STARTED
[2021-08-09T23:43:36.910Z] 
[2021-08-09T23:43:36.910Z] LogDirFailureTest > testIOExceptionDuringLogRoll() 
PASSED
[2021-08-09T23:43:36.910Z] 
[2021-08-09T23:43:36.910Z] LogDirFailureTest > 
testIOExceptionDuringCheckpoint() STARTED
[2021-08-09T23:43:42.574Z] 
[2021-08-09T23:43:42.574Z] LogDirFailureTest > 
testIOExceptionDuringCheckpoint() PASSED
[2021-08-09T23:43:42.574Z] 
[2021-08-09T23:43:42.574Z] LogDirFailureTest > 
testProduceErrorFromFailureOnCheckpoint() STARTED
[2021-08-09T23:43:48.498Z] 
[2021-08-09T23:43:48.498Z] LogDirFailureTest > 
testProduceErrorFromFailureOnCheckpoint() PASSED
[2021-08-09T23:43:48.498Z] 
[2021-08-09T23:43:48.498Z] LogDirFailureTest > 
brokerWithOldInterBrokerProtocolShouldHaltOnLogDirFailure() STARTED
[2021-08-09T23:43:55.567Z] 
[2021-08-09T23:43:55.567Z] LogDirFailureTest > 
brokerWithOldInterBrokerProtocolShouldHaltOnLogDirFailure() PASSED
[2021-08-09T23:43:55.567Z] 
[2021-08-09T23:43:55.567Z] LogDirFailureTest > 
testReplicaFetcherThreadAfterLogDirFailureOnFollower() STARTED
[2021-08-09T23:44:01.855Z] 
[2021-08-09T23:44:01.855Z] LogDirFailureTest > 
testReplicaFetcherThreadAfterLogDirFailureOnFollower() PASSED
[2021-08-09T23:44:01.855Z] 
[2021-08-09T23:44:01.855Z] LogDirFailureTest > 
testProduceErrorFromFailureOnLogRoll() STARTED
[2021-08-09T23:44:07.477Z] 
[2021-08-09T23:44:07.477Z] LogDirFailureTest > 
testProduceErrorFromFailureOnLogRoll() PASSED
[2021-08-09T23:44:07.477Z] 
[2021-08-09T23:44:07.477Z] LogOffsetTest > 
testFetchOffsetsBeforeWithChangingSegmentSize() STARTED
[2021-08-09T23:44:10.733Z] 
[2021-08-09T23:44:10.733Z] LogOffsetTest > 
testFetchOffsetsBeforeWithChangingSegmentSize() PASSED
[2021-08-09T23:44:10.733Z] 
[2021-08-09T23:44:10.733Z] LogOffsetTest > testGetOffsetsBeforeEarliestTime() 
STARTED
[2021-08-09T23:44:15.200Z] 
[2021-08-09T23:44:15.200Z] LogOffsetTest > testGetOffsetsBeforeEarliestTime() 
PASSED
[2021-08-09T23:44:15.200Z] 
[2021-08-09T23:44:15.200Z] LogOffsetTest > 
testFetchOffsetByTimestampForMaxTimestampAfterTruncate() STARTED
[2021-08-09T23:44:22.198Z] 
[2021-08-09T23:44:22.198Z] LogOffsetTest > 
testFetchOffsetByTimestampForMaxTimestampAfterTruncate() PASSED
[2021-08-09T23:44:22.198Z] 
[2021-08-09T23:44:22.198Z] LogOffsetTest > 
testFetchOffsetByTimestampForMaxTimestampWithUnorderedTimestamps() STARTED

Re: [DISCUSS] KIP-655: Windowed "Distinct" Operation for KStream

2021-08-09 Thread Ivan Ponomarev

> To recap: Could it be that the idea to apply a DISTINCT-aggregation is
> for a different use-case than to remove duplicate messages from a 
KStream?


OK, imagine the following:

We have 10 thermometers. Each second they transmit the current 
measured temperature. The id of the thermometer is the key, the 
temperature is the value. The temperature measured by a single 
thermometer changes slowly: on average, say once per 30 seconds, so most 
of the messages from a given thermometer are duplicates. But we need to 
react to the change ASAP. And we need a materialized view: a relational 
database table with the columns 'thermometerID' and 'temperature'.


1) We don't want to send 100K updates per second to the database.

2) But it's still OK if some updates are duplicates -- the updates 
are idempotent.


3) We can even afford to lose a fraction of the data -- the data from each 
thermometer will eventually be 'fixed' by its fresh readings.


This is my use case (actually these are not thermometers and 
temperature, but it doesn't matter :-)


Is it conceptually different from what you were thinking about?

Regards,

Ivan

On 09.08.2021 3:05, Matthias J. Sax wrote:

Thanks for sharing your thoughts. I guess my first question about why
using the key boils down to the use case, and maybe you have something
else in mind than I do.


I see it this way: we define 'distinct' operation as returning a single
record per time window per selected key,


I believe this sentence explains your way of thinking about it. My way
of thinking about it is different though: KStream de-duplication means
to "filter/drop" duplicate records, and a record is by definition a
duplicate if key AND value are the same. --- Or is this a use case for
which there might be a "message ID", and even if the "message ID" is the
same, the content of the message might be different? If this holds, do
we really de-duplicate records (it sounds more like picking a random one)?


Just re-read some of my older replies, and I guess back then I just
commented on your KeyExtractor idea, without considering the
end-to-end picture. Thus, my reply below goes in a different
direction now:

We only need to apply a window because we need to purge state
eventually. To this end, I actually believe that applying a "sliding
window" is the best way to go: for each message we encounter, we start
the de-duplication window when the message arrives, don't emit any
duplicates as long as the window is open, and purge the state afterwards.

Of course, internally, the implementation must be quite different
compared to a regular aggregation: we need to pull the value into the
key, to create a unique window for each message, but this seems to be an
implementation detail. Thus, I am wondering if we should not try to put
a square peg into a round hole: the current windowing/aggregation API
is not designed for a KStream de-duplication use case, because
de-duplication is not an aggregation to begin with. Why not use a different API:

KStream<K, V> KStream#deduplicate(final Duration windowSize);

Including some more overloads to allow configuring the internal state
store (this state store should not be queryable similar to
KStream-KStream state stores...).
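
For comparison, a rough sketch of how per-key de-duplication can be hand-rolled today with a window store and the Processor API (store name, serdes, and window size are illustrative; this is not the implementation proposed in the KIP):

import java.time.Duration;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.Stores;
import org.apache.kafka.streams.state.WindowStore;
import org.apache.kafka.streams.state.WindowStoreIterator;

public class DedupSketch {
    static final Duration WINDOW = Duration.ofSeconds(30); // illustrative de-duplication window

    public static KStream<String, String> deduplicate(final StreamsBuilder builder,
                                                      final KStream<String, String> input) {
        builder.addStateStore(Stores.windowStoreBuilder(
                Stores.persistentWindowStore("dedup-store", WINDOW.multipliedBy(2), WINDOW, false),
                Serdes.String(), Serdes.String()));

        return input.transform(() -> new Transformer<String, String, KeyValue<String, String>>() {
            private ProcessorContext context;
            private WindowStore<String, String> store;

            @Override
            public void init(final ProcessorContext context) {
                this.context = context;
                this.store = (WindowStore<String, String>) context.getStateStore("dedup-store");
            }

            @Override
            public KeyValue<String, String> transform(final String key, final String value) {
                final long now = context.timestamp();
                // Drop the record if we already saw this key within the window.
                try (WindowStoreIterator<String> seen = store.fetch(key, now - WINDOW.toMillis(), now)) {
                    if (seen.hasNext()) {
                        return null;
                    }
                }
                store.put(key, value, now);
                return KeyValue.pair(key, value); // first occurrence: forward it
            }

            @Override
            public void close() { }
        }, "dedup-store");
    }
}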


To recap: Could it be that the idea to apply a DISTINCT-aggregation is
for a different use-case than to remove duplicate messages from a KStream?



-Matthias


On 8/6/21 4:12 AM, Ivan Ponomarev wrote:

    - Why restrict de-duplication for the key only? Should we not also
consider the value (or make it somehow flexible and let the user choose?)


Wasn't it the first idea that we abandoned (I mean, to provide
'KeyExtractor' and so on)?

In order to keep things simple we decided to make

.selectKey(...) //here we select anything we need
   //add markAsPartitioned from KIP-759 to taste
   .groupByKey()
   .windowed(...)
   .distinct()
 //the only new operation that we add to the API, reusing
 //all the windowed aggregations' infrastructure


    - I am wondering if the return type should be `KStream` instead of a
`KTable`? If I deduplicate a stream, I might expect a stream back? I
don't really consider a stream de-duplication an aggregation with
"mutable state"...


First, because it's going to be an operation on a
Time/SessionWindowedKStream, and these operations usually return
KTable<Windowed<K>, ...>. Then, it might be useful to know to which
time window a deduplicated record actually belongs. And it is a trivial
task to turn this table back into a stream.


IMHO, an unordered stream and its ordered "cousin" should
yield the same result? -- Given your example it seems you want to keep
the first record based on offset order. Wondering why?


I see it this way: we define 'distinct' operation as returning a single
record per time window per selected key, no matter what record. So it's
ok if it yields different results for different orderings if its main
property holds!

And since we can select any key we like, we can get any degree of

Jenkins build is still unstable: Kafka » Kafka Branch Builder » 3.0 #88

2021-08-09 Thread Apache Jenkins Server
See 




Re: [DISCUSS] KIP-759: Unneeded repartition canceling

2021-08-09 Thread Ivan Ponomarev

Hi Matthias and Sophie!

==1.  markAsPartitioned vs extending `selectKey(..)` etc. with a config.==

I don't have a strong opinion here, both Sophie's and Matthias' points 
look convincing for me.


I think we should estimate the following: what is the probability that 
we will ever need to extend `selectKey` etc. with a config for the 
purposes other than `markAsPartitioned`?


If we find this probability high, then it's just a refactoring to 
deprecate overloads with `Named` and introduce overloads with dedicated 
configs, and we should do it this way.


If it's low or zero, maybe it's better not to mess with the existing 
APIs and to introduce a single `markAsPartitioned()` method, which 
itself can be easily deprecated if we find a better solution later!



==2. The IQ problem==

> it then has to be the case that

> Partitioner.partition(key) == Partitioner.partition(map(key))


Sophie, you got this wrong, and Matthias already explained why.

The actual required property for the mapping function is:

\forall k1, k2 (map(k1) = map(k2) => partition(k1) = partition(k2))

or, by contraposition law,

\forall k1, k2 (partition(k1) =/= partition(k2) => map(k1) =/= map(k2) )


(look at the whiteboard photo that I attached to the KIP).

There is a big class of such mappings: key -> Tuple(key, anyValue). This 
is actually what we often do before aggregation, and this mapping does 
not require repartition.


But of course we can extract the original key from Tuple(key, anyValue), 
and this can save IQ and joins!


This is what I'm talking about when I talk about 'CompositeKey' idea.

We can do the following:

1. implement a 'partitioner wrapper' that recognizes tuples 
(CompositeKeys) and uses only the 'head' to calculate the partition,


2. implement

selectCompositeKey(BiFunction<K, V, T> tailSelector) {
  selectKey((k, v) -> new CompositeKey<>(k, tailSelector.apply(k, v)));
  //MARK_AS_PARTITIONED call here,
  //but this call is an implementation detail and we do not expose
  //markAsPartitioned publicly!
}

WDYT? (it's just a brainstorming idea)
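
To make the partitioner-wrapper part of the idea concrete, here is a rough sketch (CompositeKey and the partitioner below are hypothetical classes, and the serialization of the head is deliberately simplistic):

import java.nio.charset.StandardCharsets;
import org.apache.kafka.common.utils.Utils;
import org.apache.kafka.streams.processor.StreamPartitioner;

class CompositeKey<H, T> {
    final H head; // the original key, which alone determines the partition
    final T tail; // anything derived from the key and/or value
    CompositeKey(final H head, final T tail) { this.head = head; this.tail = tail; }
}

class CompositeKeyPartitioner<H, T, V> implements StreamPartitioner<CompositeKey<H, T>, V> {
    @Override
    public Integer partition(final String topic, final CompositeKey<H, T> key, final V value, final int numPartitions) {
        // Hash only the head, so records keep the partitioning of the original key and
        // the property map(k1) = map(k2) => partition(k1) = partition(k2) holds.
        final byte[] headBytes = key.head.toString().getBytes(StandardCharsets.UTF_8);
        return Utils.toPositive(Utils.murmur2(headBytes)) % numPartitions;
    }
}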

On 09.08.2021 2:38, Matthias J. Sax wrote:

Hi,

I originally had a similar thought about `markAsPartitioned()` vs
extending `selectKey()` et al. with a config. While I agree that it
might be conceptually cleaner to use a config object, I did not propose
it as the API impact (deprecating stuff and adding new stuff) is quite
big... If we think it's an acceptable price to pay, I am ok with it though.

I also do think, that `markAsPartitioned()` could actually be
categorized as an operator... We don't expose it in the API as
first-class citizen atm, but in fact we have two types of `KStream` -- a
"PartitionedKStream" and a "NonPartitionedKStream". Thus,
`markAsPartitioned()` can be seen as a "cast operator" that converts the
one into the other.

I also think that the raised concern about "forgetting to remove
`markAsPartitioned()`" might not be very strong though. If you have
different places in the code that link stuff together, a call to eg.
`selectKey().markAsPartitioned()` must always go together. If you have
some other place in the code that gets a `KStream` passed as input, it
would be "invalid" to blindly call `markAsPartitioned()` as you don't
know anything about the upstream code. Of course, it requires some
"coding discipline" to follow this pattern... Also, users can shoot
themselves in the foot with the config object pattern,
too: if you get a `KStream` passed in, you can skip repartitioning via
`selectKey((k,v) -> k, Config.markAsPartitioned())`. -- Thus, I still
slightly prefer to add `markAsPartitioned()` as an operator.

(Maybe we should have exposed a `PartitionedKStream` as a first-class
object to begin with... Hard to introduce now I guess...)


The concern about IQ is interesting -- I did not realize this impact.
Thanks for bringing it up.


a repartition would be a no-op, ie that the stream (and its partitioning)
would be the same
whether or not a repartition is inserted. For this to be true, it then has
to be the case that

Partitioner.partition(key) == Partitioner.partition(map(key))


@Sophie: I don't think this statement is correct. A `markAsPartitioned()`
only means that the existing partitioning ensures that all messages with
the same new key are still in the same partition. I.e., it cannot happen
that two records with the same new key end up in different partitions.

However, if you physically repartitioned on the new key using the
same hash function as for the old key, there is no guarantee that the
same partitions would be picked... And that is why IQ breaks downstream.

Btw: using `markAsPartitioned()` could also be an issue for joins for
the same reason... I want to call out that the Jira tickets that
raised the concern about unnecessary repartitioning are about downstream
aggregations though...

Last but not least: we actually have a similar situation for
windowed-aggregations: The result of a window aggregation is partitioned
by the "plain 

Jenkins build is unstable: Kafka » Kafka Branch Builder » trunk #399

2021-08-09 Thread Apache Jenkins Server
See 




Re: [VOTE] KIP-740: Use TaskId instead of String for the taskId field in TaskMetadata

2021-08-09 Thread Walker Carlson
I am making a minor update to the KIP renaming the TaskId#namedTopology
method to TaskId#topologyName, as it returns a string for the name, not an
object. This has approval to make it into 3.0 already.
https://github.com/apache/kafka/pull/11192

best,
Walker

On Thu, May 20, 2021 at 4:58 PM Sophie Blee-Goldman
 wrote:

> Thanks Bruno, fixed. Definitely a leftover from one of the many iterations
> of this KIP.
>
> Guozhang,
> Thanks for pointing out the change in TaskMetadata constructor, I
> definitely agree to
> just swap out the constructor since that's not really the useful part of
> this API. I'm more
> on the fence when it comes to the taskId() getter -- personally of course I
> would prefer
> to just change it directly than go through this deprecation cycle, but
> unlike the constructor
> it seems likely that some users are/have been relying on the taskId()
> method.
>
> I think we can conclude the voting on this KIP at last, with four +1
> (binding) votes from
> Guozhang, John, Bruno, and myself, and one +1 (non-binding) from Walker.
>
> Appreciate the discussion and the questions it has raised. Though I was not
> expecting this
> to be so "complicated", I feel good that we'll be leaving the code and API
> in a better place
> and opened the door for potential further improvements to come.
>
> -Sophie
>
> On Thu, May 20, 2021 at 8:00 AM John Roesler  wrote:
>
> > Thanks for the updates, Sophie,
> >
> > I'm +1 (binding)
> >
> > -John
> >
> > On Thu, 2021-05-20 at 12:54 +0200, Bruno Cadonna wrote:
> > > Thanks for the KIP, Sophie!
> > >
> > > +1 (binding)
> > >
> > > Note:
> > > The sentence in the KIP seems to need some corrections:
> > >
> > > "To migrate from the String to TaskIdInfo in TaskMetadata, we'll need
> to
> > > deprecate the existing taskId() getter method and add a TaskId() getter
> > > in its place."
> > >
> > > I guess you wanted to write:
> > >
> > > "To migrate from the String to *TaskId* in TaskMetadata, we'll need to
> > > deprecate the existing taskId() getter method and add a *getTaskId()*
> > > getter in its place."
> > >
> > >
> > > Best,
> > > Bruno
> > >
> > > On 20.05.21 08:18, Guozhang Wang wrote:
> > > > A quick note: since we changed the constructor of TaskMetadata as
> well
> > in
> > > > the PR, we'd need to add that in the KIP wiki as well. Personally I
> > think
> > > > it is okay to just replace the constructor as you did in the PR
> rather
> > than
> > > > adding/deprecating --- I would even advocate for replacing the
> `taskId`
> > > > function with the new return type without introducing a new one with
> > > > different name, but I knew since this is not favored by most people
> :).
> > > >
> > > > On Wed, May 19, 2021 at 11:01 PM Guozhang Wang 
> > wrote:
> > > >
> > > > > Thanks Sophie, I like the current proposal better compared to
> adding
> > a new
> > > > > TaskInfo class. +1 !
> > > > >
> > > > > Guozhang
> > > > >
> > > > > On Wed, May 19, 2021 at 4:58 PM Sophie Blee-Goldman
> > > > >  wrote:
> > > > >
> > > > > > Just a friendly ping to please check out the finalized proposal
> of
> > the KIP
> > > > > > and (re)cast your votes
> > > > > >
> > > > > > Thanks!
> > > > > > Sophie
> > > > > >
> > > > > > On Sun, May 16, 2021 at 7:28 PM Sophie Blee-Goldman <
> > sop...@confluent.io>
> > > > > > wrote:
> > > > > >
> > > > > > > Thanks John. I have moved the discussion over to a [DISCUSS]
> > thread,
> > > > > > where
> > > > > > > it should have been taking place all
> > > > > > > along. I'll officially kick off the vote again, but since this
> > KIP has
> > > > > > > been through a significant overhauled since it's initial
> > > > > > > proposal, the previous votes cast will be invalidated. Please
> > make a
> > > > > > pass
> > > > > > > on the latest KIP and (re)cast your vote.
> > > > > > >
> > > > > > > If you have any concerns or comments beyond just small
> > questions, please
> > > > > > > take them to the discussion thread.
> > > > > > >
> > > > > > > Thanks!
> > > > > > > Sophie
> > > > > > >
> > > > > > > On Fri, May 14, 2021 at 10:12 AM John Roesler <
> > vvcep...@apache.org>
> > > > > > wrote:
> > > > > > >
> > > > > > > > Thanks for these updates, Sophie,
> > > > > > > >
> > > > > > > > Unfortunately, I have some minor suggestions:
> > > > > > > >
> > > > > > > > 1. "Topic Group" is a vestigial term from the early days of
> > > > > > > > the codebase. We call a "topic group" a "subtopology" in the
> > > > > > > > public interface now (although "topic group" is still used
> > > > > > > > internally some places). For user-facing consistency, we
> > > > > > > > should also use "subtopologyId" in your proposal.
> > > > > > > >
> > > > > > > > 2. I'm wondering if it's really necessary to introduce this
> > > > > > > > interface at all. I think your motivation is to be able to
> > > > > > > > get the subtopologyId and partition via TaskMetadata, right?
> > > > > > > > Why not just add those methods to TaskMetadata? Stepping
> > > > > > > > back, the 

[jira] [Created] (KAFKA-13183) Dropping null key/value records upstream to repartition topic not tracked via metrics

2021-08-09 Thread Matthias J. Sax (Jira)
Matthias J. Sax created KAFKA-13183:
---

 Summary: Dropping null key/value records upstream to repartition 
topic not tracked via metrics
 Key: KAFKA-13183
 URL: https://issues.apache.org/jira/browse/KAFKA-13183
 Project: Kafka
  Issue Type: Bug
  Components: streams
Reporter: Matthias J. Sax


For joins and aggregations, we consider records with a null key or value as 
invalid and drop them. Inside the aggregate and join processors, we record 
dropped records with a corresponding metric (cf. `droppedRecrodSensor`).

However, we also apply an upstream optimization if we need to repartition data. 
As we know that the downstream aggregation / join will drop those records 
anyway, we drop them _before_ we write them into the repartition topic (we 
still need the drop logic in the processor for the case we don't have a 
repartition topic).

We add a `KStreamFilter` (cf. `KStreamImpl#createRepartiitonSource()`) upstream, 
but this filter does not update the corresponding metric to record dropped 
records.
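
Conceptually, the optimization behaves like the user-level filter sketched below (illustrative only, not the actual internal code); the bug is that the internal variant does not touch the dropped-records sensor:

import org.apache.kafka.streams.kstream.KStream;

public class UpstreamNullDropSketch {
    // Illustrative only: the internal filter added in front of the repartition topic
    // behaves like this predicate, but unlike the join/aggregate processors it does
    // not record the drop in the dropped-records metric.
    public static <K, V> KStream<K, V> dropNulls(final KStream<K, V> stream) {
        return stream.filter((key, value) -> key != null && value != null);
    }
}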



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Re: [DISCUSS] KIP-764 Configurable backlog size for creating Acceptor

2021-08-09 Thread David Mao
Hi Haruki,

I think it makes sense to have this as a configurable property. I think we
should name the property socket.listen.backlog.size for better clarity on
what the property configures. Besides that, the proposal looks good to me.

David

On Wed, Jul 28, 2021 at 8:09 AM Haruki Okada  wrote:

> Hi, Kafka.
>
> Does anyone have any thoughts or suggestions about the KIP?
> If there seems to be no, I would like to start a VOTE later.
>
>
> Thanks,
>
>
> On Thu, Jul 22, 2021 at 16:17 Haruki Okada  wrote:
>
> > Hi, Kafka.
> >
> > I proposed KIP-764, which tries to add new KafkaConfig to adjust
> > Acceptor's backlog size.
> > As described in the KIP and the ticket KAFKA-9648, currently backlog size
> > is fixed value (50) and it may not be enough to handle incoming
> connections
> > from massive clients.
> >
> > So we would like to make it configurable.
> >
> >
> >
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-764%3A+Configurable+backlog+size+for+creating+Acceptor
> >
> > --
> > 
> > Okada Haruki
> > ocadar...@gmail.com
> > 
> >
>
>
> --
> 
> Okada Haruki
> ocadar...@gmail.com
> 
>


[jira] [Created] (KAFKA-13182) Input to AbstractFetcherManager::addFetcherForPartition could be simplified

2021-08-09 Thread Jose Armando Garcia Sancio (Jira)
Jose Armando Garcia Sancio created KAFKA-13182:
--

 Summary: Input to AbstractFetcherManager::addFetcherForPartition 
could be simplified
 Key: KAFKA-13182
 URL: https://issues.apache.org/jira/browse/KAFKA-13182
 Project: Kafka
  Issue Type: Improvement
  Components: replication
Reporter: Jose Armando Garcia Sancio
Assignee: Jose Armando Garcia Sancio


The input to the addFetcherForPartition method in AbstractFetcherManager 
includes more information than it needs. The fetcher manager only needs the 
leader id.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13181) ReplicaManager should start fetchers on UnfencedBrokerRecords

2021-08-09 Thread Jose Armando Garcia Sancio (Jira)
Jose Armando Garcia Sancio created KAFKA-13181:
--

 Summary: ReplicaManager should start fetchers on 
UnfencedBrokerRecords
 Key: KAFKA-13181
 URL: https://issues.apache.org/jira/browse/KAFKA-13181
 Project: Kafka
  Issue Type: Sub-task
  Components: kraft, replication
Reporter: Jose Armando Garcia Sancio
Assignee: Jose Armando Garcia Sancio


The Kraft ReplicaManager starts fetching from the leader if it is a follower 
and there is an endpoint for the leader.

Need to improve the ReplicaManager to also start fetching when the leader 
registers and gets unfenced.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13180) Data Distribution among partitions not working as Expected

2021-08-09 Thread Suriya Vijayaraghavan (Jira)
Suriya Vijayaraghavan created KAFKA-13180:
-

 Summary: Data Distribution among partitions not working as Expected
 Key: KAFKA-13180
 URL: https://issues.apache.org/jira/browse/KAFKA-13180
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 2.8.0
Reporter: Suriya Vijayaraghavan


Hi team, we are facing a weird issue; not sure if anyone else has faced the same. 
But we were able to identify the flow.

Issue
 Using the RoundRobin partitioner with an even number of partitions n results in 
producing to only n/2 of the partitions.

Is Reproducible: yes

Scenario: For a Kafka topic, we have 6 partitions (0,1,2,3,4,5). We are trying 
to produce to a topic with RoundRobin partitioner.

The RoundRobin partitioner works based on the index of an ArrayList of 
partition info. For our case, let's assume the order of the partitions is 
populated in the array list as below.

{1,2,3,4,5,0}

Expected flow: Even distribution to 6 partitions

How it worked: Data was produced only to partitions 2, 4, and 0.

Why:
 On debugging further into the producer flow, we noticed the highlighted code 
below in the doSend method of KafkaProducer.
{quote}int partition = *partition*(record, serializedKey, serializedValue, 
cluster);
 tp = new TopicPartition(record.topic(), partition);
 .
 RecordAccumulator.RecordAppendResult result = accumulator.append(tp, 
timestamp, serializedKey,
 serializedValue, headers, interceptCallback, remainingWaitMs, *true*, nowMs);
 if (result.abortForNewBatch) {
 int prevPartition = partition;
 partitioner.onNewBatch(record.topic(), cluster, prevPartition);
 partition = *partition*(record, serializedKey, serializedValue, cluster);
 tp = new TopicPartition(record.topic(), partition);
 .
 result = accumulator.append(tp, timestamp, serializedKey,
 serializedValue, headers, interceptCallback, remainingWaitMs, *false*, nowMs);
{quote}
Here, in accumulator.append, true is passed for abortOnNewBatch. The Deque 
obtained in the RecordAccumulator.append method will always be empty for the 
first message too, which will try to create a new batch.

For the new batch, a new TopicPartition object is created, which will 
have partition 2. In this flow, abortOnNewBatch is passed as false, so 
the record gets added to the Deque for this TopicPartition.

However, the data will get distributed properly if the total number of partitions 
is odd, as adding the first record only succeeds when abortOnNewBatch is passed 
as false (let's call it the second invocation).

The order of invocations will be as follows for an odd and for an even number of 
partitions.

ODD: \{1,2,3,4,0}
 Iteration set until all partitions get populated: 
 1 - 2
 3 - 4
 0 - 1
 2 - 3
 4 - 0

Dequeue populated partitions = \{2,4,1,3,0}

EVEN: \{1,2,3,4,5,0}

Iteration set until all partitions get populated: 
 1 - 2
 3 - 4
 5 - 0
 1 - 2
 3 - 4
 5 - 0
 1 - 2
 3 - 4
5 - 0.

Dequeue populated partitions = \{2,4,0}

This will go on continuously, as the remaining partitions will never be initiated.
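
A toy simulation of the pattern described above (this is not the actual RoundRobinPartitioner code; it only models the counter advancing twice per record, once for the aborted first partition() call and once after onNewBatch()):

import java.util.Set;
import java.util.TreeSet;

public class RoundRobinSkipDemo {
    public static void main(final String[] args) {
        for (final int numPartitions : new int[] {5, 6}) {
            final Set<Integer> used = new TreeSet<>();
            int counter = 0;
            for (int record = 0; record < 100; record++) {
                counter++; // first partition() call; the batch is aborted (abortOnNewBatch = true)
                counter++; // second partition() call after onNewBatch(); this result is used
                used.add(counter % numPartitions);
            }
            System.out.println(numPartitions + " partitions -> partitions used: " + used);
        }
    }
}

With 5 partitions every partition eventually appears; with 6 only {0, 2, 4} do, matching the behavior reported above.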



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


Build failed in Jenkins: Kafka » Kafka Branch Builder » trunk #398

2021-08-09 Thread Apache Jenkins Server
See 


Changes:


--
[...truncated 489388 lines...]
[Pipeline] // withEnv
[Pipeline] }
[Pipeline] // withEnv
[Pipeline] }
[Pipeline] // withEnv
[Pipeline] }
[Pipeline] // node
[Pipeline] }
[Pipeline] // timestamps
[Pipeline] }
[Pipeline] // timeout
[Pipeline] }
[Pipeline] // stage
[Pipeline] }
[2021-08-09T12:08:43.624Z] 
[2021-08-09T12:08:43.624Z] GssapiAuthenticationTest > testRequestIsAReplay() 
PASSED
[2021-08-09T12:08:43.624Z] 
[2021-08-09T12:08:43.624Z] GssapiAuthenticationTest > 
testServerAuthenticationFailure() STARTED
[2021-08-09T12:08:43.807Z] 
[2021-08-09T12:08:43.807Z] TopicCommandIntegrationTest > 
testDescribeUnderReplicatedPartitionsWhenReassignmentIsInProgress() PASSED
[2021-08-09T12:08:43.807Z] 
[2021-08-09T12:08:43.807Z] TopicCommandIntegrationTest > 
testCreateWithNegativePartitionCount() STARTED
[2021-08-09T12:08:47.906Z] 
[2021-08-09T12:08:47.906Z] TopicCommandIntegrationTest > 
testCreateWithNegativePartitionCount() PASSED
[2021-08-09T12:08:47.906Z] 
[2021-08-09T12:08:47.906Z] TopicCommandIntegrationTest > 
testAlterWhenTopicDoesntExist() STARTED
[2021-08-09T12:08:51.659Z] 
[2021-08-09T12:08:51.659Z] TopicCommandIntegrationTest > 
testAlterWhenTopicDoesntExist() PASSED
[2021-08-09T12:08:51.659Z] 
[2021-08-09T12:08:51.659Z] TopicCommandIntegrationTest > 
testCreateAlterTopicWithRackAware() STARTED
[2021-08-09T12:08:58.093Z] 
[2021-08-09T12:08:58.093Z] TopicCommandIntegrationTest > 
testCreateAlterTopicWithRackAware() PASSED
[2021-08-09T12:08:58.093Z] 
[2021-08-09T12:08:58.093Z] TopicCommandIntegrationTest > 
testListTopicsWithIncludeList() STARTED
[2021-08-09T12:09:02.273Z] 
[2021-08-09T12:09:02.273Z] GssapiAuthenticationTest > 
testServerAuthenticationFailure() PASSED
[2021-08-09T12:09:02.273Z] 
[2021-08-09T12:09:02.273Z] GssapiAuthenticationTest > testReLogin() STARTED
[2021-08-09T12:09:03.093Z] 
[2021-08-09T12:09:03.093Z] TopicCommandIntegrationTest > 
testListTopicsWithIncludeList() PASSED
[2021-08-09T12:09:03.093Z] 
[2021-08-09T12:09:03.093Z] TopicCommandIntegrationTest > testTopicDeletion() 
STARTED
[2021-08-09T12:09:09.095Z] 
[2021-08-09T12:09:09.095Z] TopicCommandIntegrationTest > testTopicDeletion() 
PASSED
[2021-08-09T12:09:09.095Z] 
[2021-08-09T12:09:09.095Z] TopicCommandIntegrationTest > 
testCreateWithDefaults() STARTED
[2021-08-09T12:09:13.453Z] 
[2021-08-09T12:09:13.453Z] TopicCommandIntegrationTest > 
testCreateWithDefaults() PASSED
[2021-08-09T12:09:13.453Z] 
[2021-08-09T12:09:13.453Z] TopicCommandIntegrationTest > 
testDescribeReportOverriddenConfigs() STARTED
[2021-08-09T12:09:17.527Z] 
[2021-08-09T12:09:17.527Z] TopicCommandIntegrationTest > 
testDescribeReportOverriddenConfigs() PASSED
[2021-08-09T12:09:17.527Z] 
[2021-08-09T12:09:17.527Z] TopicCommandIntegrationTest > 
testDescribeWhenTopicDoesntExist() STARTED
[2021-08-09T12:09:21.116Z] 
[2021-08-09T12:09:21.116Z] TopicCommandIntegrationTest > 
testDescribeWhenTopicDoesntExist() PASSED
[2021-08-09T12:09:21.116Z] 
[2021-08-09T12:09:21.116Z] TopicCommandIntegrationTest > 
testDescribeDoesNotFailWhenListingReassignmentIsUnauthorized() STARTED
[2021-08-09T12:09:21.870Z] 
[2021-08-09T12:09:21.870Z] GssapiAuthenticationTest > testReLogin() PASSED
[2021-08-09T12:09:22.905Z] 
[2021-08-09T12:09:22.905Z] Deprecated Gradle features were used in this build, 
making it incompatible with Gradle 8.0.
[2021-08-09T12:09:22.905Z] 
[2021-08-09T12:09:22.905Z] You can use '--warning-mode all' to show the 
individual deprecation warnings and determine if they come from your own 
scripts or plugins.
[2021-08-09T12:09:22.905Z] 
[2021-08-09T12:09:22.905Z] See 
https://docs.gradle.org/7.1.1/userguide/command_line_interface.html#sec:command_line_warnings
[2021-08-09T12:09:22.905Z] 
[2021-08-09T12:09:22.905Z] BUILD SUCCESSFUL in 1h 56m 9s
[2021-08-09T12:09:22.905Z] 202 actionable tasks: 109 executed, 93 up-to-date
[2021-08-09T12:09:22.905Z] 
[2021-08-09T12:09:22.905Z] See the profiling report at: 
file:///home/jenkins/workspace/Kafka_kafka_trunk/build/reports/profile/profile-2021-08-09-10-13-17.html
[2021-08-09T12:09:22.905Z] A fine-grained performance profile is available: use 
the --scan option.
[Pipeline] junit
[2021-08-09T12:09:23.766Z] Recording test results
[2021-08-09T12:09:26.078Z] 
[2021-08-09T12:09:26.078Z] TopicCommandIntegrationTest > 
testDescribeDoesNotFailWhenListingReassignmentIsUnauthorized() PASSED
[2021-08-09T12:09:26.078Z] 
[2021-08-09T12:09:26.078Z] TopicCommandIntegrationTest > 
testAlterAssignmentWithMoreAssignmentThanPartitions() STARTED
[2021-08-09T12:09:33.788Z] 
[2021-08-09T12:09:33.788Z] TopicCommandIntegrationTest > 
testAlterAssignmentWithMoreAssignmentThanPartitions() PASSED
[2021-08-09T12:09:33.788Z] 
[2021-08-09T12:09:33.788Z] TopicCommandIntegrationTest > 
testDescribeWhenTopicDoesntExistWithIfExists() STARTED
[2021-08-09T12:09:34.848Z] [Checks API] No suitable checks publisher found.

Re: High kafka latency on simple benchmark

2021-08-09 Thread Piotr Smolinski
Having had a brief look at your repo, I noticed at least two major API
usage issues:

1. When you issue Producer#send you get a future object. It is up to the
producer to choose the correct time to send.
If you really need to send the messages one by one, you should execute
Producer#flush and then wait for the future's completion.

2. In the consumer loop, the additional wait (Thread.onSpinWait) is not needed.
You should just continue polling. The poll
call takes a timeout, which is the max time to wait for messages. Poll
under the hood does quite a lot of work.

Committing after each record is also not necessary, especially if you focus
on high throughput.

On the consumer side you also set a high fetch.min.bytes. For low latency you
should set it to 1; otherwise you force
the consumer to wait for data. Similarly, you may set max.poll.records to 1,
but this one is a local client setting.
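
A minimal sketch of the suggested pattern (the topic name, group id, and the `base` properties containing bootstrap.servers are assumptions):

import java.time.Duration;
import java.util.List;
import java.util.Properties;
import java.util.concurrent.Future;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.apache.kafka.common.serialization.StringSerializer;

public class LatencySketch {

    static void produceOneByOne(final Properties base) throws Exception {
        final Properties props = new Properties();
        props.putAll(base);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            final Future<RecordMetadata> ack = producer.send(new ProducerRecord<>("bench", "k", "v"));
            producer.flush(); // push the batch out now instead of waiting for batching/linger
            ack.get();        // block until the broker acknowledges this record
        }
    }

    static void consume(final Properties base) {
        final Properties props = new Properties();
        props.putAll(base);
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "bench-consumer");
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        props.put(ConsumerConfig.FETCH_MIN_BYTES_CONFIG, 1);  // do not wait for data to accumulate
        props.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 1); // optional, purely client-side
        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(List.of("bench"));
            while (true) {
                // poll() already blocks up to the timeout; no extra spin-wait is needed
                final ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                records.forEach(r -> { /* measure end-to-end latency here */ });
            }
        }
    }
}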

For tuning, have a look at this whitepaper:
https://www.confluent.io/white-paper/optimizing-your-apache-kafka-deployment/

HTH,
Piotr



On Sun, Aug 8, 2021 at 11:30 PM Виталий Ромашкин 
wrote:

>
> Hi Devs,
>
> Currently, I am bench-marking different transports.
> The first one is Kafka.
> I created a repo in my GitHub —
> https://github.com/rvit34/transport-benchmark
> The result for Kafka is not so good. For RPS of 25K and higher, latency is
> about 1 second and higher.
> Maybe I'm doing something completely wrong, but if I change the transport from
> Kafka to Aeron my max latency is always under 100 ms for any workload (100K
> RPS and higher).
> So, might somebody check it out?
>
>
> Best Regards, Vitaly.



-- 

*Mit freundlichen Grüßen / Kind regards *


Piotr Smolinski

Consulting Engineer, Professional Services EMEA

piotr.smolin...@confluent.io | +49 (151) 267-114-23










Re: [DISCUSS] KIP-761: Add total blocked time metric to streams

2021-08-09 Thread Bruno Cadonna

Hi,

With "group", I actually meant the "type" tag. A thread-level metrics in 
Streams has the following tag "type=stream-thread-metrics" which we also 
call "group" in the code.
I realized now that I mentioned "type" and "group" in my previous e-mail 
which are synonyms for the same concept.


For example, the blocked-time-total metric in Rohan's KIP should be 
defined as:


blocked-time-total

tags:
type = stream-thread-metrics
thread-id = [stream thread ID]

level: INFO

description: the total time the Kafka Streams thread spent blocked on Kafka.
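
Once registered with this metadata, the metric can be read from the embedded client metrics, for example (a sketch, assuming the name, group, and tags above are what lands in the final KIP):

import java.util.Map;
import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.streams.KafkaStreams;

public class BlockedTimeReader {
    public static void printBlockedTime(final KafkaStreams streams) {
        for (final Map.Entry<MetricName, ? extends Metric> entry : streams.metrics().entrySet()) {
            final MetricName name = entry.getKey();
            if ("blocked-time-total".equals(name.name())
                    && "stream-thread-metrics".equals(name.group())) {
                System.out.println(name.tags().get("thread-id") + " -> " + entry.getValue().metricValue());
            }
        }
    }
}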

Best,
Bruno


On 27.07.21 10:13, Sophie Blee-Goldman wrote:

Thanks for the clarifications, that all makes sense.

I'm ready to vote on the KIP, but can you just update the KIP first to
address Bruno's feedback? Ie just fix the tags and fill in the missing
fields.

For example it sounds like these would be thread-level metrics. You should
be able to figure out what the values should be from the KIP-444 doc,
there's a chart with the type and tags for all thread-level metrics.

Not 100% sure what Bruno meant by "group" but my guess would be whether
it's INFO/DEBUG/TRACE. This is probably one of the most important
things to include in a KIP that's introducing new metrics: how big is the
potential performance impact of recording these metrics? How big is the
intended audience, would these be useful to almost everyone or are they
more "niche"?

It sounds like maybe DEBUG would be most appropriate here -- WDYT?

-Sophie

On Thu, Jul 22, 2021 at 9:01 AM Bruno Cadonna  wrote:


Hi Rohan,

Thank you for the KIP!

I agree that the KIP is well-motivated.

What is not very clear is the metadata like type, group, and tags of the
metrics. For example, there is not application-id tag in Streams and
there is also no producer-id tag. The clients, i.e., producer, admin,
consumer, and also Streams have a client-id tag, that corresponds to the
producer-id, consumer-id, etc you use in the KIP.

For examples of metadata used in Streams you can look at the following
KIPs:

-

https://cwiki.apache.org/confluence/display/KAFKA/KIP-444%3A+Augment+metrics+for+Kafka+Streams
-

https://cwiki.apache.org/confluence/display/KAFKA/KIP-471%3A+Expose+RocksDB+Metrics+in+Kafka+Streams
-

https://cwiki.apache.org/confluence/display/KAFKA/KIP-607%3A+Add+Metrics+to+Kafka+Streams+to+Report+Properties+of+RocksDB
-

https://cwiki.apache.org/confluence/display/KAFKA/KIP-613%3A+Add+end-to-end+latency+metrics+to+Streams


Best,
Bruno

On 22.07.21 09:42, Rohan Desai wrote:

re sophie:

The intent here was to include all blocked time (not just `RUNNING`). The
caller can window the total blocked time themselves, and that can be
compared with a timeseries of the state to understand the ratio in
different states. I'll update the KIP to include `committed`. The admin API
calls should be accounted for by the admin client iotime/iowaittime
metrics.

On Tue, Jul 20, 2021 at 11:49 PM Rohan Desai 
wrote:


I remember now that we moved the round-trip PID's txn completion logic into
init-transaction and commit/abort-transaction. So I think we'd count time
in StreamsProducer#initTransaction as well (admittedly it is in most
cases a one-time thing).

Makes sense - I'll update the KIP

On Tue, Jul 20, 2021 at 11:48 PM Rohan Desai 
wrote:




I had a question - it seems from the descriptions of
`txn-commit-time-total` and `offset-commit-time-total` that they measure
similar processes for ALOS and EOS, but only `txn-commit-time-total` is
included in `blocked-time-total`. Why isn't `offset-commit-time-total`
also included?

I've updated the KIP to include it.


Aside from `flush-time-total`, `txn-commit-time-total` and
`offset-commit-time-total`, which will be producer/consumer client metrics,
the rest of the metrics will be streams metrics that will be thread level,
is that right?

Based on the feedback from Guozhang, I've updated the KIP to reflect that
the lower-level metrics are all client metrics that are then summed to
compute the blocked time metric, which is a Streams metric.

On Tue, Jul 20, 2021 at 11:58 AM Rohan Desai 
wrote:


Similarly, I think "txn-commit-time-total" and
"offset-commit-time-total" may better be inside the producer and consumer
clients respectively.

I agree for offset-commit-time-total. For txn-commit-time-total I'm
proposing we measure `StreamsProducer.commitTransaction`, which wraps
multiple producer calls (sendOffsets, commitTransaction)


For "txn-commit-time-total" specifically, besides

producer.commitTxn.
other txn-related calls may also be blocking, including
producer.beginTxn/abortTxn, I saw you mentioned "txn-begin-time-total"
later in the doc, but did not include it as a separate metric, and
similarly, should we have a `txn-abort-time-total` as well? If yes,
could
you update the KIP page accordingly.

`beginTransaction` is not blocking - I meant to remove that from that
doc. I'll add something for abort.

On Mon, Jul 19, 2021 at 11:55 PM Rohan Desai 


wrote:



Re: Handling retriable exceptions during Connect source task start

2021-08-09 Thread Gunnar Morling
Hi,

To ask slightly differently: would there be interest in a pull request for
implementing retries, in case RetriableException is thrown from the
Task::start() method?
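
For context, a rough sketch (class and helper names are illustrative) of the connector-side case such a retry would cover, i.e. a task signaling a transient failure from start():

import java.util.Collections;
import java.util.List;
import java.util.Map;
import org.apache.kafka.connect.errors.RetriableException;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

public class ExampleDbSourceTask extends SourceTask {

    @Override
    public String version() {
        return "0.0.1";
    }

    @Override
    public void start(final Map<String, String> props) {
        try {
            connect(props); // hypothetical helper that opens the source DB connection
        } catch (final Exception transientFailure) { // e.g. connection refused / timeout
            // Today the framework fails the task permanently; the question is whether
            // it should retry start() when this exception type is thrown.
            throw new RetriableException("Source database not reachable yet", transientFailure);
        }
    }

    @Override
    public List<SourceRecord> poll() {
        return Collections.emptyList();
    }

    @Override
    public void stop() { }

    private void connect(final Map<String, String> props) { /* open the connection here */ }
}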

Thanks,

--Gunnar


On Thu, Aug 5, 2021 at 22:27, Sergei Morozov  wrote:

> Hi,
>
> I'm trying to address an issue in Debezium (DBZ-3823
> ) where a source connector task
> cannot recover from a retriable exception.
>
> The root cause is that the task interacts with the source database during
> SourceTask#start but Kafka Connect doesn't handle retriable exceptions
> thrown at this stage as retriable. KIP-298
> <
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-298%3A+Error+Handling+in+Connect
> >
> that
> originally introduced handling of retriable exception doesn't describe
> handling task start exceptions, so it's unclear to me whether those aren't
> allowed by design or it was just out of the scope of the KIP.
>
> My current working solution
>  relies
> on the internal Debezium implementation of the task restart which
> introduces certain risks (the details are in the PR description).
>
> The question is: are retriable exceptions during start disallowed by
> design, and the task must not throw retriable exceptions during start, or
> it's just currently not supported by the Connect framework and I just need
> to implement proper error handling in the connector?
>
> Thanks!
>
> --
> Sergei Morozov
>


[jira] [Created] (KAFKA-13179) High kafka latency on simple benchmark

2021-08-09 Thread Vitaly Romashkin (Jira)
Vitaly Romashkin created KAFKA-13179:


 Summary: High kafka latency on simple benchmark
 Key: KAFKA-13179
 URL: https://issues.apache.org/jira/browse/KAFKA-13179
 Project: Kafka
  Issue Type: Test
Affects Versions: 2.8.0
 Environment: Ubuntu 18.04.5 LTS x86_64 4.15.0-153-generic
Intel(R) Core(TM) i7-4770 CPU @ 3.40GHz (8 logical cores)
16GB RAM (swap is off)
Samsung SSD 850 Evo 512GB (btrfs)
openjdk version "11.0.5-BellSoft" 2019-10-15
OpenJDK Runtime Environment (build 11.0.5-BellSoft+11)
OpenJDK 64-Bit Server VM (build 11.0.5-BellSoft+11, mixed mode)
kafka_2.13-2.8.0
zookeeper v3.5.9
Reporter: Vitaly Romashkin


Currently, I am bench-marking different transports.
 The first one is Kafka.
 I created a repo in my GitHub — [https://github.com/rvit34/transport-benchmark]
The result for Kafka is not so good. For RPS of 25K and higher, latency is about 
1 second and higher.
 Maybe I'm doing something completely wrong, but if I change the transport from 
Kafka to Aeron my max latency is always under 100 ms for any workload (100K RPS 
and higher).
 So, might somebody check it out?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-13177) partition failures and fewer shrink but a lot of isr expansions with increased num.replica.fetchers in kafka brokers

2021-08-09 Thread kaushik srinivas (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13177?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

kaushik srinivas resolved KAFKA-13177.
--
Resolution: Not A Bug

> partition failures and fewer shrink but a lot of isr expansions with 
> increased num.replica.fetchers in kafka brokers
> 
>
> Key: KAFKA-13177
> URL: https://issues.apache.org/jira/browse/KAFKA-13177
> Project: Kafka
>  Issue Type: Bug
>Reporter: kaushik srinivas
>Assignee: kaushik srinivas
>Priority: Major
>
> Installing 3 node kafka broker cluster (4 core cpu and 4Gi memory on k8s)
> topics: 15, partitions each: 15, replication factor: 3, min.insync.replicas: 2
> producers running with acks : all
> Initially the num.replica.fetchers was set to 1 (default) and we observed 
> very frequent ISR shrinks and expansions. So the setups were tuned with a 
> higher value of 4. 
> Once after this change was done, we see below behavior and warning msgs in 
> broker logs
>  # Over a period of 2 days, there are around 10 shrinks corresponding to 10 
> partitions, but around 700 ISR expansions corresponding to almost all 
> partitions in the cluster(approx 50 to 60 partitions).
>  # we see frequent warn msg of partitions being marked as failure in the same 
> time span. Below is the trace --> {"type":"log", "host":"ww", 
> "level":"WARN", "neid":"kafka-ww", "system":"kafka", 
> "time":"2021-08-03T20:09:15.340", "timezone":"UTC", 
> "log":{"message":"ReplicaFetcherThread-2-1003 - 
> kafka.server.ReplicaFetcherThread - *[ReplicaFetcher replicaId=1001, 
> leaderId=1003, fetcherId=2] Partition test-16 marked as failed"}}*
>  
> We see the above behavior continuously after increasing the 
> num.replica.fetchers to 4 from 1. We did increase this to improve the 
> replication performance and hence reduce the ISR shrinks.
> But we see this strange behavior after the change. What would the above trace 
> indicate, and is marking partitions as failed just a WARN msg handled by 
> Kafka, or is it something to worry about?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)