For completeness I have also tested 2.4.0 Broker with 2.4.0 Client. All works 
correctly.  Unfortunately as we are on AWS MSK we don’t have the option to use 
2.4.0 for the Brokers.

So now I guess the question changes to what combo is best for us and will it 
avoid UNKNOWN_PRODUCER_ID problems?

We can choose 2.2.1 or 2.3.1 for the Broker (AWS recommend 2.2.1 although don’t 
state why).  Based on the experiences below, I would then go with the 
corresponding 2.2.2 or 2.3.1 Client version.

Which combo would people recommend?

> On 9/03/2020, at 12:03 PM, James Olsen <ja...@inaseq.com> wrote:
> 
> Jamie,
> 
> I’ve just tested with 2.3.1 Broker and 2.3.1 Client and it works correctly.  
> So with that setup it does deliver the batch as soon as any partition has 
> data.  This is what we would expect from the Kafka docs.
> 
> So it looks like an issue with the 2.4.0 Client.  This is concerning as I 
> wanted the fix for https://issues.apache.org/jira/browse/KAFKA-7190 as we may 
> have some very quiet Topics.    2.3.x does have some handling for this as 
> implied by https://issues.apache.org/jira/browse/KAFKA-8483 but I’m not sure 
> it is as complete.
> 
> Regards, James.
> 
> On 9/03/2020, at 11:54 AM, Jamie 
> <jamied...@aol.co.uk<mailto:jamied...@aol.co.uk>> wrote:
> 
> Hi James,
> 
> My understanding is that consumers will only ever have 1 in flight request to 
> each broker that has leader partitions of topics that it is subscribed to. 
> The fetch requests will ask for records for all leader partitions on the 
> broker so if the consumer is consuming from more than one partition on a 
> broker then they will be batched into one request. I assume this means if 
> there are some partitions with no data available then the request will wait 
> for fetch.max.wait.ms even if some of the partitions have more than 
> fetch.min.bytes data available to be read instantly?
> 
> Thanks,
> 
> Jamie
> 
> Sent from AOL Mobile Mail
> Get the new AOL app: mail.mobile.aol.com<http://mail.mobile.aol.com/>
> 
> On Sunday, 8 March 2020, James Olsen 
> <ja...@inaseq.com<mailto:ja...@inaseq.com>> wrote:
> 
> Using 2.3.1 Brokers makes things worse.  There are now 2 fetch.max.wait.ms 
> delays before messages are delivered even though they were available at the 
> beginning.
> 
> 2020-03-09 11:40:23,878 DEBUG 
> [org.apache.kafka.clients.consumer.internals.Fetcher] 
> 'EE-ManagedThreadFactory-default-Thread-2' [Consumer 
> clientId=consumer-LedgerService-group-1, groupId=LedgerService-group] Fetch 
> READ_UNCOMMITTED at offset 28 for partition Ledger-1 returned fetch data 
> (error=NONE, highWaterMark=29, lastStableOffset = 29, logStartOffset = 0, 
> preferredReadReplica = absent, abortedTransactions = null, 
> recordsSizeInBytes=280)
> 2020-03-09 11:40:23,878 DEBUG 
> [org.apache.kafka.clients.consumer.internals.Fetcher] 
> 'EE-ManagedThreadFactory-default-Thread-2' [Consumer 
> clientId=consumer-LedgerService-group-1, groupId=LedgerService-group] 
> Ignoring fetched records for partition Ledger-1 since it no longer has valid 
> position
> 2020-03-09 11:40:23,878 DEBUG 
> [org.apache.kafka.clients.consumer.internals.Fetcher] 
> 'EE-ManagedThreadFactory-default-Thread-2' [Consumer 
> clientId=consumer-LedgerService-group-1, groupId=LedgerService-group] Added 
> READ_UNCOMMITTED fetch request for partition Ledger-0 at position 
> FetchPosition{offset=0, offsetEpoch=Optional.empty, 
> currentLeader=LeaderAndEpoch{leader=localhost:9093 (id: 1001 rack: null), 
> epoch=-1}} to node localhost:9093 (id: 1001 rack: null)
> 2020-03-09 11:40:23,878 DEBUG 
> [org.apache.kafka.clients.consumer.internals.Fetcher] 
> 'EE-ManagedThreadFactory-default-Thread-2' [Consumer 
> clientId=consumer-LedgerService-group-1, groupId=LedgerService-group] Sending 
> READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(Ledger-1), 
> implied=(Ledger-0)) to broker localhost:9093 (id: 1001 rack: null)
> 2020-03-09 11:40:24,382 DEBUG 
> [org.apache.kafka.clients.consumer.internals.Fetcher] 
> 'EE-ManagedThreadFactory-default-Thread-2' [Consumer 
> clientId=consumer-LedgerService-group-1, groupId=LedgerService-group] Added 
> READ_UNCOMMITTED fetch request for partition Ledger-0 at position 
> FetchPosition{offset=0, offsetEpoch=Optional.empty, 
> currentLeader=LeaderAndEpoch{leader=localhost:9093 (id: 1001 rack: null), 
> epoch=-1}} to node localhost:9093 (id: 1001 rack: null)
> 2020-03-09 11:40:24,382 DEBUG 
> [org.apache.kafka.clients.consumer.internals.Fetcher] 
> 'EE-ManagedThreadFactory-default-Thread-2' [Consumer 
> clientId=consumer-LedgerService-group-1, groupId=LedgerService-group] Sending 
> READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(), 
> implied=(Ledger-0)) to broker localhost:9093 (id: 1001 rack: null)
> 2020-03-09 11:40:24,382 DEBUG 
> [org.apache.kafka.clients.consumer.internals.OffsetsForLeaderEpochClient] 
> 'EE-ManagedThreadFactory-default-Thread-2' [Consumer 
> clientId=consumer-LedgerService-group-1, groupId=LedgerService-group] 
> Handling OffsetsForLeaderEpoch response for Ledger-1. Got offset 29 for epoch > 0
> 2020-03-09 11:40:24,885 DEBUG 
> [org.apache.kafka.clients.consumer.internals.Fetcher] 
> 'EE-ManagedThreadFactory-default-Thread-2' [Consumer 
> clientId=consumer-LedgerService-group-1, groupId=LedgerService-group] Added 
> READ_UNCOMMITTED fetch request for partition Ledger-0 at position 
> FetchPosition{offset=0, offsetEpoch=Optional.empty, 
> currentLeader=LeaderAndEpoch{leader=localhost:9093 (id: 1001 rack: null), 
> epoch=-1}} to node localhost:9093 (id: 1001 rack: null)
> 2020-03-09 11:40:24,885 DEBUG 
> [org.apache.kafka.clients.consumer.internals.Fetcher] 
> 'EE-ManagedThreadFactory-default-Thread-2' [Consumer 
> clientId=consumer-LedgerService-group-1, groupId=LedgerService-group] Added 
> READ_UNCOMMITTED fetch request for partition Ledger-1 at position 
> FetchPosition{offset=28, offsetEpoch=Optional[0], 
> currentLeader=LeaderAndEpoch{leader=localhost:9093 (id: 1001 rack: null), 
> epoch=-1}} to node localhost:9093 (id: 1001 rack: null)
> 2020-03-09 11:40:24,885 DEBUG 
> [org.apache.kafka.clients.consumer.internals.Fetcher] 
> 'EE-ManagedThreadFactory-default-Thread-2' [Consumer 
> clientId=consumer-LedgerService-group-1, groupId=LedgerService-group] Sending 
> READ_UNCOMMITTED IncrementalFetchRequest(toSend=(Ledger-1), toForget=(), 
> implied=(Ledger-0)) to broker localhost:9093 (id: 1001 rack: null)
> 2020-03-09 11:40:24,887 DEBUG 
> [org.apache.kafka.clients.consumer.internals.Fetcher] 
> 'EE-ManagedThreadFactory-default-Thread-2' [Consumer 
> clientId=consumer-LedgerService-group-1, groupId=LedgerService-group] Fetch 
> READ_UNCOMMITTED at offset 28 for partition Ledger-1 returned fetch data 
> (error=NONE, highWaterMark=29, lastStableOffset = 29, logStartOffset = 0, 
> preferredReadReplica = absent, abortedTransactions = null, 
> recordsSizeInBytes=280)
> 2020-03-09 11:40:24,889 DEBUG 
> [org.apache.kafka.clients.consumer.internals.Fetcher] 
> 'EE-ManagedThreadFactory-default-Thread-2' [Consumer 
> clientId=consumer-LedgerService-group-1, groupId=LedgerService-group] Added 
> READ_UNCOMMITTED fetch request for partition Ledger-0 at position 
> FetchPosition{offset=0, offsetEpoch=Optional.empty, 
> currentLeader=LeaderAndEpoch{leader=localhost:9093 (id: 1001 rack: null), 
> epoch=-1}} to node localhost:9093 (id: 1001 rack: null)
> 2020-03-09 11:40:24,889 DEBUG 
> [org.apache.kafka.clients.consumer.internals.Fetcher] 
> 'EE-ManagedThreadFactory-default-Thread-2' [Consumer 
> clientId=consumer-LedgerService-group-1, groupId=LedgerService-group] Added 
> READ_UNCOMMITTED fetch request for partition Ledger-1 at position 
> FetchPosition{offset=29, offsetEpoch=Optional[0], 
> currentLeader=LeaderAndEpoch{leader=localhost:9093 (id: 1001 rack: null), 
> epoch=-1}} to node localhost:9093 (id: 1001 rack: null)
> 2020-03-09 11:40:24,889 DEBUG 
> [org.apache.kafka.clients.consumer.internals.Fetcher] 
> 'EE-ManagedThreadFactory-default-Thread-2' [Consumer 
> clientId=consumer-LedgerService-group-1, groupId=LedgerService-group] Sending 
> READ_UNCOMMITTED IncrementalFetchRequest(toSend=(Ledger-1), toForget=(), 
> implied=(Ledger-0)) to broker localhost:9093 (id: 1001 rack: null)
> 
> 
>> On 9/03/2020, at 10:48 AM, James Olsen 
>> <ja...@inaseq.com<mailto:ja...@inaseq.com>> wrote:
>> 
>> Thanks for your response.  Yes the second issue can be mitigated by reducing 
>> the fetch.max.wait.ms although reducing it too far creates excessive CPU 
>> load on the Brokers.  However I've done some further testing and found what 
>> looks like the underlying cause.
>> 
>> In the scenario below the Consumer is consuming from 2 Partitions (MyTopic-0 
>> and MyTopic-1).  There is a cycle of messages being fetched and ignored.  In 
>> each cycle a subsequent fetch to get them again does not occur until after a 
>> complete fetch.max.wait.ms expires.  I suspect this is due initially to the 
>> fact that MyTopic-0 has never had any messages and hence has no epoch and 
>> subsequently is being fetched on it’s own - but being empty results in the 
>> delay.  Someone who knows more about the meaning of "toSend=(), 
>> toForget=(MyTopic-1), implied=(MyTopic-0)” might be able to enlighten things 
>> further.
>> 
>> I can post a more complete log of this if anyone wants to take a look.
>> 
>> I’m going to try Kafka 2.3 Brokers to see if the "Skipping validation …” bit 
>> has any impact.
>> 
>> 2020-03-09 09:46:43,093 DEBUG 
>> [org.apache.kafka.clients.consumer.internals.Fetcher] 'Thread-2' [Consumer 
>> clientId=consumer-Redacted-group-1, groupId=Redacted-group] Fetch 
>> READ_UNCOMMITTED at offset 40 for partition MyTopic-1 returned fetch data 
>> (error=NONE, highWaterMark=41, lastStableOffset = 41, logStartOffset = 0, 
>> preferredReadReplica = absent, abortedTransactions = null, 
>> recordsSizeInBytes=573)
>> 
>> 2020-03-09 09:46:43,093 DEBUG 
>> [org.apache.kafka.clients.consumer.internals.Fetcher] 'Thread-2' [Consumer 
>> clientId=consumer-Redacted-group-1, groupId=Redacted-group] Ignoring fetched 
>> records for partition MyTopic-1 since it no longer has valid position
>> 
>> 2020-03-09 09:46:43,093 DEBUG 
>> [org.apache.kafka.clients.consumer.internals.Fetcher] 'Thread-2' [Consumer 
>> clientId=consumer-Redacted-group-1, groupId=Redacted-group] Added 
>> READ_UNCOMMITTED fetch request for partition MyTopic-0 at position 
>> FetchPosition{offset=0, offsetEpoch=Optional.empty, 
>> currentLeader=LeaderAndEpoch{leader=localhost:9093 (id: 1001 rack: null), 
>> epoch=-1}} to node localhost:9093 (id: 1001 rack: null)
>> 
>> 2020-03-09 09:46:43,093 DEBUG 
>> [org.apache.kafka.clients.consumer.internals.Fetcher] 'Thread-2' [Consumer 
>> clientId=consumer-Redacted-group-1, groupId=Redacted-group] Sending 
>> READ_UNCOMMITTED IncrementalFetchRequest(toSend=(), toForget=(MyTopic-1), 
>> implied=(MyTopic-0)) to broker localhost:9093 (id: 1001 rack: null)
>> 
>> 2020-03-09 09:46:43,095 DEBUG 
>> [org.apache.kafka.clients.consumer.internals.Fetcher] 'Thread-2' [Consumer 
>> clientId=consumer-Redacted-group-1, groupId=Redacted-group] Skipping 
>> validation of fetch offsets for partitions [MyTopic-1] since the broker does 
>> not support the required protocol version (introduced in Kafka 2.3)
>> 
>> 2020-03-09 09:46:43,597 DEBUG 
>> [org.apache.kafka.clients.consumer.internals.Fetcher] 'Thread-2' [Consumer 
>> clientId=consumer-Redacted-group-1, groupId=Redacted-group] Added 
>> READ_UNCOMMITTED fetch request for partition MyTopic-0 at position 
>> FetchPosition{offset=0, offsetEpoch=Optional.empty, 
>> currentLeader=LeaderAndEpoch{leader=localhost:9093 (id: 1001 rack: null), 
>> epoch=-1}} to node localhost:9093 (id: 1001 rack: null)
>> 
>> 2020-03-09 09:46:43,597 DEBUG 
>> [org.apache.kafka.clients.consumer.internals.Fetcher] 'Thread-2' [Consumer 
>> clientId=consumer-Redacted-group-1, groupId=Redacted-group] Added 
>> READ_UNCOMMITTED fetch request for partition MyTopic-1 at position 
>> FetchPosition{offset=40, offsetEpoch=Optional[0], 
>> currentLeader=LeaderAndEpoch{leader=localhost:9093 (id: 1001 rack: null), 
>> epoch=-1}} to node localhost:9093 (id: 1001 rack: null)
>> 
>> 2020-03-09 09:46:43,597 DEBUG 
>> [org.apache.kafka.clients.consumer.internals.Fetcher] 'Thread-2' [Consumer 
>> clientId=consumer-Redacted-group-1, groupId=Redacted-group] Sending 
>> READ_UNCOMMITTED IncrementalFetchRequest(toSend=(MyTopic-1), toForget=(), 
>> implied=(MyTopic-0)) to broker localhost:9093 (id: 1001 rack: null)
>> 
>> 2020-03-09 09:46:43,599 DEBUG 
>> [org.apache.kafka.clients.consumer.internals.Fetcher] 'Thread-2' [Consumer 
>> clientId=consumer-Redacted-group-1, groupId=Redacted-group] Fetch 
>> READ_UNCOMMITTED at offset 40 for partition MyTopic-1 returned fetch data 
>> (error=NONE, highWaterMark=41, lastStableOffset = 41, logStartOffset = 0, 
>> preferredReadReplica = absent, abortedTransactions = null, 
>> recordsSizeInBytes=573)
>> 
>> 
>> On 5/03/2020, at 11:45 PM, M. Manna 
>> <manme...@gmail.com<mailto:manme...@gmail.com><mailto:manme...@gmail.com<mailto:manme...@gmail.com>>>
>>  wrote:
>> 
>> Hi James,
>> 
>> 3 Consumers in a group means you are having 20 partitions per consumer (as
>> per your 60 partition and 1 CGroup setup), 5 means 12. There's nothing
>> special about these numbers as you also noticed.
>> Have you tried setting fetch.max.wait.ms = 0 and see whether that's making
>> a difference for you?
>> 
>> Thanks,
>> 
>> 
>> On Thu, 5 Mar 2020 at 03:43, James Olsen 
>> <ja...@inaseq.com<mailto:ja...@inaseq.com><mailto:ja...@inaseq.com<mailto:ja...@inaseq.com>>>
>>  wrote:
>> 
>> I’m seeing behaviour that I don’t understand when I have Consumers
>> fetching from multiple Partitions from the same Topic.  There are two
>> different conditions arising:
>> 
>> 1. A subset of the Partitions allocated to a given Consumer not being
>> consumed at all.  The Consumer appears healthy, the Thread is running and
>> logging activity and is successfully processing records from some of the
>> Partitions it has been assigned.  I don’t think this is due to the first
>> Partition fetched filling a Batch (KIP-387).  The problem does not occur if
>> we have a particular number of Consumers (3 in this case) but it has failed
>> with a range of other larger values.  I don’t think there is anything
>> special about 3 - it just happens to work OK with that value although it is
>> the same as the Broker and Replica count.  When we tried 6, 5 Consumers
>> were fine but 1 exhibited this issue.
>> 
>> 2. Up to a half second delay between Producer sending and Consumer
>> receiving a message.  This looks suspiciously like the fetch.max.wait.ms=500
>> but we also have fetch.min.bytes=1 so should get messages as soon as
>> something is available.  The only explanation I can think of is if the
>> fetch.max.wait.ms is applied in full to the first Partition checked and
>> it remains empty for the duration.  Then it moves on to a subsequent
>> non-empty Partition and delivers messages from there.
>> 
>> Our environment is AWS MSK (Kafka 2.2.1) and Kafka Java client 2.4.0.
>> 
>> All environments appear healthy and under light load, e.g. clients only
>> operating at a 1-2% CPU, Brokers (3) at 5-10% CPU.  No swap, no crashes,
>> no dead threads etc.
>> 
>> Typical scenario is a Topic with 60 Partitions, 3 Replicas and a single
>> ConsumerGroup with 5 Consumers.  The Partitioning is for semantic purposes
>> with the intention being to add more Consumers as the business grows and
>> load increases.  Some of the Partitions are always empty due to using short
>> string keys and the default Partitioner - we will probably implement a
>> custom Partitioner to achieve better distribution in the near future.
>> 
>> I don’t have access to the detailed JMX metrics yet but am working on that
>> in the hope it will help diagnose.
>> 
>> Thoughts and advice appreciated!
>> 
> 
> 

Reply via email to