[jira] [Updated] (KAFKA-10181) Redirect AlterConfig/IncrementalAlterConfig to the controller
[ https://issues.apache.org/jira/browse/KAFKA-10181?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen updated KAFKA-10181: Description: In the new Admin client, the AlterConfig/IncrementalAlterConfig request should be redirected to the active controller. (was: In the new Admin client, the request should always be routed towards the controller.) > Redirect AlterConfig/IncrementalAlterConfig to the controller > - > > Key: KAFKA-10181 > URL: https://issues.apache.org/jira/browse/KAFKA-10181 > Project: Kafka > Issue Type: Sub-task > Components: admin >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > Fix For: 2.7.0 > > > In the new Admin client, the AlterConfig/IncrementalAlterConfig request > should be redirected to the active controller. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Updated] (KAFKA-10181) Redirect AlterConfig/IncrementalAlterConfig to the controller
[ https://issues.apache.org/jira/browse/KAFKA-10181?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen updated KAFKA-10181: Summary: Redirect AlterConfig/IncrementalAlterConfig to the controller (was: AlterConfig/IncrementalAlterConfig should go to controller) > Redirect AlterConfig/IncrementalAlterConfig to the controller > - > > Key: KAFKA-10181 > URL: https://issues.apache.org/jira/browse/KAFKA-10181 > Project: Kafka > Issue Type: Sub-task > Components: admin >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > Fix For: 2.7.0 > > > In the new Admin client, the request should always be routed towards the > controller.
[jira] [Updated] (KAFKA-10270) Add a broker to controller channel manager to redirect AlterConfig
[ https://issues.apache.org/jira/browse/KAFKA-10270?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen updated KAFKA-10270: Fix Version/s: 2.7.0 > Add a broker to controller channel manager to redirect AlterConfig > -- > > Key: KAFKA-10270 > URL: https://issues.apache.org/jira/browse/KAFKA-10270 > Project: Kafka > Issue Type: Sub-task >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > Fix For: 2.7.0 > > > Per the KIP-590 requirement, we need to have a dedicated communication channel > from the broker to the controller.
[jira] [Resolved] (KAFKA-10270) Add a broker to controller channel manager to redirect AlterConfig
[ https://issues.apache.org/jira/browse/KAFKA-10270?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Boyang Chen resolved KAFKA-10270. - Resolution: Fixed > Add a broker to controller channel manager to redirect AlterConfig > -- > > Key: KAFKA-10270 > URL: https://issues.apache.org/jira/browse/KAFKA-10270 > Project: Kafka > Issue Type: Sub-task >Reporter: Boyang Chen >Assignee: Boyang Chen >Priority: Major > > Per the KIP-590 requirement, we need to have a dedicated communication channel > from the broker to the controller.
[jira] [Created] (KAFKA-10326) Both serializer and deserializer should be able to see the generated client id
Chia-Ping Tsai created KAFKA-10326: -- Summary: Both serializer and deserializer should be able to see the generated client id Key: KAFKA-10326 URL: https://issues.apache.org/jira/browse/KAFKA-10326 Project: Kafka Issue Type: Bug Reporter: Chia-Ping Tsai Assignee: Chia-Ping Tsai The producer and consumer generate a client id when users don't define one. The generated client id is passed to all configurable components (for example, the metrics reporter) except the serializer/deserializer.
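The behaviour the ticket asks for can be sketched in a few lines of plain Java. This is a toy model under stated assumptions, not the Kafka client code: `RecordingComponent` and `ClientIdSketch` are hypothetical names, and `producer-1` is only an illustrative generated id. The one real name used is the `client.id` config key. The idea is to compute the effective configuration once, after the id has been generated, and hand that same map to every configurable component, serializers included.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for any configurable component (serializer, metrics reporter, ...). */
final class RecordingComponent {
    String seenClientId;

    /** Remembers the client id it was configured with, or null if none was passed. */
    void configure(Map<String, Object> configs) {
        Object id = configs.get("client.id");
        seenClientId = (id == null) ? null : id.toString();
    }
}

/** Hypothetical helper; not part of the Kafka API. */
final class ClientIdSketch {
    /**
     * Returns a copy of the user configs in which client.id is filled in
     * with the generated id when the user left it unset or empty.
     * The caller's map is never modified.
     */
    static Map<String, Object> effectiveConfigs(Map<String, Object> userConfigs, String generatedId) {
        Map<String, Object> effective = new HashMap<>(userConfigs);
        Object current = effective.get("client.id");
        if (current == null || current.toString().isEmpty()) {
            effective.put("client.id", generatedId);
        }
        return effective;
    }
}
```

Passing the result of `effectiveConfigs` to the serializer's `configure` call, rather than the raw user map, is what makes the generated id visible there; a user-supplied `client.id` is left untouched.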
[jira] [Created] (KAFKA-10325) Implement KIP-649: Dynamic Client Configuration
Ryan Dielhenn created KAFKA-10325: - Summary: Implement KIP-649: Dynamic Client Configuration Key: KAFKA-10325 URL: https://issues.apache.org/jira/browse/KAFKA-10325 Project: Kafka Issue Type: New Feature Reporter: Ryan Dielhenn Implement KIP-649: Dynamic Client Configuration
[jira] [Commented] (KAFKA-10324) Pre-0.11 consumers can get stuck when messages are downconverted from V2 format
[ https://issues.apache.org/jira/browse/KAFKA-10324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17167488#comment-17167488 ] Ismael Juma commented on KAFKA-10324: - Yeah, it's usually ok to have one smaller batch per segment. And in the common case where you are reading from the end of the log, there may not be another segment. Scanning ahead seems ok for down conversion, but it's not clear if it's worth it for the common case (there is a cost to scanning ahead). > Pre-0.11 consumers can get stuck when messages are downconverted from V2 > format > --- > > Key: KAFKA-10324 > URL: https://issues.apache.org/jira/browse/KAFKA-10324 > Project: Kafka > Issue Type: Bug >Reporter: Tommy Becker >Priority: Major > > As noted in KAFKA-5443, the V2 message format preserves a batch's lastOffset > even if that offset gets removed due to log compaction. If a pre-0.11 > consumer seeks to such an offset and issues a fetch, it will get an empty > batch, since offsets prior to the requested one are filtered out during > down-conversion. KAFKA-5443 added consumer-side logic to advance the fetch > offset in this case, but this leaves old consumers unable to consume these > topics. > The exact behavior varies depending on consumer version. The 0.10.0.0 > consumer throws RecordTooLargeException and dies, believing that the record > must not have been returned because it was too large. The 0.10.1.0 consumer > simply spins fetching the same empty batch over and over.
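The failure mode in the description can be reproduced with a toy model. The sketch below is an assumption-laden illustration, not Kafka's down-conversion code: `ToyBatch` and its methods are hypothetical names. It models a V2 batch whose `lastOffset` is preserved after compaction removed the record at that offset, and shows that filtering out records below the fetch offset leaves nothing to return.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of a V2 record batch; hypothetical, not Kafka's RecordBatch. */
final class ToyBatch {
    final long baseOffset;
    final long lastOffset;      // preserved even if the record at this offset was compacted away
    final List<Long> offsets;   // offsets of the records that survived compaction

    ToyBatch(long baseOffset, long lastOffset, List<Long> offsets) {
        this.baseOffset = baseOffset;
        this.lastOffset = lastOffset;
        this.offsets = offsets;
    }

    /** True if the batch's offset range contains fetchOffset, so this batch is the one selected for the fetch. */
    boolean covers(long fetchOffset) {
        return baseOffset <= fetchOffset && fetchOffset <= lastOffset;
    }

    /** Mimics the filtering step of down-conversion: keep only records at or after fetchOffset. */
    List<Long> downConvertFrom(long fetchOffset) {
        List<Long> kept = new ArrayList<>();
        for (long offset : offsets) {
            if (offset >= fetchOffset) {
                kept.add(offset);
            }
        }
        return kept;
    }
}
```

With the offsets from the segment dump quoted in this thread (baseOffset 13920966, lastOffset 13920987, surviving records 13920978 to 13920986), a fetch at 13920987 is covered by the batch, yet `downConvertFrom(13920987L)` returns an empty list. An old consumer then has no record from which to advance its fetch offset.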
[jira] [Commented] (KAFKA-10307) Topology cycles in KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable
[ https://issues.apache.org/jira/browse/KAFKA-10307?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17167487#comment-17167487 ] John Roesler commented on KAFKA-10307: -- Hey [~bchen225242] , it seems like this was just a misunderstanding. Can we close the ticket? Or have I missed something? > Topology cycles in > KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable > - > > Key: KAFKA-10307 > URL: https://issues.apache.org/jira/browse/KAFKA-10307 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.4.0, 2.5.0, 2.6.0 >Reporter: Boyang Chen >Priority: Major > > We have spotted a cycled topology for the foreign-key join test > *shouldInnerJoinMultiPartitionQueryable*, not sure yet whether this is a bug > in the algorithm or the test only. Used > [https://zz85.github.io/kafka-streams-viz/] to visualize: > {code:java} > Sub-topology: 0 > Source: KTABLE-SOURCE-19 (topics: > [KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-17-topic]) > --> KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-20 > Source: KTABLE-SOURCE-32 (topics: > [KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-30-topic]) > --> KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-33 > Source: KSTREAM-SOURCE-01 (topics: [table1]) > --> KTABLE-SOURCE-02 > Processor: > KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-20 (stores: > [table1-STATE-STORE-00]) > --> KTABLE-FK-JOIN-OUTPUT-21 > <-- KTABLE-SOURCE-19 > Processor: > KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-33 (stores: > [INNER-store1]) > --> KTABLE-FK-JOIN-OUTPUT-34 > <-- KTABLE-SOURCE-32 > Processor: KTABLE-FK-JOIN-OUTPUT-21 (stores: [INNER-store1]) > --> KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-23 > <-- KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-20 > Processor: KTABLE-FK-JOIN-OUTPUT-34 (stores: [INNER-store2]) > --> KTABLE-TOSTREAM-35 > <-- KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-33 > Processor: KTABLE-SOURCE-02 (stores: > 
[table1-STATE-STORE-00]) > --> KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-10 > <-- KSTREAM-SOURCE-01 > Processor: KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-10 (stores: > []) > --> KTABLE-SINK-11 > <-- KTABLE-SOURCE-02 > Processor: KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-23 (stores: > []) > --> KTABLE-SINK-24 > <-- KTABLE-FK-JOIN-OUTPUT-21 > Processor: KTABLE-TOSTREAM-35 (stores: []) > --> KSTREAM-SINK-36 > <-- KTABLE-FK-JOIN-OUTPUT-34 > Sink: KSTREAM-SINK-36 (topic: output-) > <-- KTABLE-TOSTREAM-35 > Sink: KTABLE-SINK-11 (topic: > KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-09-topic) > <-- KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-10 > Sink: KTABLE-SINK-24 (topic: > KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-22-topic) > <-- KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-23 Sub-topology: 1 > Source: KSTREAM-SOURCE-04 (topics: [table2]) > --> KTABLE-SOURCE-05 > Source: KTABLE-SOURCE-12 (topics: > [KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-09-topic]) > --> KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-14 > Processor: KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-14 (stores: > [KTABLE-FK-JOIN-SUBSCRIPTION-STATE-STORE-13]) > --> KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-15 > <-- KTABLE-SOURCE-12 > Processor: KTABLE-SOURCE-05 (stores: > [table2-STATE-STORE-03]) > --> KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-16 > <-- KSTREAM-SOURCE-04 > Processor: KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-15 (stores: > [table2-STATE-STORE-03]) > --> KTABLE-SINK-18 > <-- KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-14 > Processor: KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-16 (stores: > [KTABLE-FK-JOIN-SUBSCRIPTION-STATE-STORE-13]) > --> KTABLE-SINK-18 > <-- KTABLE-SOURCE-05 > Sink: KTABLE-SINK-18 (topic: > KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-17-topic) > <-- KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-15, > KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-16 Sub-topology: 2 > Source: KSTREAM-SOURCE-07 (topics: [table3]) > --> KTABLE-SOURCE-08 >
[jira] [Updated] (KAFKA-3672) Introduce globally consistent checkpoint in Kafka Streams
[ https://issues.apache.org/jira/browse/KAFKA-3672?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler updated KAFKA-3672: Issue Type: New Feature (was: Bug) > Introduce globally consistent checkpoint in Kafka Streams > - > > Key: KAFKA-3672 > URL: https://issues.apache.org/jira/browse/KAFKA-3672 > Project: Kafka > Issue Type: New Feature > Components: streams >Affects Versions: 0.10.0.0 >Reporter: Guozhang Wang >Priority: Major > Labels: user-experience > > This originates from the idea of rethinking the checkpoint file > creation condition: > Today the checkpoint file containing the checkpointed offsets is written upon > stream task clean shutdown, and is read and deleted upon stream task > (re-)construction. The rationale is that if, upon task re-construction, the > checkpoint file is missing, it indicates that the state of the underlying persistent > store (RocksDB, for example) may not be consistent with the committed > offsets, and hence we had better wipe out the possibly broken state storage and > rebuild from the beginning of the offset. > However, we may be able to do better than this if we can fully control > the persistent store flushing time so that it is aligned with committing, and hence > as long as we commit, we are always guaranteed to get a clear checkpoint. > This may be generalized to a "global state checkpoint" mechanism in Kafka > Streams, which may also subsume KAFKA-3184 for non-persistent stores.
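The current checkpoint lifecycle described in the ticket (write on clean shutdown, read and delete on task construction, treat a missing file as possibly inconsistent state) can be sketched as follows. This is a minimal illustration under assumptions: `CheckpointSketch`, its method names, and the single-offset file format are hypothetical and do not reflect how Kafka Streams actually stores checkpoints.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.OptionalLong;

/** Hypothetical sketch of the checkpoint-file lifecycle; not Kafka Streams code. */
final class CheckpointSketch {

    /** On clean shutdown: persist the offset up to which the local store is known to be consistent. */
    static void writeOnCleanShutdown(Path checkpointFile, long offset) {
        try {
            Files.write(checkpointFile, Long.toString(offset).getBytes(StandardCharsets.UTF_8));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /**
     * On task (re-)construction: read the checkpoint and delete the file.
     * An empty result means no clean shutdown was recorded, so the caller
     * should wipe the local store and rebuild it from the start.
     */
    static OptionalLong readAndDelete(Path checkpointFile) {
        try {
            if (!Files.exists(checkpointFile)) {
                return OptionalLong.empty();
            }
            String text = new String(Files.readAllBytes(checkpointFile), StandardCharsets.UTF_8).trim();
            Files.delete(checkpointFile);
            return OptionalLong.of(Long.parseLong(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

Because the file is deleted as soon as it is read, a crash between construction and the next clean shutdown leaves no checkpoint behind, which is exactly the case that forces a full rebuild today. Aligning store flushes with commits, as the ticket proposes, would allow a checkpoint to be written at every commit instead.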
[jira] [Commented] (KAFKA-10324) Pre-0.11 consumers can get stuck when messages are downconverted from V2 format
[ https://issues.apache.org/jira/browse/KAFKA-10324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17167485#comment-17167485 ] Tommy Becker commented on KAFKA-10324: -- Understood but seems like as your fetch offset approaches the end of a segment you'll get a smaller batch. Just pointing out that scanning ahead to grab as many records as will fit into the max fetch size could be a general, albeit small improvement across the board.
[jira] [Resolved] (KAFKA-9210) kafka stream loss data
[ https://issues.apache.org/jira/browse/KAFKA-9210?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] John Roesler resolved KAFKA-9210. - Resolution: Fixed > kafka stream loss data > -- > > Key: KAFKA-9210 > URL: https://issues.apache.org/jira/browse/KAFKA-9210 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 2.0.1 >Reporter: panpan.liu >Priority: Major > Attachments: app.log, screenshot-1.png > > > kafka broker: 2.0.1 > kafka stream client: 2.1.0 > # Two applications ran at the same time > # After some days, I stopped one application (in k8s) > # The following log occurred; I checked the data and found that the value is > less than what I expected. > {quote}Partitions > [flash-app-xmc-worker-share-store-minute-repartition-1]Partitions > [flash-app-xmc-worker-share-store-minute-repartition-1] for changelog > flash-app-xmc-worker-share-store-minute-changelog-12019-11-19 > 05:50:49.816|WARN > |flash-client-xmc-StreamThread-3|o.a.k.s.p.i.StoreChangelogReader|101|stream-thread > [flash-client-xmc-StreamThread-3] Restoring StreamTasks failed. 
Deleting > StreamTasks stores to recreate from > scratch.org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets > out of range with no configured reset policy for partitions: > \{flash-app-xmc-worker-share-store-minute-changelog-1=6128684} at > org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:987)2019-11-19 > 05:50:49.817|INFO > |flash-client-xmc-StreamThread-3|o.a.k.s.p.i.StoreChangelogReader|105|stream-thread > [flash-client-xmc-StreamThread-3] Reinitializing StreamTask TaskId: 10_1 > ProcessorTopology: KSTREAM-SOURCE-70: topics: > [flash-app-xmc-worker-share-store-minute-repartition] children: > [KSTREAM-AGGREGATE-67] KSTREAM-AGGREGATE-67: states: > [worker-share-store-minute] children: [KTABLE-TOSTREAM-71] > KTABLE-TOSTREAM-71: children: [KSTREAM-SINK-72] > KSTREAM-SINK-72: topic: > StaticTopicNameExtractor(xmc-worker-share-minute)Partitions > [flash-app-xmc-worker-share-store-minute-repartition-1] for changelog > flash-app-xmc-worker-share-store-minute-changelog-12019-11-19 > 05:50:49.842|WARN > |flash-client-xmc-StreamThread-3|o.a.k.s.p.i.StoreChangelogReader|101|stream-thread > [flash-client-xmc-StreamThread-3] Restoring StreamTasks failed. 
Deleting > StreamTasks stores to recreate from > scratch.org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets > out of range with no configured reset policy for partitions: > \{flash-app-xmc-worker-share-store-minute-changelog-1=6128684} at > org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:987)2019-11-19 > 05:50:49.842|INFO > |flash-client-xmc-StreamThread-3|o.a.k.s.p.i.StoreChangelogReader|105|stream-thread > [flash-client-xmc-StreamThread-3] Reinitializing StreamTask TaskId: 10_1 > ProcessorTopology: KSTREAM-SOURCE-70: topics: > [flash-app-xmc-worker-share-store-minute-repartition] children: > [KSTREAM-AGGREGATE-67] KSTREAM-AGGREGATE-67: states: > [worker-share-store-minute] children: [KTABLE-TOSTREAM-71] > KTABLE-TOSTREAM-71: children: [KSTREAM-SINK-72] > KSTREAM-SINK-72: topic: > StaticTopicNameExtractor(xmc-worker-share-minute)Partitions > [flash-app-xmc-worker-share-store-minute-repartition-1] for changelog > flash-app-xmc-worker-share-store-minute-changelog-12019-11-19 > 05:50:49.905|WARN > |flash-client-xmc-StreamThread-3|o.a.k.s.p.i.StoreChangelogReader|101|stream-thread > [flash-client-xmc-StreamThread-3] Restoring StreamTasks failed. 
Deleting > StreamTasks stores to recreate from > scratch.org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets > out of range with no configured reset policy for partitions: > \{flash-app-xmc-worker-share-store-minute-changelog-1=6128684} at > org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:987)2019-11-19 > 05:50:49.906|INFO > |flash-client-xmc-StreamThread-3|o.a.k.s.p.i.StoreChangelogReader|105|stream-thread > [flash-client-xmc-StreamThread-3] Reinitializing StreamTask TaskId: 10_1 > ProcessorTopology: KSTREAM-SOURCE-70: topics: > [flash-app-xmc-worker-share-store-minute-repartition] children: > [KSTREAM-AGGREGATE-67] KSTREAM-AGGREGATE-67: states: > [worker-share-store-minute] > {quote} >
[jira] [Commented] (KAFKA-10324) Pre-0.11 consumers can get stuck when messages are downconverted from V2 format
[ https://issues.apache.org/jira/browse/KAFKA-10324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17167481#comment-17167481 ] Ismael Juma commented on KAFKA-10324: - Segments generally are larger than the fetch response size, so it's usually not an issue.
[jira] [Comment Edited] (KAFKA-10324) Pre-0.11 consumers can get stuck when messages are downconverted from V2 format
[ https://issues.apache.org/jira/browse/KAFKA-10324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17167481#comment-17167481 ] Ismael Juma edited comment on KAFKA-10324 at 7/29/20, 8:00 PM: --- Segments are typically significantly larger than the fetch response size, so it's usually not an issue. was (Author: ijuma): Segments generally are larger than the fetch response size, so it's usually not an issue.
[jira] [Commented] (KAFKA-10324) Pre-0.11 consumers can get stuck when messages are downconverted from V2 format
[ https://issues.apache.org/jira/browse/KAFKA-10324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17167480#comment-17167480 ] Tommy Becker commented on KAFKA-10324: -- So is it always the case that records returned in a FetchResponse are all from a single segment? Might that not be inefficient?
[jira] [Commented] (KAFKA-10324) Pre-0.11 consumers can get stuck when messages are downconverted from V2 format
[ https://issues.apache.org/jira/browse/KAFKA-10324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17167444#comment-17167444 ] Jason Gustafson commented on KAFKA-10324: - Thanks. Unfortunately it's not super straightforward to fix, but we have some ideas. Basically we have to modify the down-conversion logic so that it can read across multiple segments until it finds a non-empty batch to return.
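The idea in this comment, continuing past an empty down-converted result into later segments, can be illustrated with a small sketch. It is only a toy model of the proposal and makes several assumptions: `ScanAheadSketch` is a hypothetical name, each segment is reduced to a plain list of surviving record offsets, and nothing here reflects how the broker actually reads log segments or bounds the cost of scanning.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical sketch of "scan ahead until something non-empty is found"; not broker code. */
final class ScanAheadSketch {

    /**
     * Starting at startSegment, filters each segment's surviving offsets to those
     * at or after fetchOffset and returns the first non-empty result.
     * Returns an empty list only when no later segment holds such a record.
     */
    static List<Long> fetch(List<List<Long>> segments, int startSegment, long fetchOffset) {
        for (int i = startSegment; i < segments.size(); i++) {
            List<Long> kept = new ArrayList<>();
            for (long offset : segments.get(i)) {
                if (offset >= fetchOffset) {
                    kept.add(offset);
                }
            }
            if (!kept.isEmpty()) {
                return kept;
            }
        }
        return Collections.emptyList();
    }
}
```

If the first segment ends with surviving records up to 13920986 and a following segment starts at a hypothetical offset 13920988, a fetch at 13920987 skips the empty result from the first segment and returns the records of the second. When the fetch offset is at the end of the log there is no further segment, and the result is legitimately empty.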
[jira] [Commented] (KAFKA-10324) Pre-0.11 consumers can get stuck when messages are downconverted from V2 format
[ https://issues.apache.org/jira/browse/KAFKA-10324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17167431#comment-17167431 ] Tommy Becker commented on KAFKA-10324: -- Yes. See my comment above.
[jira] [Commented] (KAFKA-10324) Pre-0.11 consumers can get stuck when messages are downconverted from V2 format
[ https://issues.apache.org/jira/browse/KAFKA-10324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17167413#comment-17167413 ] Jason Gustafson commented on KAFKA-10324: - Aha, it is because it is at the end of the segment. Was that true in the other cases?
[GitHub] [kafka] cmccabe commented on a change in pull request #9012: KAFKA-10270: A broker to controller channel manager
cmccabe commented on a change in pull request #9012: URL: https://github.com/apache/kafka/pull/9012#discussion_r462385360 ## File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala ## @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import java.util.concurrent.{LinkedBlockingDeque, TimeUnit} + +import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler} +import kafka.utils.Logging +import org.apache.kafka.clients._ +import org.apache.kafka.common.requests.AbstractRequest +import org.apache.kafka.common.utils.{LogContext, Time} +import org.apache.kafka.common.Node +import org.apache.kafka.common.metrics.Metrics +import org.apache.kafka.common.network._ +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.security.JaasContext + +import scala.collection.mutable +import scala.jdk.CollectionConverters._ + +/** + * This class manages the connection between a broker and the controller. It runs a single + * {@link BrokerToControllerRequestThread} which uses the broker's metadata cache as its own metadata to find + * and connect to the controller. 
The channel is async and runs the network connection in the background. + * The maximum number of in-flight requests are set to one to ensure orderly response from the controller, therefore + * care must be taken to not block on outstanding requests for too long. + */ +class BrokerToControllerChannelManager(metadataCache: kafka.server.MetadataCache, + time: Time, + metrics: Metrics, + config: KafkaConfig, + threadNamePrefix: Option[String] = None) extends Logging { + private val requestQueue = new LinkedBlockingDeque[BrokerToControllerQueueItem] + private val logContext = new LogContext(s"[broker-${config.brokerId}-to-controller] ") + private val manualMetadataUpdater = new ManualMetadataUpdater() + private val requestThread = newRequestThread + + def start(): Unit = { +requestThread.start() + } + + def shutdown(): Unit = { +requestThread.shutdown() +requestThread.awaitShutdown() + } + + private[server] def newRequestThread = { +val brokerToControllerListenerName = config.controlPlaneListenerName.getOrElse(config.interBrokerListenerName) +val brokerToControllerSecurityProtocol = config.controlPlaneSecurityProtocol.getOrElse(config.interBrokerSecurityProtocol) + +val networkClient = { + val channelBuilder = ChannelBuilders.clientChannelBuilder( +brokerToControllerSecurityProtocol, +JaasContext.Type.SERVER, +config, +brokerToControllerListenerName, +config.saslMechanismInterBrokerProtocol, +time, +config.saslInterBrokerHandshakeRequestEnable, +logContext + ) + val selector = new Selector( +NetworkReceive.UNLIMITED, +Selector.NO_IDLE_TIMEOUT_MS, +metrics, +time, +"BrokerToControllerChannel", +Map("BrokerId" -> config.brokerId.toString).asJava, +false, +channelBuilder, +logContext + ) + new NetworkClient( +selector, +manualMetadataUpdater, +config.brokerId.toString, +1, +0, +0, +Selectable.USE_DEFAULT_BUFFER_SIZE, +Selectable.USE_DEFAULT_BUFFER_SIZE, +config.requestTimeoutMs, +config.connectionSetupTimeoutMs, +config.connectionSetupTimeoutMaxMs, 
+ClientDnsLookup.USE_ALL_DNS_IPS, +time, +false, +new ApiVersions, +logContext + ) +} +val threadName = threadNamePrefix match { + case None => s"broker-${config.brokerId}-to-controller-send-thread" + case Some(name) => s"$name:broker-${config.brokerId}-to-controller-send-thread" +} + +new BrokerToControllerRequestThread(networkClient, manualMetadataUpdater, requestQueue, metadataCache, config, + brokerToControllerListenerName, time, threadName) + } + + private[server] def sendRequest(request: AbstractRequest.Builder[_ <:
[GitHub] [kafka] chia7712 commented on pull request #9097: KAFKA-10319: Skip unknown offsets when computing sum of changelog offsets
chia7712 commented on pull request #9097: URL: https://github.com/apache/kafka/pull/9097#issuecomment-665504521 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna opened a new pull request #9098: KAFKA-9924: Prepare RocksDB and metrics for RocksDB properties recording
cadonna opened a new pull request #9098: URL: https://github.com/apache/kafka/pull/9098

This PR refactors the RocksDB store and the metrics infrastructure in Streams in preparation for recording the RocksDB properties specified in KIP-607. The refactoring includes:
- a wrapper around `BlockBasedTableConfig` to make the cache accessible to the RocksDB metrics recorder
- the RocksDB metrics recorder now also takes the DB instance and the cache, in addition to the statistics
- the value providers for the metrics are added to the RocksDB metrics recorder also when the recording level is INFO
- the creation of the RocksDB metrics recording trigger is moved to `StreamsMetricsImpl`

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] badaiaqrandista edited a comment on pull request #4807: KAFKA-6733: Support of printing additional ConsumerRecord fields in DefaultMessageFormatter
badaiaqrandista edited a comment on pull request #4807: URL: https://github.com/apache/kafka/pull/4807#issuecomment-665660018 This PR should be closed as it has been superseded by PR 9099 (https://github.com/apache/kafka/pull/9099).
[jira] [Comment Edited] (KAFKA-10324) Pre-0.11 consumers can get stuck when messages are downconverted from V2 format
[ https://issues.apache.org/jira/browse/KAFKA-10324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17167400#comment-17167400 ] Tommy Becker edited comment on KAFKA-10324 at 7/29/20, 5:54 PM: [~hachikuji] wrt why the broker does not seem to send subsequent batches, I'm not sure. But I can tell you I see this behavior even with max.partition.fetch.bytes set to Integer.MAX_VALUE. Maybe this has something to do with down conversion? Anyway, here's an excerpt from a dump of the segment containing the problematic offset, which is 13920987: baseOffset: 13920966 lastOffset: 13920987 count: 6 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 49 isTransactional: false isControl: false position: 98516844 CreateTime: 1595224747691 size: 4407 magic: 2 compresscodec: NONE crc: 1598305187 isvalid: true \| offset: 13920978 CreateTime: 1595224747691 keysize: 36 valuesize: 681 sequence: -1 headerKeys: [] \| offset: 13920979 CreateTime: 1595224747691 keysize: 36 valuesize: 677 sequence: -1 headerKeys: [] \| offset: 13920980 CreateTime: 1595224747691 keysize: 36 valuesize: 680 sequence: -1 headerKeys: [] \| offset: 13920984 CreateTime: 1595224747691 keysize: 36 valuesize: 681 sequence: -1 headerKeys: [] \| offset: 13920985 CreateTime: 1595224747691 keysize: 36 valuesize: 677 sequence: -1 headerKeys: [] \| offset: 13920986 CreateTime: 1595224747691 keysize: 36 valuesize: 680 sequence: -1 headerKeys: [] End of segment is here was (Author: twbecker): [~hachikuji] wrt why the broker does not seem to send subsequent batches, I'm not sure. But I can tell you I see this behavior even with max.partition.fetch.bytes set to Integer.MAX_VALUE. Maybe this has something to do with down conversion? 
Anyway, here's an excerpt from a dump of the segment containing the problematic offset, which is 13920987: baseOffset: 13920966 lastOffset: 13920987 count: 6 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 49 isTransactional: false isControl: false position: 98516844 CreateTime: 1595224747691 size: 4407 magic: 2 compresscodec: NONE crc: 1598305187 isvalid: true | offset: 13920978 CreateTime: 1595224747691 keysize: 36 valuesize: 681 sequence: -1 headerKeys: [] | offset: 13920979 CreateTime: 1595224747691 keysize: 36 valuesize: 677 sequence: -1 headerKeys: [] | offset: 13920980 CreateTime: 1595224747691 keysize: 36 valuesize: 680 sequence: -1 headerKeys: [] | offset: 13920984 CreateTime: 1595224747691 keysize: 36 valuesize: 681 sequence: -1 headerKeys: [] | offset: 13920985 CreateTime: 1595224747691 keysize: 36 valuesize: 677 sequence: -1 headerKeys: [] | offset: 13920986 CreateTime: 1595224747691 keysize: 36 valuesize: 680 sequence: -1 headerKeys: [] End of segment is here > Pre-0.11 consumers can get stuck when messages are downconverted from V2 > format > --- > > Key: KAFKA-10324 > URL: https://issues.apache.org/jira/browse/KAFKA-10324 > Project: Kafka > Issue Type: Bug >Reporter: Tommy Becker >Priority: Major > > As noted in KAFKA-5443, The V2 message format preserves a batch's lastOffset > even if that offset gets removed due to log compaction. If a pre-0.11 > consumer seeks to such an offset and issues a fetch, it will get an empty > batch, since offsets prior to the requested one are filtered out during > down-conversion. KAFKA-5443 added consumer-side logic to advance the fetch > offset in this case, but this leaves old consumers unable to consume these > topics. > The exact behavior varies depending on consumer version. The 0.10.0.0 > consumer throws RecordTooLargeException and dies, believing that the record > must not have been returned because it was too large. 
The 0.10.1.0 consumer > simply spins fetching the same empty batch over and over. -- This message was sent by Atlassian Jira (v8.3.4#803005)
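The empty-batch situation described in KAFKA-10324 can be traced with the offsets from the segment dump above. The sketch below is a minimal, self-contained model, not Kafka's actual down-conversion or fetcher code: the `Batch` class and the `visibleRecords` and `nextFetchOffset` helpers are hypothetical names introduced only for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class EmptyBatchSketch {

    /** Hypothetical stand-in for a V2 batch; only the fields needed here. */
    public static final class Batch {
        final long baseOffset;
        final long lastOffset;          // preserved even if compaction removed that record
        final List<Long> recordOffsets; // offsets of the records that survived compaction

        Batch(long baseOffset, long lastOffset, List<Long> recordOffsets) {
            this.baseOffset = baseOffset;
            this.lastOffset = lastOffset;
            this.recordOffsets = recordOffsets;
        }
    }

    /** The batch from the segment dump quoted in the comment. */
    public static Batch sampleBatch() {
        return new Batch(13920966L, 13920987L,
                Arrays.asList(13920978L, 13920979L, 13920980L,
                              13920984L, 13920985L, 13920986L));
    }

    /** Models the filtering step: records before the fetch offset are dropped. */
    public static List<Long> visibleRecords(Batch batch, long fetchOffset) {
        List<Long> visible = new ArrayList<>();
        for (long offset : batch.recordOffsets) {
            if (offset >= fetchOffset) {
                visible.add(offset);
            }
        }
        return visible;
    }

    /** Models the idea behind the consumer-side fix: skip past an empty batch. */
    public static long nextFetchOffset(Batch batch, long fetchOffset) {
        List<Long> visible = visibleRecords(batch, fetchOffset);
        if (!visible.isEmpty()) {
            return visible.get(visible.size() - 1) + 1;
        }
        if (batch.lastOffset >= fetchOffset) {
            return batch.lastOffset + 1; // nothing left in this batch, move on
        }
        return fetchOffset;
    }

    public static void main(String[] args) {
        Batch batch = sampleBatch();
        long fetchOffset = 13920987L; // the problematic offset from the report
        System.out.println("visible records: " + visibleRecords(batch, fetchOffset));
        System.out.println("next fetch offset: " + nextFetchOffset(batch, fetchOffset));
    }
}
```

With the fetch offset at 13920987, the model returns no records. A consumer that lacks the skip step in `nextFetchOffset` would keep requesting the same offset, which matches the spinning behaviour reported for 0.10.1.0.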
[GitHub] [kafka] cadonna commented on pull request #9087: HOTFIX: Set session timeout and heartbeat interval to default to decrease flakiness
cadonna commented on pull request #9087: URL: https://github.com/apache/kafka/pull/9087#issuecomment-665526207 I ran the `RestoreIntegrationTest` several times, and its runtime does not change significantly between the default values and the reduced values for session timeout and heartbeat interval. Hence, I set both configs to their default values for the entire test.
[GitHub] [kafka] chia7712 commented on a change in pull request #9092: KAFKA-10163; Define `controller_mutation_rate` as a Double instead of a Long
chia7712 commented on a change in pull request #9092: URL: https://github.com/apache/kafka/pull/9092#discussion_r462394461

## File path: core/src/main/scala/kafka/server/DynamicConfig.scala

@@ -103,7 +103,7 @@ object DynamicConfig {
 .define(ProducerByteRateOverrideProp, LONG, DefaultProducerOverride, MEDIUM, ProducerOverrideDoc)
 .define(ConsumerByteRateOverrideProp, LONG, DefaultConsumerOverride, MEDIUM, ConsumerOverrideDoc)
 .define(RequestPercentageOverrideProp, DOUBLE, DefaultRequestOverride, MEDIUM, RequestOverrideDoc)
- .define(ControllerMutationOverrideProp, LONG, DefaultConsumerOverride, MEDIUM, ControllerMutationOverrideDoc)
+ .define(ControllerMutationOverrideProp, DOUBLE, DefaultConsumerOverride, MEDIUM, ControllerMutationOverrideDoc)

Review comment: Why not also change the type of the default value from `Long` to `Double`?
[GitHub] [kafka] dajac commented on a change in pull request #9099: KAFKA-6733: Printing additional ConsumerRecord fields in DefaultMessageFormatter
dajac commented on a change in pull request #9099: URL: https://github.com/apache/kafka/pull/9099#discussion_r462306541 ## File path: core/src/main/scala/kafka/tools/ConsoleConsumer.scala ## @@ -459,48 +466,32 @@ class DefaultMessageFormatter extends MessageFormatter { var printKey = false var printValue = true var printPartition = false - var keySeparator = "\t".getBytes(StandardCharsets.UTF_8) - var lineSeparator = "\n".getBytes(StandardCharsets.UTF_8) + var printOffset = false + var printHeaders = false + var keySeparator = utfBytes("\t") + var lineSeparator = utfBytes("\n") + var headersSeparator = utfBytes(",") + var nullLiteral = utfBytes("null") var keyDeserializer: Option[Deserializer[_]] = None var valueDeserializer: Option[Deserializer[_]] = None - - override def configure(configs: Map[String, _]): Unit = { -val props = new java.util.Properties() -configs.asScala.foreach { case (key, value) => props.put(key, value.toString) } -if (props.containsKey("print.timestamp")) - printTimestamp = props.getProperty("print.timestamp").trim.equalsIgnoreCase("true") -if (props.containsKey("print.key")) - printKey = props.getProperty("print.key").trim.equalsIgnoreCase("true") -if (props.containsKey("print.value")) - printValue = props.getProperty("print.value").trim.equalsIgnoreCase("true") -if (props.containsKey("print.partition")) - printPartition = props.getProperty("print.partition").trim.equalsIgnoreCase("true") -if (props.containsKey("key.separator")) - keySeparator = props.getProperty("key.separator").getBytes(StandardCharsets.UTF_8) -if (props.containsKey("line.separator")) - lineSeparator = props.getProperty("line.separator").getBytes(StandardCharsets.UTF_8) -// Note that `toString` will be called on the instance returned by `Deserializer.deserialize` -if (props.containsKey("key.deserializer")) { - keyDeserializer = Some(Class.forName(props.getProperty("key.deserializer")).getDeclaredConstructor() -.newInstance().asInstanceOf[Deserializer[_]]) - 
keyDeserializer.get.configure(propertiesWithKeyPrefixStripped("key.deserializer.", props).asScala.asJava, true) -} -// Note that `toString` will be called on the instance returned by `Deserializer.deserialize` -if (props.containsKey("value.deserializer")) { - valueDeserializer = Some(Class.forName(props.getProperty("value.deserializer")).getDeclaredConstructor() -.newInstance().asInstanceOf[Deserializer[_]]) - valueDeserializer.get.configure(propertiesWithKeyPrefixStripped("value.deserializer.", props).asScala.asJava, false) -} - } - - private def propertiesWithKeyPrefixStripped(prefix: String, props: Properties): Properties = { -val newProps = new Properties() -props.asScala.foreach { case (key, value) => - if (key.startsWith(prefix) && key.length > prefix.length) -newProps.put(key.substring(prefix.length), value) -} -newProps + var headersDeserializer: Option[Deserializer[_]] = None + + override def init(props: Properties): Unit = { Review comment: `init(props: Properties)` has been deprecated. It would be great if we could keep using `configure(configs: Map[String, _])` as before. I think that we should also try to directly extract the values from the `Map` instead of using a `Properties`. ## File path: core/src/test/scala/kafka/tools/DefaultMessageFormatterTest.scala ## @@ -0,0 +1,235 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package unit.kafka.tools + +import java.io.{ByteArrayOutputStream, Closeable, PrintStream} +import java.nio.charset.StandardCharsets +import java.util +import java.util.Properties + +import kafka.tools.DefaultMessageFormatter +import org.apache.kafka.clients.consumer.ConsumerRecord +import org.apache.kafka.common.header.Header +import org.apache.kafka.common.header.internals.{RecordHeader, RecordHeaders} +import org.apache.kafka.common.record.TimestampType +import org.apache.kafka.common.serialization.Deserializer +import org.junit.Assert._ +import org.junit.Test +import org.junit.runner.RunWith +import
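The suggestion in the review comment above, reading settings straight from the `Map` passed to `configure` rather than copying them into a `Properties`, can be sketched as follows. This is an illustration in Java (the file under review is Scala) and not the PR's code: `FormatterConfigSketch`, `flag`, and `bytes` are hypothetical names, and only config keys that appear in the removed code are used.

```java
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Map;

public class FormatterConfigSketch {

    /** Reads a boolean flag directly from the config map, falling back to a default. */
    public static boolean flag(Map<String, ?> configs, String key, boolean defaultValue) {
        Object value = configs.get(key);
        if (value == null) {
            return defaultValue;
        }
        return value.toString().trim().equalsIgnoreCase("true");
    }

    /** Reads a separator from the config map as UTF-8 bytes, falling back to a default. */
    public static byte[] bytes(Map<String, ?> configs, String key, String defaultValue) {
        Object value = configs.get(key);
        String text = value == null ? defaultValue : value.toString();
        return text.getBytes(StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        Map<String, Object> configs = new HashMap<>();
        configs.put("print.key", "true");
        configs.put("key.separator", "|");

        boolean printKey = flag(configs, "print.key", false);
        boolean printPartition = flag(configs, "print.partition", false); // absent, so default
        byte[] keySeparator = bytes(configs, "key.separator", "\t");

        System.out.println(printKey + " " + printPartition + " "
                + new String(keySeparator, StandardCharsets.UTF_8));
    }
}
```

Each lookup handles a missing key in one place, so no intermediate `Properties` object or `containsKey` check is needed.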
[GitHub] [kafka] cadonna commented on pull request #9098: KAFKA-9924: Prepare RocksDB and metrics for RocksDB properties recording
cadonna commented on pull request #9098: URL: https://github.com/apache/kafka/pull/9098#issuecomment-665616943 Call for review: @vvcephei @guozhangwang
[jira] [Comment Edited] (KAFKA-10324) Pre-0.11 consumers can get stuck when messages are downconverted from V2 format
[ https://issues.apache.org/jira/browse/KAFKA-10324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17167400#comment-17167400 ] Tommy Becker edited comment on KAFKA-10324 at 7/29/20, 5:46 PM: [~hachikuji] wrt why the broker does not seem to send subsequent batches, I'm not sure. But I can tell you I see this behavior even with max.partition.fetch.bytes set to Integer.MAX_VALUE. Maybe this has something to do with down conversion? Anyway, here's an excerpt from a dump of the segment containing the problematic offset, which is 13920987: baseOffset: 13920966 lastOffset: 13920987 count: 6 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 49 isTransactional: false isControl: false position: 98516844 CreateTime: 1595224747691 size: 4407 magic: 2 compresscodec: NONE crc: 1598305187 isvalid: true | offset: 13920978 CreateTime: 1595224747691 keysize: 36 valuesize: 681 sequence: -1 headerKeys: [] | offset: 13920979 CreateTime: 1595224747691 keysize: 36 valuesize: 677 sequence: -1 headerKeys: [] | offset: 13920980 CreateTime: 1595224747691 keysize: 36 valuesize: 680 sequence: -1 headerKeys: [] | offset: 13920984 CreateTime: 1595224747691 keysize: 36 valuesize: 681 sequence: -1 headerKeys: [] | offset: 13920985 CreateTime: 1595224747691 keysize: 36 valuesize: 677 sequence: -1 headerKeys: [] | offset: 13920986 CreateTime: 1595224747691 keysize: 36 valuesize: 680 sequence: -1 headerKeys: [] End of segment is here was (Author: twbecker): [~hachikuji] wrt why the broker does not seem to send subsequent batches, I'm not sure. But I can tell you I see this behavior even with max.partition.fetch.bytes set to Integer.MAX_VALUE. Maybe this has something to do with down conversion? 
Anyway, here's an excerpt from a dump of the segment containing the problematic offset, which is 13920987: baseOffset: 13920966 lastOffset: 13920987 count: 6 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 49 isTransactional: false isControl: false position: 98516844 CreateTime: 1595224747691 size: 4407 magic: 2 compresscodec: NONE crc: 1598305187 isvalid: true | offset: 13920978 CreateTime: 1595224747691 keysize: 36 valuesize: 681 sequence: -1 headerKeys: [] | offset: 13920979 CreateTime: 1595224747691 keysize: 36 valuesize: 677 sequence: -1 headerKeys: [] | offset: 13920980 CreateTime: 1595224747691 keysize: 36 valuesize: 680 sequence: -1 headerKeys: [] | offset: 13920984 CreateTime: 1595224747691 keysize: 36 valuesize: 681 sequence: -1 headerKeys: [] | offset: 13920985 CreateTime: 1595224747691 keysize: 36 valuesize: 677 sequence: -1 headerKeys: [] | offset: 13920986 CreateTime: 1595224747691 keysize: 36 valuesize: 680 sequence: -1 headerKeys: [] ### End of segment is here > Pre-0.11 consumers can get stuck when messages are downconverted from V2 > format > --- > > Key: KAFKA-10324 > URL: https://issues.apache.org/jira/browse/KAFKA-10324 > Project: Kafka > Issue Type: Bug >Reporter: Tommy Becker >Priority: Major > > As noted in KAFKA-5443, The V2 message format preserves a batch's lastOffset > even if that offset gets removed due to log compaction. If a pre-0.11 > consumer seeks to such an offset and issues a fetch, it will get an empty > batch, since offsets prior to the requested one are filtered out during > down-conversion. KAFKA-5443 added consumer-side logic to advance the fetch > offset in this case, but this leaves old consumers unable to consume these > topics. > The exact behavior varies depending on consumer version. The 0.10.0.0 > consumer throws RecordTooLargeException and dies, believing that the record > must not have been returned because it was too large. 
The 0.10.1.0 consumer > simply spins fetching the same empty batch over and over. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] mumrah commented on pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
mumrah commented on pull request #9008: URL: https://github.com/apache/kafka/pull/9008#issuecomment-665795947 I ran a console consumer perf test (at @hachikuji's suggestion) and took a profile. ![image](https://user-images.githubusercontent.com/55116/88832229-81be6d00-d19e-11ea-9ee9-51b6054a6731.png) Zoomed in a bit on the records part: ![image](https://user-images.githubusercontent.com/55116/88832276-93a01000-d19e-11ea-9293-a138c38f6ed3.png) This was with only a handful of partitions on a single broker (on my laptop), but it confirms that the new FetchResponse serialization is hitting the same sendfile path as the previous code.
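For context on the "sendfile path" mentioned above: in Java, zero-copy transfers are usually requested through `FileChannel.transferTo`, which the JDK documents as allowing many operating systems to move bytes from the filesystem cache to the target channel without copying them through user space. The sketch below only demonstrates the call itself. Whether `sendfile` is actually used depends on the OS and on the target channel type, and this example writes to an in-memory channel, so no zero-copy path should be expected here. `TransferToSketch`, `copy`, and `roundTrip` are hypothetical names, not part of the PR.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Channels;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

public class TransferToSketch {

    /** Transfers the whole file to the target channel and returns the bytes sent. */
    public static long copy(Path source, WritableByteChannel target) throws IOException {
        try (FileChannel channel = FileChannel.open(source, StandardOpenOption.READ)) {
            long position = 0;
            long size = channel.size();
            // transferTo may send fewer bytes than requested, so loop until done.
            while (position < size) {
                long sent = channel.transferTo(position, size - position, target);
                if (sent <= 0) {
                    break; // target cannot accept more right now; stop in this sketch
                }
                position += sent;
            }
            return position;
        }
    }

    /** Writes the payload to a temp file and reads it back through transferTo. */
    public static byte[] roundTrip(byte[] payload) {
        try {
            Path file = Files.createTempFile("transfer-to-sketch", ".bin");
            try {
                Files.write(file, payload);
                ByteArrayOutputStream out = new ByteArrayOutputStream();
                try (WritableByteChannel target = Channels.newChannel(out)) {
                    copy(file, target);
                }
                return out.toByteArray();
            } finally {
                Files.deleteIfExists(file);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        byte[] result = roundTrip("record-batch-bytes".getBytes(StandardCharsets.UTF_8));
        System.out.println(new String(result, StandardCharsets.UTF_8));
    }
}
```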
[jira] [Commented] (KAFKA-10324) Pre-0.11 consumers can get stuck when messages are downconverted from V2 format
[ https://issues.apache.org/jira/browse/KAFKA-10324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17167400#comment-17167400 ] Tommy Becker commented on KAFKA-10324: -- [~hachikuji] wrt why the broker does not seem to send subsequent batches, I'm not sure. But I can tell you I see this behavior even with max.partition.fetch.bytes set to Integer.MAX_VALUE. Maybe this has something to do with down conversion? Anyway, here's an excerpt from a dump of the segment containing the problematic offset, which is 13920987: baseOffset: 13920966 lastOffset: 13920987 count: 6 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 49 isTransactional: false isControl: false position: 98516844 CreateTime: 1595224747691 size: 4407 magic: 2 compresscodec: NONE crc: 1598305187 isvalid: true | offset: 13920978 CreateTime: 1595224747691 keysize: 36 valuesize: 681 sequence: -1 headerKeys: [] | offset: 13920979 CreateTime: 1595224747691 keysize: 36 valuesize: 677 sequence: -1 headerKeys: [] | offset: 13920980 CreateTime: 1595224747691 keysize: 36 valuesize: 680 sequence: -1 headerKeys: [] | offset: 13920984 CreateTime: 1595224747691 keysize: 36 valuesize: 681 sequence: -1 headerKeys: [] | offset: 13920985 CreateTime: 1595224747691 keysize: 36 valuesize: 677 sequence: -1 headerKeys: [] | offset: 13920986 CreateTime: 1595224747691 keysize: 36 valuesize: 680 sequence: -1 headerKeys: [] ### End of segment is here > Pre-0.11 consumers can get stuck when messages are downconverted from V2 > format > --- > > Key: KAFKA-10324 > URL: https://issues.apache.org/jira/browse/KAFKA-10324 > Project: Kafka > Issue Type: Bug >Reporter: Tommy Becker >Priority: Major > > As noted in KAFKA-5443, The V2 message format preserves a batch's lastOffset > even if that offset gets removed due to log compaction. 
If a pre-0.11 > consumer seeks to such an offset and issues a fetch, it will get an empty > batch, since offsets prior to the requested one are filtered out during > down-conversion. KAFKA-5443 added consumer-side logic to advance the fetch > offset in this case, but this leaves old consumers unable to consume these > topics. > The exact behavior varies depending on consumer version. The 0.10.0.0 > consumer throws RecordTooLargeException and dies, believing that the record > must not have been returned because it was too large. The 0.10.1.0 consumer > simply spins fetching the same empty batch over and over. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] cadonna commented on a change in pull request #9094: KAFKA-10054: KIP-613, add TRACE-level e2e latency metrics
cadonna commented on a change in pull request #9094: URL: https://github.com/apache/kafka/pull/9094#discussion_r462327050 ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java ## @@ -227,6 +234,14 @@ protected Bytes keyBytes(final K key) { return byteEntries; } +private void maybeRecordE2ELatency() { +if (e2eLatencySensor.shouldRecord() && e2eLatencySensor.hasMetrics()) { Review comment: I think, you do not need to check for metrics with `e2eLatencySensor.hasMetrics()`. There should always be metrics within this sensor. `hasMetrics()` is used in `StreamsMetricsImpl#maybeMeasureLatency()` because some sensors may not contain any metrics due to the built-in metrics version. For instance, the destroy sensor exists for built-in metrics version 0.10.0-2.4 but not for latest. To avoid version checks in the record processing code, we just create an empty sensor and call record on it effectively not recording any metrics for this sensor for version latest. We do not hide newly added metrics if the built-in version is set to an older version. Same applies to the other uses of `hasMetrics()` introduced in this PR. ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java ## @@ -248,4 +253,12 @@ public void close() { private Bytes keyBytes(final K key) { return Bytes.wrap(serdes.rawKey(key)); } + +private void maybeRecordE2ELatency() { +if (e2eLatencySensor.shouldRecord() && e2eLatencySensor.hasMetrics()) { Review comment: Your approach makes sense to me. I agree that the latency should refer to the update in the state store and not to record itself. If a record updates the state more than once then latency should be measured each time. 
## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetrics.java ## @@ -443,6 +447,25 @@ public static Sensor suppressionBufferSizeSensor(final String threadId, ); } +public static Sensor e2ELatencySensor(final String threadId, + final String taskId, + final String storeType, + final String storeName, + final StreamsMetricsImpl streamsMetrics) { +final Sensor sensor = streamsMetrics.storeLevelSensor(threadId, taskId, storeName, RECORD_E2E_LATENCY, RecordingLevel.TRACE); +final Map tagMap = streamsMetrics.storeLevelTagMap(threadId, taskId, storeType, storeName); +addAvgAndMinAndMaxToSensor( +sensor, +STATE_STORE_LEVEL_GROUP, Review comment: You need to use the `stateStoreLevelGroup()` here instead of `STATE_STORE_LEVEL_GROUP` because the group name depends on the version and the store type. ## File path: streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java ## @@ -668,6 +671,9 @@ private void checkKeyValueStoreMetrics(final String group0100To24, checkMetricByName(listMetricStore, SUPPRESSION_BUFFER_SIZE_CURRENT, 0); checkMetricByName(listMetricStore, SUPPRESSION_BUFFER_SIZE_AVG, 0); checkMetricByName(listMetricStore, SUPPRESSION_BUFFER_SIZE_MAX, 0); +checkMetricByName(listMetricStore, RECORD_E2E_LATENCY_AVG, expectedNumberofE2ELatencyMetrics); +checkMetricByName(listMetricStore, RECORD_E2E_LATENCY_MIN, expectedNumberofE2ELatencyMetrics); +checkMetricByName(listMetricStore, RECORD_E2E_LATENCY_MAX, expectedNumberofE2ELatencyMetrics); Review comment: I agree with you, it should always be 1. It is the group of the metrics. See my comment in `StateStoreMetrics`. I am glad this test served its purpose, because I did not notice this in the unit tests! 
## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java ## @@ -468,38 +468,44 @@ public void shouldRecordE2ELatencyOnProcessForSourceNodes() { } @Test -public void shouldRecordE2ELatencyMinAndMax() { +public void shouldRecordE2ELatencyAvgAndMinAndMax() { time = new MockTime(0L, 0L, 0L); metrics = new Metrics(new MetricConfig().recordLevel(Sensor.RecordingLevel.INFO), time); task = createStatelessTask(createConfig(false, "0"), StreamsConfig.METRICS_LATEST); final String sourceNode = source1.name(); -final Metric maxMetric = getProcessorMetric("record-e2e-latency", "%s-max", task.id().toString(), sourceNode, StreamsConfig.METRICS_LATEST); +final Metric avgMetric = getProcessorMetric("record-e2e-latency", "%s-avg", task.id().toString(), sourceNode, StreamsConfig.METRICS_LATEST); final Metric minMetric = getProcessorMetric("record-e2e-latency", "%s-min",
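The guard discussed in the first review comment of this message can be modelled in a few lines. The sketch below is a simplified stand-in and not Kafka's `Sensor` class: `MiniSensor`, `Level`, and their methods are hypothetical, and the level ordering (a TRACE sensor records only when the configured level is TRACE) is an assumption made for the illustration. It shows why a sensor that always has metrics registered needs only the `shouldRecord()` check.

```java
import java.util.ArrayList;
import java.util.List;

public class MiniSensorSketch {

    /** Hypothetical recording levels, ordered from least to most verbose. */
    public enum Level { INFO, DEBUG, TRACE }

    /** Hypothetical, heavily simplified sensor. */
    public static final class MiniSensor {
        private final Level sensorLevel;
        private final Level configuredLevel;
        private final List<String> metricNames = new ArrayList<>();
        private final List<Double> recorded = new ArrayList<>();

        public MiniSensor(Level sensorLevel, Level configuredLevel) {
            this.sensorLevel = sensorLevel;
            this.configuredLevel = configuredLevel;
        }

        public void addMetric(String name) { metricNames.add(name); }

        public boolean hasMetrics() { return !metricNames.isEmpty(); }

        /** Assumed rule: record only if the configured level is at least as verbose. */
        public boolean shouldRecord() {
            return sensorLevel.ordinal() <= configuredLevel.ordinal();
        }

        public void record(double value) {
            if (shouldRecord()) {
                recorded.add(value);
            }
        }

        public int recordedCount() { return recorded.size(); }
    }

    /** Builds a TRACE-level sensor that always carries avg, min and max metrics. */
    public static MiniSensor e2eLatencySensor(Level configuredLevel) {
        MiniSensor sensor = new MiniSensor(Level.TRACE, configuredLevel);
        sensor.addMetric("record-e2e-latency-avg");
        sensor.addMetric("record-e2e-latency-min");
        sensor.addMetric("record-e2e-latency-max");
        return sensor;
    }

    /** Only the level check is needed, because metrics are always registered above. */
    public static void maybeRecordE2ELatency(MiniSensor sensor, long nowMs, long recordTimestampMs) {
        if (sensor.shouldRecord()) {
            sensor.record(nowMs - recordTimestampMs);
        }
    }

    public static void main(String[] args) {
        MiniSensor traceEnabled = e2eLatencySensor(Level.TRACE);
        MiniSensor infoOnly = e2eLatencySensor(Level.INFO);
        maybeRecordE2ELatency(traceEnabled, 1_000L, 900L);
        maybeRecordE2ELatency(infoOnly, 1_000L, 900L);
        System.out.println(traceEnabled.recordedCount() + " " + infoOnly.recordedCount());
    }
}
```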
[GitHub] [kafka] badaiaqrandista opened a new pull request #9099: KAFKA-6733: Printing additional ConsumerRecord fields in DefaultMessageFormatter
badaiaqrandista opened a new pull request #9099: URL: https://github.com/apache/kafka/pull/9099

Implementation of KIP-431 - Support of printing additional ConsumerRecord fields in DefaultMessageFormatter

https://cwiki.apache.org/confluence/display/KAFKA/KIP-431%3A+Support+of+printing+additional+ConsumerRecord+fields+in+DefaultMessageFormatter

### Committer Checklist (excluded from commit message)

- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] guozhangwang commented on pull request #9087: HOTFIX: Set session timeout and heartbeat interval to default to decrease flakiness
guozhangwang commented on pull request #9087: URL: https://github.com/apache/kafka/pull/9087#issuecomment-665779451
[GitHub] [kafka] abbccdda commented on pull request #9012: KAFKA-10270: A broker to controller channel manager
abbccdda commented on pull request #9012: URL: https://github.com/apache/kafka/pull/9012#issuecomment-665757647
[GitHub] [kafka] mumrah commented on a change in pull request #9012: KAFKA-10270: A broker to controller channel manager
mumrah commented on a change in pull request #9012: URL: https://github.com/apache/kafka/pull/9012#discussion_r462352682 ## File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala ## @@ -0,0 +1,188 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import java.util.concurrent.{LinkedBlockingDeque, TimeUnit} + +import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler} +import kafka.utils.Logging +import org.apache.kafka.clients._ +import org.apache.kafka.common.requests.AbstractRequest +import org.apache.kafka.common.utils.{LogContext, Time} +import org.apache.kafka.common.Node +import org.apache.kafka.common.metrics.Metrics +import org.apache.kafka.common.network._ +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.security.JaasContext + +import scala.collection.mutable +import scala.jdk.CollectionConverters._ + +/** + * This class manages the connection between a broker and the controller. It runs a single + * {@link BrokerToControllerRequestThread} which uses the broker's metadata cache as its own metadata to find + * and connect to the controller. The channel is async and runs the network connection in the background. 
+ * The maximum number of in-flight requests are set to one to ensure orderly response from the controller, therefore + * care must be taken to not block on outstanding requests for too long. + */ +class BrokerToControllerChannelManager(metadataCache: kafka.server.MetadataCache, + time: Time, + metrics: Metrics, + config: KafkaConfig, + threadNamePrefix: Option[String] = None) extends Logging { + private val requestQueue = new LinkedBlockingDeque[BrokerToControllerQueueItem] + private val logContext = new LogContext(s"[broker-${config.brokerId}-to-controller] ") + private val manualMetadataUpdater = new ManualMetadataUpdater() + private val requestThread = newRequestThread + + def start(): Unit = { +requestThread.start() + } + + def shutdown(): Unit = { +requestThread.shutdown() +requestThread.awaitShutdown() + } + + private[server] def newRequestThread = { +val brokerToControllerListenerName = config.controlPlaneListenerName.getOrElse(config.interBrokerListenerName) +val brokerToControllerSecurityProtocol = config.controlPlaneSecurityProtocol.getOrElse(config.interBrokerSecurityProtocol) + +val networkClient = { + val channelBuilder = ChannelBuilders.clientChannelBuilder( +brokerToControllerSecurityProtocol, +JaasContext.Type.SERVER, +config, +brokerToControllerListenerName, +config.saslMechanismInterBrokerProtocol, +time, +config.saslInterBrokerHandshakeRequestEnable, +logContext + ) + val selector = new Selector( +NetworkReceive.UNLIMITED, +Selector.NO_IDLE_TIMEOUT_MS, +metrics, +time, +"BrokerToControllerChannel", +Map("BrokerId" -> config.brokerId.toString).asJava, +false, +channelBuilder, +logContext + ) + new NetworkClient( +selector, +manualMetadataUpdater, +config.brokerId.toString, +1, +0, +0, +Selectable.USE_DEFAULT_BUFFER_SIZE, +Selectable.USE_DEFAULT_BUFFER_SIZE, +config.requestTimeoutMs, +config.connectionSetupTimeoutMs, +config.connectionSetupTimeoutMaxMs, +ClientDnsLookup.USE_ALL_DNS_IPS, +time, +false, +new ApiVersions, +logContext + ) +} +val 
threadName = threadNamePrefix match { + case None => s"broker-${config.brokerId}-to-controller-send-thread" + case Some(name) => s"$name:broker-${config.brokerId}-to-controller-send-thread" +} + +new BrokerToControllerRequestThread(networkClient, manualMetadataUpdater, requestQueue, metadataCache, config, + brokerToControllerListenerName, time, threadName) + } + + private[server] def sendRequest(request: AbstractRequest.Builder[_ <:
[GitHub] [kafka] mumrah commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
mumrah commented on a change in pull request #9008: URL: https://github.com/apache/kafka/pull/9008#discussion_r462364719 ## File path: clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java ## @@ -366,225 +255,128 @@ public FetchResponse(Errors error, LinkedHashMap> responseData, int throttleTimeMs, int sessionId) { -this.error = error; -this.responseData = responseData; -this.throttleTimeMs = throttleTimeMs; -this.sessionId = sessionId; +this.data = toMessage(throttleTimeMs, error, responseData.entrySet().iterator(), sessionId); +this.responseDataMap = responseData; } -public static FetchResponse parse(Struct struct) { -LinkedHashMap> responseData = new LinkedHashMap<>(); -for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) { -Struct topicResponse = (Struct) topicResponseObj; -String topic = topicResponse.get(TOPIC_NAME); -for (Object partitionResponseObj : topicResponse.getArray(PARTITIONS_KEY_NAME)) { -Struct partitionResponse = (Struct) partitionResponseObj; -Struct partitionResponseHeader = partitionResponse.getStruct(PARTITION_HEADER_KEY_NAME); -int partition = partitionResponseHeader.get(PARTITION_ID); -Errors error = Errors.forCode(partitionResponseHeader.get(ERROR_CODE)); -long highWatermark = partitionResponseHeader.get(HIGH_WATERMARK); -long lastStableOffset = partitionResponseHeader.getOrElse(LAST_STABLE_OFFSET, INVALID_LAST_STABLE_OFFSET); -long logStartOffset = partitionResponseHeader.getOrElse(LOG_START_OFFSET, INVALID_LOG_START_OFFSET); -Optional preferredReadReplica = Optional.of( -partitionResponseHeader.getOrElse(PREFERRED_READ_REPLICA, INVALID_PREFERRED_REPLICA_ID) - ).filter(Predicate.isEqual(INVALID_PREFERRED_REPLICA_ID).negate()); - -BaseRecords baseRecords = partitionResponse.getRecords(RECORD_SET_KEY_NAME); -if (!(baseRecords instanceof MemoryRecords)) -throw new IllegalStateException("Unknown records type found: " + baseRecords.getClass()); -MemoryRecords records = (MemoryRecords) baseRecords; - -List 
abortedTransactions = null; -if (partitionResponseHeader.hasField(ABORTED_TRANSACTIONS_KEY_NAME)) { -Object[] abortedTransactionsArray = partitionResponseHeader.getArray(ABORTED_TRANSACTIONS_KEY_NAME); -if (abortedTransactionsArray != null) { -abortedTransactions = new ArrayList<>(abortedTransactionsArray.length); -for (Object abortedTransactionObj : abortedTransactionsArray) { -Struct abortedTransactionStruct = (Struct) abortedTransactionObj; -long producerId = abortedTransactionStruct.get(PRODUCER_ID); -long firstOffset = abortedTransactionStruct.get(FIRST_OFFSET); -abortedTransactions.add(new AbortedTransaction(producerId, firstOffset)); -} -} -} - -PartitionData partitionData = new PartitionData<>(error, highWatermark, lastStableOffset, -logStartOffset, preferredReadReplica, abortedTransactions, records); -responseData.put(new TopicPartition(topic, partition), partitionData); -} -} -return new FetchResponse<>(Errors.forCode(struct.getOrElse(ERROR_CODE, (short) 0)), responseData, -struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME), struct.getOrElse(SESSION_ID, INVALID_SESSION_ID)); +public FetchResponse(FetchResponseData fetchResponseData) { +this.data = fetchResponseData; +this.responseDataMap = toResponseDataMap(fetchResponseData); } @Override public Struct toStruct(short version) { -return toStruct(version, throttleTimeMs, error, responseData.entrySet().iterator(), sessionId); +return data.toStruct(version); } @Override -protected Send toSend(String dest, ResponseHeader responseHeader, short apiVersion) { -Struct responseHeaderStruct = responseHeader.toStruct(); -Struct responseBodyStruct = toStruct(apiVersion); - -// write the total size and the response header -ByteBuffer buffer = ByteBuffer.allocate(responseHeaderStruct.sizeOf() + 4); -buffer.putInt(responseHeaderStruct.sizeOf() + responseBodyStruct.sizeOf()); -responseHeaderStruct.writeTo(buffer); +public Send toSend(String dest, ResponseHeader responseHeader, short apiVersion) { +// Generate the 
Sends for
[GitHub] [kafka] chia7712 commented on pull request #9096: MINOR: Add comments to constrainedAssign and generalAssign method
chia7712 commented on pull request #9096: URL: https://github.com/apache/kafka/pull/9096#issuecomment-665504255 retest this please This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] badaiaqrandista commented on pull request #4807: KAFKA-6733: Support of printing additional ConsumerRecord fields in DefaultMessageFormatter
badaiaqrandista commented on pull request #4807: URL: https://github.com/apache/kafka/pull/4807#issuecomment-665660018 Closing this PR as it has been superseded by PR 9099 (https://github.com/apache/kafka/pull/9099).
[GitHub] [kafka] soarez commented on pull request #9000: KAFKA-10036 Improve handling and documentation of Suppliers
soarez commented on pull request #9000: URL: https://github.com/apache/kafka/pull/9000#issuecomment-665668720 @mjsax what can we do to proceed?
[GitHub] [kafka] gwenshap commented on pull request #9054: KAFKA-10282: Remove Log metrics immediately when deleting log
gwenshap commented on pull request #9054: URL: https://github.com/apache/kafka/pull/9054#issuecomment-665773125 retest this please
[jira] [Commented] (KAFKA-10324) Pre-0.11 consumers can get stuck when messages are downconverted from V2 format
[ https://issues.apache.org/jira/browse/KAFKA-10324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17167387#comment-17167387 ] Jason Gustafson commented on KAFKA-10324: - [~twbecker] Thanks for the report. I'm trying to understand why the fetch is not including batches beyond the one with the last offset removed. Is that because the batch itself is already satisfying the fetch max bytes? It would be helpful to include a snippet from a dump of the log with the batch that is causing problems. > Pre-0.11 consumers can get stuck when messages are downconverted from V2 > format > --- > > Key: KAFKA-10324 > URL: https://issues.apache.org/jira/browse/KAFKA-10324 > Project: Kafka > Issue Type: Bug >Reporter: Tommy Becker >Priority: Major > > As noted in KAFKA-5443, The V2 message format preserves a batch's lastOffset > even if that offset gets removed due to log compaction. If a pre-0.11 > consumer seeks to such an offset and issues a fetch, it will get an empty > batch, since offsets prior to the requested one are filtered out during > down-conversion. KAFKA-5443 added consumer-side logic to advance the fetch > offset in this case, but this leaves old consumers unable to consume these > topics. > The exact behavior varies depending on consumer version. The 0.10.0.0 > consumer throws RecordTooLargeException and dies, believing that the record > must not have been returned because it was too large. The 0.10.1.0 consumer > simply spins fetching the same empty batch over and over. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-10324) Pre-0.11 consumers can get stuck when messages are downconverted from V2 format
[ https://issues.apache.org/jira/browse/KAFKA-10324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17167371#comment-17167371 ] Tommy Becker commented on KAFKA-10324: -- Thanks for the response [~ijuma]. Yes, these are Java consumers, and I agree it's odd that this has not been found before now. I have limited familiarity with the code base, so it's possible I'm missing something, but I believe the issue is as I described. In my tests I'm trying to consume a 25GB topic and have found 2 distinct offsets which the consumer cannot advance beyond. Both are cases where the offset (1) is the lastOffset in the last batch of its log segment, and (2) does not actually exist, presumably due to log compaction.
[jira] [Commented] (KAFKA-10324) Pre-0.11 consumers can get stuck when messages are downconverted from V2 format
[ https://issues.apache.org/jira/browse/KAFKA-10324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17167369#comment-17167369 ] Ismael Juma commented on KAFKA-10324: - Thanks for the report. Looks like this issue was introduced in 0.11.0.0, would you agree? Interesting that no one reported it for so long. Out of curiosity, are the old consumers Java consumers or written in other languages?
[jira] [Commented] (KAFKA-10324) Pre-0.11 consumers can get stuck when messages are downconverted from V2 format
[ https://issues.apache.org/jira/browse/KAFKA-10324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17167357#comment-17167357 ] Tommy Becker commented on KAFKA-10324: -- Some of our legacy applications, whose consumer versions are not easily upgraded, are hitting this issue. It is hard to diagnose because the consumers do not give a proper message (or indeed any message in the case of the 0.10.1.0 consumer) and because it depends on how messages are batched, which is opaque to clients.
[jira] [Created] (KAFKA-10324) Pre-0.11 consumers can get stuck when messages are downconverted from V2 format
Tommy Becker created KAFKA-10324: Summary: Pre-0.11 consumers can get stuck when messages are downconverted from V2 format Key: KAFKA-10324 URL: https://issues.apache.org/jira/browse/KAFKA-10324 Project: Kafka Issue Type: Bug Reporter: Tommy Becker As noted in KAFKA-5443, the V2 message format preserves a batch's lastOffset even if that offset gets removed due to log compaction. If a pre-0.11 consumer seeks to such an offset and issues a fetch, it will get an empty batch, since offsets prior to the requested one are filtered out during down-conversion. KAFKA-5443 added consumer-side logic to advance the fetch offset in this case, but this leaves old consumers unable to consume these topics. The exact behavior varies depending on consumer version. The 0.10.0.0 consumer throws RecordTooLargeException and dies, believing that the record must not have been returned because it was too large. The 0.10.1.0 consumer simply spins fetching the same empty batch over and over.
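The offset arithmetic behind this report can be sketched as follows. This is a minimal illustration and not Kafka's fetch or down-conversion code: the class `EmptyBatchSketch` and its methods are hypothetical names, and the surviving offsets are assumed to be sorted in ascending order. The only facts taken from the report are that a batch keeps its lastOffset after compaction, that records before the fetch offset are filtered out, and that newer consumers advance past an empty result while pre-0.11 consumers do not.

```java
import java.util.List;
import java.util.stream.Collectors;

// Hypothetical illustration only; none of these names exist in Kafka.
final class EmptyBatchSketch {

    // Records a consumer actually sees: surviving offsets at or after the fetch offset.
    static List<Long> visibleOffsets(List<Long> survivingOffsets, long fetchOffset) {
        return survivingOffsets.stream()
                .filter(offset -> offset >= fetchOffset)
                .collect(Collectors.toList());
    }

    // Behaviour the report attributes to newer consumers (KAFKA-5443):
    // if nothing is visible, skip past the batch's preserved lastOffset.
    static long nextOffsetWithAdvance(List<Long> survivingOffsets, long batchLastOffset, long fetchOffset) {
        List<Long> visible = visibleOffsets(survivingOffsets, fetchOffset);
        if (visible.isEmpty()) {
            return Math.max(fetchOffset, batchLastOffset + 1);
        }
        return visible.get(visible.size() - 1) + 1;
    }

    // Behaviour described for old consumers: an empty result leaves the position unchanged,
    // so the same fetch is issued again.
    static long nextOffsetWithoutAdvance(List<Long> survivingOffsets, long fetchOffset) {
        List<Long> visible = visibleOffsets(survivingOffsets, fetchOffset);
        if (visible.isEmpty()) {
            return fetchOffset;
        }
        return visible.get(visible.size() - 1) + 1;
    }
}
```

With surviving offsets 10, 11 and 12 in a batch whose lastOffset is 14, a fetch at offset 14 sees no records. The advancing variant moves on to 15, while the non-advancing variant stays at 14, which matches the "spins fetching the same empty batch" symptom described above.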
[jira] [Commented] (KAFKA-9273) Refactor AbstractJoinIntegrationTest and Sub-classes
[ https://issues.apache.org/jira/browse/KAFKA-9273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17167349#comment-17167349 ] Bill Bejeck commented on KAFKA-9273: Hi [~albert02lowis], Thanks for your interest. It looks like [~sujayopensource] has not started work on this ticket yet. I'd give it another day or so for a response, then if you don't hear anything back, feel free to pick this ticket up. I've taken the liberty of adding you to the contributors list, so you should be able to self-assign this ticket and any others in the future. -Bill > Refactor AbstractJoinIntegrationTest and Sub-classes > > > Key: KAFKA-9273 > URL: https://issues.apache.org/jira/browse/KAFKA-9273 > Project: Kafka > Issue Type: Test > Components: streams >Reporter: Bill Bejeck >Assignee: Sujay Hegde >Priority: Major > Labels: newbie > > The AbstractJoinIntegrationTest uses an embedded broker, but not all the > sub-classes require the use of an embedded broker anymore. Additionally, > there are two tests remaining that require an embedded broker, but they don't > perform joins; they are tests validating other conditions, so ideally those > tests should move into a separate test.
[jira] [Commented] (KAFKA-9273) Refactor AbstractJoinIntegrationTest and Sub-classes
[ https://issues.apache.org/jira/browse/KAFKA-9273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17167321#comment-17167321 ] Albert Lowis commented on KAFKA-9273: - Hi [~bbejeck], [~sujayopensource], I am a newbie looking to contribute. Can I take up this task? I see that it has been some time since the last activity. Thank you, Albert
[jira] [Created] (KAFKA-10323) NullPointerException during rebalance
yazgoo created KAFKA-10323: -- Summary: NullPointerException during rebalance Key: KAFKA-10323 URL: https://issues.apache.org/jira/browse/KAFKA-10323 Project: Kafka Issue Type: Bug Components: KafkaConnect Affects Versions: 2.5.0 Reporter: yazgoo *confluent platform version: 5.5.0-ccs* connector used: s3 Connector stops after rebalancing: ERROR [Worker clientId=connect-1, groupId=connect] Couldn't instantiate task because it has an invalid task configuration. This task will not execute until reconfigured. java.lang.NullPointerException at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:427) at org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:1147) at org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1600(DistributedHerder.java:126) at org.apache.kafka.connect.runtime.distributed.DistributedHerder$12.call(DistributedHerder.java:1162) at org.apache.kafka.connect.runtime.distributed.DistributedHerder$12.call(DistributedHerder.java:1158) at java.util.concurrent.FutureTask.run(FutureTask.java:266) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624) at java.lang.Thread.run(Thread.java:748) -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-6733) Support of printing additional ConsumerRecord fields in DefaultMessageFormatter
[ https://issues.apache.org/jira/browse/KAFKA-6733?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17167207#comment-17167207 ] Badai Aqrandista commented on KAFKA-6733: - I did a bad merge on PR 8909, so I've closed it. I created a new PR 9099 that contains the changes against the latest trunk. This is ready for review: https://github.com/apache/kafka/pull/9099 > Support of printing additional ConsumerRecord fields in > DefaultMessageFormatter > --- > > Key: KAFKA-6733 > URL: https://issues.apache.org/jira/browse/KAFKA-6733 > Project: Kafka > Issue Type: Improvement > Components: tools >Reporter: Mateusz Zakarczemny >Assignee: Badai Aqrandista >Priority: Minor > > It would be useful to have the possibility of printing headers, partition and > offset in ConsoleConsumer. Especially support of headers seems to be missing.
[jira] [Created] (KAFKA-10322) InMemoryWindowStore restore keys format incompatibility (lack of sequenceNumber in keys on topic)
Tomasz Bradło created KAFKA-10322: - Summary: InMemoryWindowStore restore keys format incompatibility (lack of sequenceNumber in keys on topic) Key: KAFKA-10322 URL: https://issues.apache.org/jira/browse/KAFKA-10322 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 2.5.0 Environment: windows/linux Reporter: Tomasz Bradło I have regular groupBy stream configuration: {code:java} fun addStream(kStreamBuilder: StreamsBuilder) { val storeSupplier = Stores.inMemoryWindowStore("count-store", Duration.ofDays(10), Duration.ofDays(1), false) val storeBuilder: StoreBuilder> = Stores .windowStoreBuilder(storeSupplier, JsonSerde(CountableEvent::class.java), Serdes.Long()) kStreamBuilder .stream("input-topic", Consumed.with(Serdes.String(), Serdes.String())) .map {_, jsonRepresentation -> KeyValue(eventsCountingDeserializer.deserialize(jsonRepresentation), null)} .groupByKey() .windowedBy(TimeWindows.of(Duration.ofDays(1))) .count(Materialized.with(JsonSerde(CountableEvent::class.java), Serdes.Long())) .toStream() .to("topic1-count") val storeConsumed = Consumed.with(WindowedSerdes.TimeWindowedSerde(JsonSerde(CountableEvent::class.java), Duration.ofDays(1).toMillis()), Serdes.Long()) kStreamBuilder.addGlobalStore(storeBuilder, "topic1-count", storeConsumed, passThroughProcessorSupplier) }{code} While sending to "topic1-count", for serializing the key [TimeWindowedSerializer|https://github.com/apache/kafka/blob/b1aa1912f08765d9914e3d036deee6b71ea009dd/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedSerializer.java] is used which is using [WindowKeySchema.toBinary|https://github.com/apache/kafka/blob/b1aa1912f08765d9914e3d036deee6b71ea009dd/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java#L112] so the message key format is: {code:java} real_grouping_key + timestamp(8bytes){code} Everything works. I can get correct values from state-store. 
But in a recovery scenario, when [GlobalStateManagerImpl|https://github.com/apache/kafka/blob/2.5/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java#L317] enters the offset < highWatermark loop, the [InMemoryWindowStore stateRestoreCallback|https://github.com/apache/kafka/blob/2.5/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java#L105] reads from "topic1-count" and fails to extract a valid key and timestamp using [WindowKeySchema.extractStoreKeyBytes|https://github.com/apache/kafka/blob/1ff25663a8beb8d7a67e6b58bc7ee0020651b75a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java#L188] and [WindowKeySchema.extractStoreTimestamp|https://github.com/apache/kafka/blob/1ff25663a8beb8d7a67e6b58bc7ee0020651b75a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java#L201]. It fails because it expects the format: {code:java} real_grouping_key + timestamp(8bytes) + sequence_number(4bytes) {code} How is this supposed to work in this case?
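To make the mismatch concrete, here is a minimal sketch of the two layouts described in this report. It does not call Kafka's WindowKeySchema; the class `WindowKeyLayoutSketch` and its methods are hypothetical names, and only the two byte layouts (key + 8-byte timestamp on the topic, key + 8-byte timestamp + 4-byte sequence number expected on restore) are taken from the report.

```java
import java.nio.ByteBuffer;

// Hypothetical illustration only; this is not Kafka's WindowKeySchema.
final class WindowKeyLayoutSketch {
    static final int TIMESTAMP_SIZE = 8;
    static final int SEQNUM_SIZE = 4;

    // Layout written to the topic according to the report: key + timestamp(8 bytes).
    static byte[] toTopicKey(byte[] key, long windowStartMs) {
        return ByteBuffer.allocate(key.length + TIMESTAMP_SIZE)
                .put(key)
                .putLong(windowStartMs)
                .array();
    }

    // Layout expected on restore according to the report: key + timestamp(8) + seqnum(4).
    static byte[] toStoreKey(byte[] key, long windowStartMs, int seqnum) {
        return ByteBuffer.allocate(key.length + TIMESTAMP_SIZE + SEQNUM_SIZE)
                .put(key)
                .putLong(windowStartMs)
                .putInt(seqnum)
                .array();
    }

    // Reads the timestamp assuming the store layout, i.e. 12 bytes before the end.
    static long timestampAssumingStoreLayout(byte[] binaryKey) {
        return ByteBuffer.wrap(binaryKey)
                .getLong(binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE);
    }

    // Extracts the key bytes assuming the store layout, i.e. everything but the last 12 bytes.
    static byte[] keyAssumingStoreLayout(byte[] binaryKey) {
        byte[] key = new byte[binaryKey.length - TIMESTAMP_SIZE - SEQNUM_SIZE];
        System.arraycopy(binaryKey, 0, key, 0, key.length);
        return key;
    }
}
```

Applied to a topic-format key, the store-layout readers treat the last four bytes of the timestamp as a sequence number, so the extracted key loses its last four bytes and the extracted timestamp mixes key bytes with timestamp bytes. For keys shorter than four bytes the extraction would fail outright with a negative length.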
[GitHub] [kafka] junrao commented on pull request #8936: KAFKA-10207: Fixed padded timeindex causing premature data deletion
junrao commented on pull request #8936: URL: https://github.com/apache/kafka/pull/8936#issuecomment-665148070 I think we agreed to change the logic to do the index sanity check on index opening.
[GitHub] [kafka] guozhangwang commented on pull request #8993: KAFKA-10173: Use SmokeTest for upgrade system tests (#8938)
guozhangwang commented on pull request #8993: URL: https://github.com/apache/kafka/pull/8993#issuecomment-665190912 Sorry for the long delay @vvcephei, the PR lgtm!
[GitHub] [kafka] ableegoldman commented on a change in pull request #9094: KAFKA-10054: add TRACE-level e2e latency metrics
ableegoldman commented on a change in pull request #9094: URL: https://github.com/apache/kafka/pull/9094#discussion_r461993688 ## File path: clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java ## @@ -198,7 +198,7 @@ .define(METRICS_RECORDING_LEVEL_CONFIG, Type.STRING, Sensor.RecordingLevel.INFO.toString(), - in(Sensor.RecordingLevel.INFO.toString(), Sensor.RecordingLevel.DEBUG.toString()), + in(Sensor.RecordingLevel.INFO.toString(), Sensor.RecordingLevel.DEBUG.toString(), Sensor.RecordingLevel.TRACE.toString()), Review comment: It's kind of a bummer that we can't just add the new TRACE level for Streams only; we have to add it to all the clients that Streams passes its configs down to. We could check for the new TRACE level and strip it off before passing the configs on to the clients, but that just seems like asking for trouble. ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/TaskMetrics.java ## @@ -88,13 +92,6 @@ private TaskMetrics() {} private static final String NUM_BUFFERED_RECORDS_DESCRIPTION = "The count of buffered records that are polled " + "from consumer and not yet processed for this active task"; -private static final String RECORD_E2E_LATENCY = "record-e2e-latency"; Review comment: Moved the common descriptions to StreamsMetricsImpl ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java ## @@ -227,6 +234,14 @@ protected Bytes keyBytes(final K key) { return byteEntries; } +private void maybeRecordE2ELatency() { +if (e2eLatencySensor.shouldRecord() && e2eLatencySensor.hasMetrics()) { Review comment: For KV stores, we just compare the current time with the current record's timestamp ## File path: streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java ## @@ -248,4 +253,12 @@ public void close() { private Bytes keyBytes(final K key) { return Bytes.wrap(serdes.rawKey(key)); } + +private void 
maybeRecordE2ELatency() { +if (e2eLatencySensor.shouldRecord() && e2eLatencySensor.hasMetrics()) { Review comment: For session and window stores, we also just compare the current time with the current record's timestamp when `put` is called. This can mean the e2e latency is measured several times on the same record, for example in a windowed aggregation. At first I thought that didn't make sense, but now I think it's actually exactly what we want. First of all, it means we can actually account for the latency between calls to `put` within a processor. For simple point inserts this might not be a huge increase on the scale of ms, but more complex processing may benefit from seeing this granularity of information. If users don't want it, well, that's why we introduced `TRACE`. Second, while it might seem like we're over-weighting some records by measuring the e2e latency on them more than others, I'm starting to think this actually makes more sense than not: the big picture benefit/use case for the e2e latency metric is less "how long for this record to get sent downstream" and more "how long for this record to be reflected in the state store/IQ results". Given that, each record should be weighted by its actual proportion of the state store. You aren't querying individual records (in a window store), you're querying the windows themselves. I toyed around with the idea of measuring the e2e latency relative to the window time, instead of the record timestamp, but ultimately couldn't find any sense in that. Thoughts?
## File path: streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java ## @@ -668,6 +671,9 @@ private void checkKeyValueStoreMetrics(final String group0100To24, checkMetricByName(listMetricStore, SUPPRESSION_BUFFER_SIZE_CURRENT, 0); checkMetricByName(listMetricStore, SUPPRESSION_BUFFER_SIZE_AVG, 0); checkMetricByName(listMetricStore, SUPPRESSION_BUFFER_SIZE_MAX, 0); +checkMetricByName(listMetricStore, RECORD_E2E_LATENCY_AVG, expectedNumberofE2ELatencyMetrics); +checkMetricByName(listMetricStore, RECORD_E2E_LATENCY_MIN, expectedNumberofE2ELatencyMetrics); +checkMetricByName(listMetricStore, RECORD_E2E_LATENCY_MAX, expectedNumberofE2ELatencyMetrics); Review comment: Ok there's something I'm not understanding about this test and/or the built-in metrics version. For some reason, the KV-store metrics are 0 when `METRICS_0100_TO_24` is used, and 1 (as expected) when the latest version in used. I feel like this is wrong, and it should
[GitHub] [kafka] pan3793 commented on pull request #8575: KAFKA-8713 KIP-581 Add accept.optional.null to solve optional null
pan3793 commented on pull request #8575: URL: https://github.com/apache/kafka/pull/8575#issuecomment-665405076 Fixed the mail thread. Do you have any comments on the discussion and KIP doc? @rhauch
[GitHub] [kafka] abbccdda opened a new pull request #9095: KAFKA-10321: fix infinite blocking for global stream thread startup
abbccdda opened a new pull request #9095: URL: https://github.com/apache/kafka/pull/9095 In the unit test `shouldDieOnInvalidOffsetExceptionDuringStartup` on JDK 11, we spotted a case where global stream thread startup stalls if the thread fails immediately upon the first poll. The reason is that the `start()` function only checks whether the thread is *not running*, since it needs to block until initialization finishes. However, if the thread transitions to `DEAD` immediately, the `start()` call blocks forever. The previously failing unit test is used to verify the fix. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
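The blocking condition described in this PR can be illustrated with a small, self-contained sketch. It is not the GlobalStreamThread code and may differ from the actual fix: `StartupWaitSketch`, its three-state enum, and the `failOnInit` flag are hypothetical stand-ins. The point it shows is that `start()` has to wake up on any state other than the initial one, not only on `RUNNING`.

```java
// Hypothetical illustration only; this is not Kafka's GlobalStreamThread.
final class StartupWaitSketch {
    enum State { CREATED, RUNNING, DEAD }

    private final Object lock = new Object();
    private final boolean failOnInit;
    private State state = State.CREATED;

    StartupWaitSketch(boolean failOnInit) {
        this.failOnInit = failOnInit;
    }

    State state() {
        synchronized (lock) {
            return state;
        }
    }

    private void setState(State newState) {
        synchronized (lock) {
            state = newState;
            lock.notifyAll();
        }
    }

    // Starts the worker and blocks until initialization has either succeeded or failed.
    void start() {
        Thread worker = new Thread(() -> setState(failOnInit ? State.DEAD : State.RUNNING));
        worker.setDaemon(true);
        worker.start();

        synchronized (lock) {
            // Waiting on `state != State.RUNNING` instead would never return
            // when the worker goes straight from CREATED to DEAD.
            while (state == State.CREATED) {
                try {
                    lock.wait();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    throw new IllegalStateException("Interrupted while waiting for startup", e);
                }
            }
            if (state == State.DEAD) {
                throw new IllegalStateException("Initialization failed; thread is DEAD");
            }
        }
    }
}
```

Calling `new StartupWaitSketch(true).start()` returns promptly with an exception instead of hanging, which is the behaviour the failing test expects.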
[GitHub] [kafka] mjsax commented on a change in pull request #9047: KAFKA-9274: Remove `retries` for global task
mjsax commented on a change in pull request #9047: URL: https://github.com/apache/kafka/pull/9047#discussion_r461302625 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java ## @@ -275,31 +259,70 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback, final RecordConverter recordConverter) { for (final TopicPartition topicPartition : topicPartitions) { globalConsumer.assign(Collections.singletonList(topicPartition)); +long offset; final Long checkpoint = checkpointFileCache.get(topicPartition); if (checkpoint != null) { globalConsumer.seek(topicPartition, checkpoint); +offset = checkpoint; } else { globalConsumer.seekToBeginning(Collections.singletonList(topicPartition)); +final AtomicLong position = new AtomicLong(); +retryUntilSuccessOrThrowOnTaskTimeout( +() -> position.set(globalConsumer.position(topicPartition)), +String.format( +"Failed to get position for partition %s. The broker may be transiently unavailable at the moment.", +topicPartition +) +); +offset = position.get(); } -long offset = globalConsumer.position(topicPartition); final Long highWatermark = highWatermarks.get(topicPartition); final RecordBatchingStateRestoreCallback stateRestoreAdapter = StateRestoreCallbackAdapter.adapt(stateRestoreCallback); stateRestoreListener.onRestoreStart(topicPartition, storeName, offset, highWatermark); long restoreCount = 0L; +long deadlineMs = NO_DEADLINE; while (offset < highWatermark) { try { final ConsumerRecords records = globalConsumer.poll(pollTime); +if (records.isEmpty()) { +if (taskTimeoutMs == 0L) { +deadlineMs = maybeUpdateDeadlineOrThrow( Review comment: Thanks for digging into it @vvcephei -- The question about `pollTimeout` is fair. I guess for default configs it might not matter much, as the default is 100ms IIRC. 
At the same time, we apply the same `pollTimeout` during regular processing, and as a matter of fact, for this case, a user might want to use a longer poll timeout, as otherwise the thread would just "busy wait" anyway (only the responsiveness for a shutdown of the app should be considered). Thus, it might actually make sense to exclude `pollTimeout` completely and only use `requestTimeout + taskTimeout`. Again, using `taskTimeout` in `poll()` reduces the responsiveness of a shutdown. However, at the moment we ignore a shutdown signal during bootstrapping anyway, so for now we don't make the situation worse. I created a ticket to fix this: https://issues.apache.org/jira/browse/KAFKA-10317 and will add a comment for now. Short related note: using `requestTimeout` seems to be a conservative upper bound for `poll()`. A request could fail with a different error before `requestTimeout` hits and would then be retried internally; if this happens, we might want to start the `taskTimeout` earlier. However, we don't have any means at the moment to detect this case. Thus, using `requestTimeout` is the best option we have right now (because triggering `taskTimeout` too early seems to be worse than triggering it too late). I created a ticket, though, that might allow us to improve the code later: https://issues.apache.org/jira/browse/KAFKA-10315
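The deadline rule proposed in this comment (`requestTimeout + taskTimeout`, without `pollTimeout`) can be sketched as below. This is an illustration under assumptions, not the code in the PR: the class `RestoreDeadlineSketch`, the helper `updateDeadlineOrThrow`, its signature, the value of `NO_DEADLINE`, and the exception type are all hypothetical, and overflow for very large timeouts is ignored.

```java
// Hypothetical illustration only; names and signature are not taken from Kafka.
final class RestoreDeadlineSketch {
    // Assumed sentinel meaning "no empty poll has been seen yet".
    static final long NO_DEADLINE = -1L;

    // Called after an empty poll. Starts the clock on the first empty poll and
    // throws once the deadline has passed without any records arriving.
    static long updateDeadlineOrThrow(long deadlineMs,
                                      long nowMs,
                                      long requestTimeoutMs,
                                      long taskTimeoutMs) {
        if (deadlineMs == NO_DEADLINE) {
            // requestTimeout is used as a conservative upper bound for a single poll.
            return nowMs + requestTimeoutMs + taskTimeoutMs;
        }
        if (nowMs >= deadlineMs) {
            throw new IllegalStateException(
                "No records received before the deadline at " + deadlineMs + " ms");
        }
        return deadlineMs;
    }
}
```

A caller would reset the deadline to `NO_DEADLINE` whenever a poll returns records, so that only an uninterrupted run of empty polls can trigger the timeout.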
[GitHub] [kafka] guozhangwang commented on pull request #9081: KAFKA-10309: KafkaProducer's sendOffsetsToTransaction should not block infinitively
guozhangwang commented on pull request #9081: URL: https://github.com/apache/kafka/pull/9081#issuecomment-664690277 Any blocking API should be covered by `max.block.ms`, so I think this is rather a bug fix.
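The difference between an unbounded and a bounded wait can be shown with plain java.util.concurrent. This is a minimal sketch and not the producer's TransactionalRequestResult: `BoundedAwaitSketch` and `awaitAtMost` are hypothetical names, and the sketch reports a timeout through its return value rather than by throwing.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

// Hypothetical illustration only; this is not Kafka's TransactionalRequestResult.
final class BoundedAwaitSketch {
    private final CountDownLatch completed = new CountDownLatch(1);

    // Marks the pending operation as finished, e.g. when a response arrives.
    void complete() {
        completed.countDown();
    }

    // Waits at most maxBlockMs. Returns true if the operation completed in time,
    // false on timeout or interruption. A plain completed.await() would instead
    // block forever if complete() is never called, e.g. when no broker is reachable.
    boolean awaitAtMost(long maxBlockMs) {
        try {
            return completed.await(maxBlockMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

A caller that wants exception semantics can translate a `false` result into a timeout exception, which is how a configured `max.block.ms` would surface to the application.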
[GitHub] [kafka] mjsax commented on pull request #6592: KAFKA-8326: Introduce List Serde
mjsax commented on pull request #6592: URL: https://github.com/apache/kafka/pull/6592#issuecomment-665304810 @yeralin -- your PR is in my review queue. Not sure how quickly I will find time to have a look though atm -- maybe next week, but I can't promise.
[GitHub] [kafka] rajinisivaram commented on pull request #9092: KAFKA-10163; Define `controller_mutation_rate` a Double instead of a Long
rajinisivaram commented on pull request #9092: URL: https://github.com/apache/kafka/pull/9092#issuecomment-664990134 ok to test
[GitHub] [kafka] sasakitoa commented on a change in pull request #9081: KAFKA-10309: KafkaProducer's sendOffsetsToTransaction should not block infinitively
sasakitoa commented on a change in pull request #9081:
URL: https://github.com/apache/kafka/pull/9081#discussion_r461264833

## File path: core/src/test/scala/integration/kafka/api/TransactionsTest.scala ##
@@ -406,6 +406,26 @@ class TransactionsTest extends KafkaServerTestHarness {
     TestUtils.waitUntilTrue(() => offsetAndMetadata.equals(consumer.committed(Set(tp).asJava).get(tp)), "cannot read committed offset")
   }
+  @Test(expected = classOf[TimeoutException])
+  def testSendOffsetsToTransactionTimeout(): Unit = {
+    val producer = createTransactionalProducer("transactionProducer", maxBlockMs = 1000)
+    producer.initTransactions()
+    producer.beginTransaction()
+    producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic1, "foo".getBytes, "bar".getBytes))
+
+    for (i <- 0 until servers.size)
+      killBroker(i)
+
+    val offsets = new mutable.HashMap[TopicPartition, OffsetAndMetadata]().asJava
+    offsets.put(new TopicPartition(topic1, 0), new OffsetAndMetadata(0))
+    try {
+      producer.sendOffsetsToTransaction(offsets, "test-group")

Review comment:
Replaced `mutable.HashMap` with `Map`.

## File path: clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java ##
@@ -687,7 +687,7 @@ public void sendOffsetsToTransaction(Map offs
         throwIfProducerClosed();
         TransactionalRequestResult result = transactionManager.sendOffsetsToTransaction(offsets, groupMetadata);
         sender.wakeup();
-        result.await();
+        result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS);

Review comment:
I added a description of the TimeoutException and InterruptedException cases.

## File path: core/src/test/scala/integration/kafka/api/TransactionsTest.scala ##
@@ -406,6 +406,26 @@ class TransactionsTest extends KafkaServerTestHarness {
     TestUtils.waitUntilTrue(() => offsetAndMetadata.equals(consumer.committed(Set(tp).asJava).get(tp)), "cannot read committed offset")
   }
+  @Test(expected = classOf[TimeoutException])
+  def testSendOffsetsToTransactionTimeout(): Unit = {
+    val producer = createTransactionalProducer("transactionProducer", maxBlockMs = 1000)
+    producer.initTransactions()
+    producer.beginTransaction()
+    producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic1, "foo".getBytes, "bar".getBytes))
+
+    for (i <- 0 until servers.size)
+      killBroker(i)
+
+    val offsets = new mutable.HashMap[TopicPartition, OffsetAndMetadata]().asJava
+    offsets.put(new TopicPartition(topic1, 0), new OffsetAndMetadata(0))
+    try {
+      producer.sendOffsetsToTransaction(offsets, "test-group")

Review comment:
Replaced `mutable.HashMap` with `Map`.

## File path: core/src/test/scala/integration/kafka/api/TransactionsTest.scala ##
@@ -406,6 +406,26 @@ class TransactionsTest extends KafkaServerTestHarness {
     TestUtils.waitUntilTrue(() => offsetAndMetadata.equals(consumer.committed(Set(tp).asJava).get(tp)), "cannot read committed offset")
   }
+  @Test(expected = classOf[TimeoutException])
+  def testSendOffsetsToTransactionTimeout(): Unit = {
+    val producer = createTransactionalProducer("transactionProducer", maxBlockMs = 1000)
+    producer.initTransactions()
+    producer.beginTransaction()
+    producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic1, "foo".getBytes, "bar".getBytes))
+
+    for (i <- 0 until servers.size)

Review comment:
Changed `size` to `indices`, thanks.

## File path: core/src/test/scala/integration/kafka/api/TransactionsTest.scala ##
@@ -406,6 +406,26 @@ class TransactionsTest extends KafkaServerTestHarness {
     TestUtils.waitUntilTrue(() => offsetAndMetadata.equals(consumer.committed(Set(tp).asJava).get(tp)), "cannot read committed offset")
   }
+  @Test(expected = classOf[TimeoutException])
+  def testSendOffsetsToTransactionTimeout(): Unit = {
+    val producer = createTransactionalProducer("transactionProducer", maxBlockMs = 1000)
+    producer.initTransactions()
+    producer.beginTransaction()
+    producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic1, "foo".getBytes, "bar".getBytes))
+
+    for (i <- 0 until servers.size)
+      killBroker(i)
+
+    try {
+      producer.sendOffsetsToTransaction(Map(

Review comment:
I added timeout tests for initTransactions, commitTransaction, and abortTransaction that share the same base method. Is this implementation what you intended?

## File path: core/src/test/scala/integration/kafka/api/TransactionsTest.scala ##
@@ -406,6 +406,26 @@ class TransactionsTest extends KafkaServerTestHarness {
     TestUtils.waitUntilTrue(() => offsetAndMetadata.equals(consumer.committed(Set(tp).asJava).get(tp)), "cannot read committed offset")
   }
+  @Test(expected = classOf[TimeoutException])
+  def testSendOffsetsToTransactionTimeout(): Unit = {
+    val producer =
[GitHub] [kafka] yeralin commented on pull request #6592: KAFKA-8326: Introduce List Serde
yeralin commented on pull request #6592: URL: https://github.com/apache/kafka/pull/6592#issuecomment-665078937 Any updates?
[GitHub] [kafka] vvcephei commented on pull request #9066: KAFKA-10287: Skip unknown offsets when computing sum of changelog offsets
vvcephei commented on pull request #9066: URL: https://github.com/apache/kafka/pull/9066#issuecomment-665126186
[GitHub] [kafka] abbccdda commented on a change in pull request #9047: KAFKA-9274: Remove `retries` for global task
abbccdda commented on a change in pull request #9047: URL: https://github.com/apache/kafka/pull/9047#discussion_r462047068 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java ## @@ -274,30 +252,74 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback, final RecordConverter recordConverter) { for (final TopicPartition topicPartition : topicPartitions) { globalConsumer.assign(Collections.singletonList(topicPartition)); +long offset; final Long checkpoint = checkpointFileCache.get(topicPartition); if (checkpoint != null) { globalConsumer.seek(topicPartition, checkpoint); +offset = checkpoint; } else { globalConsumer.seekToBeginning(Collections.singletonList(topicPartition)); +offset = retryUntilSuccessOrThrowOnTaskTimeout( +() -> globalConsumer.position(topicPartition), +String.format( +"Failed to get position for partition %s. The broker may be transiently unavailable at the moment.", +topicPartition +) +); } -long offset = globalConsumer.position(topicPartition); final Long highWatermark = highWatermarks.get(topicPartition); final RecordBatchingStateRestoreCallback stateRestoreAdapter = StateRestoreCallbackAdapter.adapt(stateRestoreCallback); stateRestoreListener.onRestoreStart(topicPartition, storeName, offset, highWatermark); long restoreCount = 0L; -while (offset < highWatermark) { -final ConsumerRecords records = globalConsumer.poll(pollTime); +while (offset < highWatermark) { // when we "fix" this loop (KAFKA-7380 / KAFKA-10317) + // we should update the `poll()` timeout below + +// we ignore `poll.ms` config during bootstrapping phase and +// apply `request.timeout.ms` plus `task.timeout.ms` instead +// +// the reason is, that `poll.ms` might be too short to give a fetch request a fair chance +// to actually complete and we don't want to start `task.timeout.ms` too early +// +// we also pass `task.timeout.ms` into `poll()` directly right now as it simplifies our own code: +// if we 
don't pass it in, we would just track the timeout ourselves and call `poll()` again +// in our own retry loop; by passing the timeout we can reuse the consumer's internal retry loop instead +// +// note that using `request.timeout.ms` provides a conservative upper bound for the timeout; +// this implies that we might start `task.timeout.ms` "delayed" -- however, starting the timeout +// delayed is preferable (as it's more robust) than starting it too early +// +// TODO https://issues.apache.org/jira/browse/KAFKA-10315 +// -> do a more precise timeout handling if `poll` would throw an exception if a fetch request fails +// (instead of letting the consumer retry fetch requests silently) +// +// TODO https://issues.apache.org/jira/browse/KAFKA-10317 and +// https://issues.apache.org/jira/browse/KAFKA-7380 +// -> don't pass in `task.timeout.ms` to stay responsive if `KafkaStreams#close` gets called +final ConsumerRecords records = globalConsumer.poll(requestTimeoutPlusTaskTimeout); +if (records.isEmpty()) { +// this will always throw +maybeUpdateDeadlineOrThrow(time.milliseconds()); Review comment: Could we just throw here? ## File path: docs/streams/developer-guide/config-streams.html ## @@ -326,13 +321,18 @@ bootstrap.serversstate.cleanup.delay.ms Low The amount of time in milliseconds to wait before deleting state when a partition has migrated. -60 milliseconds +60 milliseconds (10 minutes) state.dir High Directory location for state stores. /tmp/kafka-streams + task.timeout.ms +Medium Review comment: @mjsax could we do a screenshot to make sure it looks good on the web-page? ## File path: streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java ## @@ -671,19 +1211,21 @@ private void writeCorruptCheckpoint() throws IOException { } } -private void
[GitHub] [kafka] guozhangwang commented on pull request #9083: KAFKA-9450: Follow-up; Forbid process after closed [WIP]
guozhangwang commented on pull request #9083: URL: https://github.com/apache/kafka/pull/9083#issuecomment-664699699 cc @ableegoldman @mjsax
[GitHub] [kafka] bob-barrett commented on a change in pull request #9054: KAFKA-10282: Remove Log metrics immediately when deleting log
bob-barrett commented on a change in pull request #9054:
URL: https://github.com/apache/kafka/pull/9054#discussion_r461721737

## File path: core/src/main/scala/kafka/log/LogManager.scala ##
@@ -932,6 +927,7 @@ class LogManager(logDirs: Seq[File],
       val logsToCheckpoint = logsInDir(logDir)
       checkpointRecoveryOffsetsAndCleanSnapshotsInDir(logDir, logsToCheckpoint, ArrayBuffer.empty)
       checkpointLogStartOffsetsInDir(logDir, logsToCheckpoint)
+      sourceLog.removeLogMetrics()

Review comment:
Good catch. Opened https://issues.apache.org/jira/browse/KAFKA-10320 to track it.
[GitHub] [kafka] abbccdda commented on pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)
abbccdda commented on pull request #9001: URL: https://github.com/apache/kafka/pull/9001#issuecomment-665189199
[GitHub] [kafka] hachikuji commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API
hachikuji commented on a change in pull request #9008: URL: https://github.com/apache/kafka/pull/9008#discussion_r461173162 ## File path: clients/src/main/resources/common/message/FetchRequest.json ## @@ -55,35 +55,35 @@ "about": "The minimum bytes to accumulate in the response." }, { "name": "MaxBytes", "type": "int32", "versions": "3+", "default": "0x7fff", "ignorable": true, "about": "The maximum bytes to fetch. See KIP-74 for cases where this limit may not be honored." }, -{ "name": "IsolationLevel", "type": "int8", "versions": "4+", "default": "0", "ignorable": false, +{ "name": "IsolationLevel", "type": "int8", "versions": "4+", "default": "0", "ignorable": true, Review comment: I guess the implicit expectation is that if the protocol does not support the `read_committed` isolation level, then it wouldn't have transactional data anyway, so reverting to `read_uncommitted` is safe. Can't find a fault with that. ## File path: clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java ## @@ -146,7 +147,7 @@ public static AbstractRequest parseRequest(ApiKeys apiKey, short apiVersion, Str case PRODUCE: return new ProduceRequest(struct, apiVersion); case FETCH: -return new FetchRequest(struct, apiVersion); +return new FetchRequest(new FetchRequestData(struct, apiVersion), apiVersion); Review comment: nit: any reason not to stick with the same constructor convention as the other requests? ## File path: clients/src/main/java/org/apache/kafka/common/protocol/RecordsWriter.java ## @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.common.protocol; + +import org.apache.kafka.common.network.ByteBufferSend; +import org.apache.kafka.common.network.Send; +import org.apache.kafka.common.record.BaseRecords; +import org.apache.kafka.common.utils.ByteUtils; + +import java.io.DataOutput; +import java.nio.ByteBuffer; +import java.util.function.Consumer; + +/** + * Implementation of Writable which produces a sequence of {@link Send} objects. This allows for deferring the transfer + * of data from a record-set's file channel to the eventual socket channel. + * + * Excepting {@link #writeRecords(BaseRecords)}, calls to the write methods on this class will append to a byte array + * according to the format specified in {@link DataOutput}. When a call is made to writeRecords, any previously written + * bytes will be flushed as a new {@link ByteBufferSend} to the given Send consumer. After flushing the pending bytes, + * another Send is passed to the consumer which wraps the underlying record-set's transfer logic. + * + * For example, + * + * + * recordsWritable.writeInt(10); + * recordsWritable.writeRecords(records1); + * recordsWritable.writeInt(20); + * recordsWritable.writeRecords(records2); + * recordsWritable.writeInt(30); + * recordsWritable.flush(); + * + * + * Will pass 5 Send objects to the consumer given in the constructor. Care must be taken by callers to flush any + * pending bytes at the end of the writing sequence to ensure everything is flushed to the consumer. 
This class is + * intended to be used with {@link org.apache.kafka.common.record.MultiRecordsSend}. + * + * @see org.apache.kafka.common.requests.FetchResponse + */ +public class RecordsWriter implements Writable { +private final String dest; +private final Consumer sendConsumer; +private final ByteBuffer buffer; +private int mark; + +public RecordsWriter(String dest, int totalSize, Consumer sendConsumer) { Review comment: Could we rename `totalSize` so that it is clear that it does not cover the record sizes. Maybe `totalOverheadSize` or `totalNonRecordSize` or something like that. ## File path: clients/src/main/java/org/apache/kafka/common/protocol/RecordsWriter.java ## @@ -0,0 +1,139 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information
[GitHub] [kafka] chia7712 commented on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
chia7712 commented on pull request #8657: URL: https://github.com/apache/kafka/pull/8657#issuecomment-665149860 ```streams_standby_replica_test``` -> https://issues.apache.org/jira/browse/KAFKA-10287 I will take a look at ```streams_broker_bounce_test```
[GitHub] [kafka] dajac opened a new pull request #9092: KAFKA-10163; Define `controller_mutation_rate` a Double instead of a Long
dajac opened a new pull request #9092: URL: https://github.com/apache/kafka/pull/9092 First tests have shown that `controller_mutation_rate` can be quite low (e.g. around 1) in clusters with multiple tenants. At the moment, the rate is defined as a Long which limits the possible low values. Using a Double seems more appropriate. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes)
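To illustrate why an integer type limits low rate values, here is a small sketch. It is not how the broker parses or stores the quota; `MutationRateSketch` and `asLong` are hypothetical names, and the example rates are made up.

```java
public class MutationRateSketch {

    /** Narrowing a fractional rate to a long drops everything after the decimal point. */
    static long asLong(double rate) {
        return (long) rate;
    }

    public static void main(String[] args) {
        double desiredRate = 0.5; // e.g. one mutation every two seconds
        // As a long, any rate below 1 collapses to 0 and 1.9 collapses to 1,
        // so rates "around 1" cannot be tuned with any precision.
        System.out.println(asLong(desiredRate));
        System.out.println(asLong(1.9));
        // As a double, the fractional rate is kept as configured.
        System.out.println(desiredRate);
    }
}
```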
[GitHub] [kafka] chia7712 commented on pull request #9066: KAFKA-10287: Skip unknown offsets when computing sum of changelog offsets
chia7712 commented on pull request #9066: URL: https://github.com/apache/kafka/pull/9066#issuecomment-664764918
[GitHub] [kafka] showuon commented on pull request #9096: Add comments to constrainedAssign and generalAssign method
showuon commented on pull request #9096: URL: https://github.com/apache/kafka/pull/9096#issuecomment-665472409 @ableegoldman @vahidhashemian , please help review this PR to add comments to the assign methods. Thanks.
[GitHub] [kafka] cyrusv closed pull request #9093: Throw error on when keys not found in FileConfigProvider
cyrusv closed pull request #9093: URL: https://github.com/apache/kafka/pull/9093
[GitHub] [kafka] vvcephei commented on a change in pull request #9004: KAFKA-10261: Introduce the KIP-478 apis with shims
vvcephei commented on a change in pull request #9004: URL: https://github.com/apache/kafka/pull/9004#discussion_r461271276 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java ## @@ -880,40 +880,41 @@ public synchronized ProcessorTopology buildGlobalStateTopology() { return globalGroups; } +@SuppressWarnings("unchecked") private ProcessorTopology build(final Set nodeGroup) { Objects.requireNonNull(applicationId, "topology has not completed optimization"); -final Map> processorMap = new LinkedHashMap<>(); -final Map> topicSourceMap = new HashMap<>(); -final Map> topicSinkMap = new HashMap<>(); +final Map> processorMap = new LinkedHashMap<>(); +final Map> topicSourceMap = new HashMap<>(); +final Map> topicSinkMap = new HashMap<>(); final Map stateStoreMap = new LinkedHashMap<>(); final Set repartitionTopics = new HashSet<>(); // create processor nodes in a topological order ("nodeFactories" is already topologically sorted) // also make sure the state store map values following the insertion ordering -for (final NodeFactory factory : nodeFactories.values()) { +for (final NodeFactory factory : nodeFactories.values()) { if (nodeGroup == null || nodeGroup.contains(factory.name)) { -final ProcessorNode node = factory.build(); +final ProcessorNode node = factory.build(); processorMap.put(node.name(), node); if (factory instanceof ProcessorNodeFactory) { buildProcessorNode(processorMap, stateStoreMap, - (ProcessorNodeFactory) factory, - node); + (ProcessorNodeFactory) factory, + (ProcessorNode) node); } else if (factory instanceof SourceNodeFactory) { buildSourceNode(topicSourceMap, repartitionTopics, -(SourceNodeFactory) factory, -(SourceNode) node); +(SourceNodeFactory) factory, +(SourceNode) node); } else if (factory instanceof SinkNodeFactory) { buildSinkNode(processorMap, topicSinkMap, repartitionTopics, - (SinkNodeFactory) factory, - (SinkNode) node); + (SinkNodeFactory) factory, + (SinkNode) node); Review 
comment: They have subtly different meanings, which I'm not 100% clear on all the time. I'm not sure if I had to change this one, or if it was an accident. I'll give it a closer look.

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorShim.java ##
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+
+public final class ProcessorShim implements Processor {

Review comment:
I think "adapter" is the standard design pattern name for this type of thing. Not sure why I thought "shim" was a good choice in the heat of the moment. Maybe because I'm kind of slipping these classes in the middle to make everything line up? I can change them to "adapter".

## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java ##
@@ -880,40 +880,41 @@ public synchronized ProcessorTopology buildGlobalStateTopology() {
         return globalGroups;
     }
+    @SuppressWarnings("unchecked")
     private ProcessorTopology build(final Set nodeGroup) {
[GitHub] [kafka] mjsax commented on pull request #9047: KAFKA-9274: Remove `retries` for global task
mjsax commented on pull request #9047: URL: https://github.com/apache/kafka/pull/9047#issuecomment-664773350 @vvcephei @abbccdda @guozhangwang @hachikuji -- I updated this PR according to your discussions. Needed to squash for rebasing to resolve conflicts.
[GitHub] [kafka] abbccdda commented on a change in pull request #9081: KAFKA-10309: KafkaProducer's sendOffsetsToTransaction should not block infinitively
abbccdda commented on a change in pull request #9081:
URL: https://github.com/apache/kafka/pull/9081#discussion_r461313377

## File path: core/src/test/scala/integration/kafka/api/TransactionsTest.scala ##
@@ -406,6 +406,26 @@ class TransactionsTest extends KafkaServerTestHarness {
     TestUtils.waitUntilTrue(() => offsetAndMetadata.equals(consumer.committed(Set(tp).asJava).get(tp)), "cannot read committed offset")
   }
+  @Test(expected = classOf[TimeoutException])
+  def testSendOffsetsToTransactionTimeout(): Unit = {
+    val producer = createTransactionalProducer("transactionProducer", maxBlockMs = 1000)
+    producer.initTransactions()
+    producer.beginTransaction()
+    producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic1, "foo".getBytes, "bar".getBytes))
+
+    for (i <- 0 until servers.size)

Review comment:
nit: could use servers.indices

## File path: core/src/test/scala/integration/kafka/api/TransactionsTest.scala ##
@@ -406,6 +406,26 @@ class TransactionsTest extends KafkaServerTestHarness {
     TestUtils.waitUntilTrue(() => offsetAndMetadata.equals(consumer.committed(Set(tp).asJava).get(tp)), "cannot read committed offset")
   }
+  @Test(expected = classOf[TimeoutException])
+  def testSendOffsetsToTransactionTimeout(): Unit = {
+    val producer = createTransactionalProducer("transactionProducer", maxBlockMs = 1000)
+    producer.initTransactions()
+    producer.beginTransaction()
+    producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic1, "foo".getBytes, "bar".getBytes))
+
+    for (i <- 0 until servers.size)
+      killBroker(i)
+
+    try {
+      producer.sendOffsetsToTransaction(Map(

Review comment:
Do we have unit test coverage for other transaction API max blocking as well? Do you mind adding them as separate tests and share the same module?
[GitHub] [kafka] abbccdda commented on pull request #9081: KAFKA-10309: KafkaProducer's sendOffsetsToTransaction should not block infinitively
abbccdda commented on pull request #9081: URL: https://github.com/apache/kafka/pull/9081#issuecomment-665157437
[GitHub] [kafka] abbccdda edited a comment on pull request #9012: KAFKA-10270: A broker to controller channel manager
abbccdda edited a comment on pull request #9012: URL: https://github.com/apache/kafka/pull/9012#issuecomment-665410633 Got 2/3 green, with one jenkins job terminated unexpectedly. https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/7602/console
[GitHub] [kafka] ableegoldman opened a new pull request #9094: KAFKA-10054: add TRACE-level e2e latency metrics
ableegoldman opened a new pull request #9094: URL: https://github.com/apache/kafka/pull/9094 Adds avg, min, and max e2e latency metrics at the new TRACE level. Also adds the missing `avg` task-level metric at the INFO level.
[GitHub] [kafka] cmccabe commented on pull request #9012: KAFKA-10270: A broker to controller channel manager
cmccabe commented on pull request #9012: URL: https://github.com/apache/kafka/pull/9012#issuecomment-665288947
[GitHub] [kafka] abbccdda commented on a change in pull request #9004: KAFKA-10261: Introduce the KIP-478 apis with adapters
abbccdda commented on a change in pull request #9004: URL: https://github.com/apache/kafka/pull/9004#discussion_r462057744 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java ## @@ -764,12 +764,12 @@ private void connectProcessorAndStateStore(final String processorName, if (!sourceTopics.isEmpty()) { stateStoreNameToSourceTopics.put(stateStoreName, -Collections.unmodifiableSet(sourceTopics)); + Collections.unmodifiableSet(sourceTopics)); Review comment: format looks weird, maybe just do 4 spaces ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java ## @@ -57,12 +57,12 @@ public StateStore getStateStore(final String name) { @SuppressWarnings("unchecked") @Override -public void forward(final K key, final V value) { -final ProcessorNode previousNode = currentNode(); +public void forward(final KIn key, final VIn value) { +final ProcessorNode previousNode = currentNode(); try { -for (final ProcessorNode child : currentNode().children()) { +for (final ProcessorNode child : currentNode().children()) { setCurrentNode(child); -((ProcessorNode) child).process(key, value); +((ProcessorNode) child).process(key, value); // FIXME Review comment: Could we leave a more clear comment on what needs to be fixed? ## File path: streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java ## @@ -0,0 +1,240 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor.api; + +import org.apache.kafka.common.header.Headers; +import org.apache.kafka.common.serialization.Serde; +import org.apache.kafka.streams.StreamsMetrics; +import org.apache.kafka.streams.errors.StreamsException; +import org.apache.kafka.streams.processor.Cancellable; +import org.apache.kafka.streams.processor.Processor; +import org.apache.kafka.streams.processor.PunctuationType; +import org.apache.kafka.streams.processor.Punctuator; +import org.apache.kafka.streams.processor.StateRestoreCallback; +import org.apache.kafka.streams.processor.StateStore; +import org.apache.kafka.streams.processor.TaskId; +import org.apache.kafka.streams.processor.TimestampExtractor; +import org.apache.kafka.streams.processor.To; + +import java.io.File; +import java.time.Duration; +import java.util.Map; + +/** + * Processor context interface. + * + * @param a bound on the types of keys that may be forwarded + * @param a bound on the types of values that may be forwarded + */ +public interface ProcessorContext { + +/** + * Returns the application id. + * + * @return the application id + */ +String applicationId(); + +/** + * Returns the task id. + * + * @return the task id + */ +TaskId taskId(); + +/** + * Returns the default key serde. + * + * @return the key serializer + */ +Serde keySerde(); + +/** + * Returns the default value serde. + * + * @return the value serializer + */ +Serde valueSerde(); + +/** + * Returns the state directory for the partition. 
+ * + * @return the state directory + */ +File stateDir(); + +/** + * Returns Metrics instance. + * + * @return StreamsMetrics + */ +StreamsMetrics metrics(); + +/** + * Registers and possibly restores the specified storage engine. + * + * @param store the storage engine + * @param stateRestoreCallback the restoration callback logic for log-backed state stores upon restart + * + * @throws IllegalStateException If store gets registered after initialized is already finished + * @throws StreamsException if the store's change log does not contain the partition + */ +void register(final StateStore store, + final StateRestoreCallback stateRestoreCallback); + +/** + * Get the state store given the store name. + * + *
[GitHub] [kafka] abbccdda commented on pull request #9012: KAFKA-10270: A broker to controller channel manager
abbccdda commented on pull request #9012: URL: https://github.com/apache/kafka/pull/9012#issuecomment-665410633 This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] cadonna commented on pull request #9087: HOTFIX: Set session timeout and heartbeat interval to default to decrease flakiness
cadonna commented on pull request #9087: URL: https://github.com/apache/kafka/pull/9087#issuecomment-664621717 @abbccdda Looking at this test class again, I even think that we could set session timeout and heartbeat interval to default for all tests. The main motivation to reduce them was to not need to change the application ID for each test (see https://github.com/apache/kafka/pull/8530#discussion_r413170584). However, in the meantime each test has got its own application ID, so I guess it would be fine to use the defaults here again. Is this correct @guozhangwang ?
[GitHub] [kafka] chia7712 edited a comment on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…
chia7712 edited a comment on pull request #8657: URL: https://github.com/apache/kafka/pull/8657#issuecomment-665149860 ```streams_standby_replica_test``` -> https://issues.apache.org/jira/browse/KAFKA-10287 I will take a look at ```streams_broker_bounce_test``` (https://issues.apache.org/jira/browse/KAFKA-10292)
[GitHub] [kafka] vvcephei commented on a change in pull request #9047: KAFKA-9274: Remove `retries` for global task
vvcephei commented on a change in pull request #9047: URL: https://github.com/apache/kafka/pull/9047#discussion_r461143961 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java ## @@ -275,31 +259,70 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback, final RecordConverter recordConverter) { for (final TopicPartition topicPartition : topicPartitions) { globalConsumer.assign(Collections.singletonList(topicPartition)); +long offset; final Long checkpoint = checkpointFileCache.get(topicPartition); if (checkpoint != null) { globalConsumer.seek(topicPartition, checkpoint); +offset = checkpoint; } else { globalConsumer.seekToBeginning(Collections.singletonList(topicPartition)); +final AtomicLong position = new AtomicLong(); +retryUntilSuccessOrThrowOnTaskTimeout( +() -> position.set(globalConsumer.position(topicPartition)), +String.format( +"Failed to get position for partition %s. The broker may be transiently unavailable at the moment.", +topicPartition +) +); +offset = position.get(); } -long offset = globalConsumer.position(topicPartition); final Long highWatermark = highWatermarks.get(topicPartition); final RecordBatchingStateRestoreCallback stateRestoreAdapter = StateRestoreCallbackAdapter.adapt(stateRestoreCallback); stateRestoreListener.onRestoreStart(topicPartition, storeName, offset, highWatermark); long restoreCount = 0L; +long deadlineMs = NO_DEADLINE; while (offset < highWatermark) { try { final ConsumerRecords records = globalConsumer.poll(pollTime); +if (records.isEmpty()) { +if (taskTimeoutMs == 0L) { +deadlineMs = maybeUpdateDeadlineOrThrow( Review comment: Ok, thanks @mjsax . I just traced through the consumer code again, and have finally been able to see what you already knew: that `request.timeout.ms` is indeed the correct amount of time to wait. 
Namely, we send a fetch here: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L1285
Which calls through to client.send here: https://github.com/apache/kafka/blob/659ca8f089c6b1c3a53036a8f96de7f763971fd1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L263
Which fills in the `request.timeout.ms` config value here: https://github.com/apache/kafka/blob/659ca8f089c6b1c3a53036a8f96de7f763971fd1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L106
Which uses it to construct a ClientRequest here: https://github.com/apache/kafka/blob/659ca8f089c6b1c3a53036a8f96de7f763971fd1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L129-L130
Which then gets used to create an InFlightRequest when it gets sent here: https://github.com/apache/kafka/blob/659ca8f089c6b1c3a53036a8f96de7f763971fd1/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L1248
Which is later used to detect expired requests here: https://github.com/apache/kafka/blob/659ca8f089c6b1c3a53036a8f96de7f763971fd1/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java#L162
Which is used to list nodes (brokers) for which there is an expired request here: https://github.com/apache/kafka/blob/659ca8f089c6b1c3a53036a8f96de7f763971fd1/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java#L179
Which is then processed as a "disconnection" here: https://github.com/apache/kafka/blob/659ca8f089c6b1c3a53036a8f96de7f763971fd1/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L803

It also looks like the KafkaClient just does a tight-loop checking for a network response, so we don't really need any extra time to account for sampling errors. Also, it still seems like using the sum as the poll duration is just as good as using your retry logic, so I think the duration parameter is fine. 
My only remaining question, which maybe doesn't really matter one way or another, is whether `poll.ms` really belongs here or not. It seems like the desired semantics are accomplished by just waiting `request.timeout.ms` for the initial failure, and then an extra `task.timeout.ms` for any retries. ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java ## @@ -292,21 +278,36
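The semantics described in this comment (wait `request.timeout.ms` for the initial failure, then allow an extra `task.timeout.ms` for retries) can be sketched as simple deadline bookkeeping. This is a minimal sketch under stated assumptions, not the code from the PR: the class `RestoreDeadline`, its method names, the `-1` sentinel value, and the use of `IllegalStateException` are all hypothetical. Only the names `NO_DEADLINE` and `taskTimeoutMs` appear in the quoted diff.

```java
/**
 * Hypothetical helper (not from the PR): tracks a retry budget that starts
 * at the first empty poll and is cleared whenever records arrive.
 */
final class RestoreDeadline {
    // Sentinel for "no retry budget is running"; the value -1 is an assumption.
    static final long NO_DEADLINE = -1L;

    private final long taskTimeoutMs;
    private long deadlineMs = NO_DEADLINE;

    RestoreDeadline(final long taskTimeoutMs) {
        this.taskTimeoutMs = taskTimeoutMs;
    }

    /** Call after a poll that returned no records within the request timeout. */
    void onEmptyPoll(final long nowMs) {
        if (deadlineMs == NO_DEADLINE) {
            // First failure: start the retry budget from now.
            deadlineMs = nowMs + taskTimeoutMs;
        }
        if (nowMs >= deadlineMs) {
            // Budget exhausted (immediately so when taskTimeoutMs is 0).
            throw new IllegalStateException(
                "No records received within " + taskTimeoutMs + " ms of the first empty poll");
        }
    }

    /** Call after a poll that returned records: progress resets the budget. */
    void onRecords() {
        deadlineMs = NO_DEADLINE;
    }

    public static void main(final String[] args) {
        final RestoreDeadline deadline = new RestoreDeadline(100L);
        deadline.onEmptyPoll(0L);   // starts the budget, deadline is t=100
        deadline.onEmptyPoll(50L);  // still within the budget
        deadline.onRecords();       // progress clears the deadline
        deadline.onEmptyPoll(120L); // starts a fresh budget, deadline is t=220
        System.out.println("still retrying");
    }
}
```

With this shape, the poll duration only needs to cover one request timeout; the retry budget is tracked separately, which is one way to read the question about whether `poll.ms` belongs in the sum.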
[GitHub] [kafka] vvcephei commented on pull request #9004: KAFKA-10261: Introduce the KIP-478 apis with shims
vvcephei commented on pull request #9004: URL: https://github.com/apache/kafka/pull/9004#issuecomment-664740837 Thanks for the review, @abbccdda ! I've addressed your feedback.
[GitHub] [kafka] dajac commented on pull request #9092: KAFKA-10163; Define `controller_mutation_rate` a Double instead of a Long
dajac commented on pull request #9092: URL: https://github.com/apache/kafka/pull/9092#issuecomment-664996150 @rajinisivaram I have updated an existing test to use 1.5 instead of 2 to verify this. https://github.com/apache/kafka/pull/9092/files#diff-296d7b93103356535b8b891f019e4651R113.
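As a rough illustration of why a fractional value such as 1.5 makes a better test input than 2 here: an integral parser rejects it, while a floating-point parser accepts it. This sketch uses plain JDK parsing only; the class `QuotaValueDemo` and its methods are hypothetical, and it makes no claim about how Kafka actually parses `controller_mutation_rate`.

```java
/** Hypothetical demo (not Kafka code): integral vs. floating-point parsing. */
final class QuotaValueDemo {
    /** Returns true if the raw string is a valid long literal. */
    static boolean parsesAsLong(final String raw) {
        try {
            Long.parseLong(raw);
            return true;
        } catch (final NumberFormatException e) {
            return false;
        }
    }

    /** Parses the raw string as a double; throws NumberFormatException if invalid. */
    static double parseAsDouble(final String raw) {
        return Double.parseDouble(raw);
    }

    public static void main(final String[] args) {
        // "2" is accepted by both parsers, so it cannot distinguish the two types.
        System.out.println(parsesAsLong("2") + " " + parseAsDouble("2"));
        // "1.5" is only accepted by the floating-point parser.
        System.out.println(parsesAsLong("1.5") + " " + parseAsDouble("1.5"));
    }
}
```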
[GitHub] [kafka] showuon commented on pull request #9062: KAFKA-8098: fix the flaky test by disabling the auto commit to avoid member rejoining
showuon commented on pull request #9062: URL: https://github.com/apache/kafka/pull/9062#issuecomment-664801932 Hi @feyman2016 @huxihx, could you help review this small PR? Thanks.
[GitHub] [kafka] rgibaiev commented on pull request #8575: KAFKA-8713 KIP-581 Add accept.optional.null to solve optional null
rgibaiev commented on pull request #8575: URL: https://github.com/apache/kafka/pull/8575#issuecomment-665225469 Any update?
[GitHub] [kafka] hachikuji commented on a change in pull request #9047: KAFKA-9274: Remove `retries` for global task
hachikuji commented on a change in pull request #9047: URL: https://github.com/apache/kafka/pull/9047#discussion_r461114945 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java ## @@ -292,21 +278,36 @@ private void restoreState(final StateRestoreCallback stateRestoreCallback, while (offset < highWatermark) { try { -final ConsumerRecords records = globalConsumer.poll(pollTime); +final ConsumerRecords records = + globalConsumer.poll(pollTimePlusRequestTimeoutPlusTaskTimeout); Review comment: It does seem a bit weird here to add in the request timeout. Not sure I follow the reasoning behind that...
[GitHub] [kafka] codelabor opened a new pull request #9090: toString pattern change like ProducerRecord.java
codelabor opened a new pull request #9090: URL: https://github.com/apache/kafka/pull/9090 In most 'toString()' methods, '=' is used instead of ' = ', so I followed that convention. Currently, if you print 'ProducerRecord' and 'ConsumerRecord', the two patterns differ, which looks strange. - producerRecord: ProducerRecord(topic=, partition=null, headers= ...) consumerRecord: ConsumerRecord(topic = , partition = 0, ..., headers = ...)
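To illustrate the convention this PR follows, here is a hedged sketch of a `toString()` in the compact `key=value` form. `RecordSummary` is a hypothetical class with placeholder fields; it is not Kafka's `ConsumerRecord` or `ProducerRecord`.

```java
/** Hypothetical value class (not Kafka code) using the compact key=value style. */
final class RecordSummary {
    private final String topic;
    private final int partition;
    private final long offset;

    RecordSummary(final String topic, final int partition, final long offset) {
        this.topic = topic;
        this.partition = partition;
        this.offset = offset;
    }

    @Override
    public String toString() {
        // No spaces around '=', matching the ProducerRecord-style output quoted above.
        return "RecordSummary(topic=" + topic
            + ", partition=" + partition
            + ", offset=" + offset + ")";
    }

    public static void main(final String[] args) {
        System.out.println(new RecordSummary("events", 0, 42L));
    }
}
```

Keeping one separator style across record types makes log lines easier to grep and to parse with a single pattern.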
[GitHub] [kafka] rhauch commented on pull request #9089: KAFKA-10224: Update jersey license from CDDL to EPLv2
rhauch commented on pull request #9089: URL: https://github.com/apache/kafka/pull/9089#issuecomment-665168618
[GitHub] [kafka] ableegoldman commented on pull request #9094: KAFKA-10054: KIP-613, add TRACE-level e2e latency metrics
ableegoldman commented on pull request #9094: URL: https://github.com/apache/kafka/pull/9094#issuecomment-665394762 call for review @vvcephei @cadonna @guozhangwang
[GitHub] [kafka] rhauch commented on pull request #9051: KAFKA-10268: dynamic config like "--delete-config log.retention.ms" doesn't work
rhauch commented on pull request #9051: URL: https://github.com/apache/kafka/pull/9051#issuecomment-665124035 This was also cherry-picked to `2.6`, but that branch has been frozen while we try to release AK 2.6.0. However, given that this is low-risk, I'll leave it on `2.6` and update [KAFKA-10268](https://issues.apache.org/jira/browse/KAFKA-10268) instead. @huxihx, next time please refrain from cherry-picking to frozen branches. Thanks!
[GitHub] [kafka] cmccabe commented on a change in pull request #9012: KAFKA-10270: A broker to controller channel manager
cmccabe commented on a change in pull request #9012: URL: https://github.com/apache/kafka/pull/9012#discussion_r461884961 ## File path: core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala ## @@ -0,0 +1,211 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package kafka.server + +import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue, TimeUnit} + +import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler} +import kafka.network.RequestChannel +import kafka.utils.Logging +import org.apache.kafka.clients._ +import org.apache.kafka.common.requests.AbstractRequest +import org.apache.kafka.common.utils.{LogContext, Time} +import org.apache.kafka.common.Node +import org.apache.kafka.common.metrics.Metrics +import org.apache.kafka.common.network._ +import org.apache.kafka.common.protocol.Errors +import org.apache.kafka.common.requests.AbstractRequest.NoOpRequestBuilder +import org.apache.kafka.common.security.JaasContext + +import scala.collection.mutable +import scala.jdk.CollectionConverters._ + +/** + * This class manages the connection between a broker and the controller. 
It runs a single + * {@link BrokerToControllerRequestThread} which uses the broker's metadata cache as its own metadata to find + * and connect to the controller. The channel is async and runs the network connection in the background. + * The maximum number of in-flight requests are set to one to ensure orderly response from the controller, therefore + * care must be taken to not block on outstanding requests for too long. + */ +class BrokerToControllerChannelManager(metadataCache: kafka.server.MetadataCache, + time: Time, + metrics: Metrics, + config: KafkaConfig, + threadNamePrefix: Option[String] = None) extends Logging { + private val requestQueue = new LinkedBlockingQueue[BrokerToControllerQueueItem] + private val logContext = new LogContext(s"[broker-${config.brokerId}-to-controller] ") + private val manualMetadataUpdater = new ManualMetadataUpdater() + private val requestThread = newRequestThread + + def start(): Unit = { +requestThread.start() + } + + def shutdown(): Unit = { +requestThread.shutdown() +requestThread.awaitShutdown() + } + + private[server] def newRequestThread = { +val brokerToControllerListenerName = config.controlPlaneListenerName.getOrElse(config.interBrokerListenerName) +val brokerToControllerSecurityProtocol = config.controlPlaneSecurityProtocol.getOrElse(config.interBrokerSecurityProtocol) + +val networkClient = { + val channelBuilder = ChannelBuilders.clientChannelBuilder( +brokerToControllerSecurityProtocol, +JaasContext.Type.SERVER, +config, +brokerToControllerListenerName, +config.saslMechanismInterBrokerProtocol, +time, +config.saslInterBrokerHandshakeRequestEnable, +logContext + ) + val selector = new Selector( +NetworkReceive.UNLIMITED, +Selector.NO_IDLE_TIMEOUT_MS, +metrics, +time, +"BrokerToControllerChannel", +Map("BrokerId" -> config.brokerId.toString).asJava, +false, +channelBuilder, +logContext + ) + new NetworkClient( +selector, +manualMetadataUpdater, +config.brokerId.toString, +1, +0, +0, 
+Selectable.USE_DEFAULT_BUFFER_SIZE, +Selectable.USE_DEFAULT_BUFFER_SIZE, +config.requestTimeoutMs, +config.connectionSetupTimeoutMs, +config.connectionSetupTimeoutMaxMs, +ClientDnsLookup.DEFAULT, Review comment: @abbccdda : Please switch this to `ClientDnsLookup.USE_ALL_DNS_IPS` to be consistent with the other NetworkClients, such as the one in `ControllerChannelManager`, etc. ## File path: core/src/main/scala/kafka/server/KafkaServer.scala ## @@ -168,6 +168,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = Time.SYSTEM, threadNameP var kafkaController: KafkaController = null + var
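The class comment in this diff says the channel is asynchronous, runs on a single request thread, and keeps at most one request in flight so that responses arrive in order. That idea can be sketched in plain Java as follows. This is not the Scala implementation under review: `SingleInFlightChannel`, its methods, and the fake `response-to-` reply are hypothetical, and a single-thread executor stands in for the request thread and its queue.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

/** Hypothetical sketch: one worker thread, so at most one request is processed at a time. */
final class SingleInFlightChannel {
    // A single-thread executor runs queued tasks one at a time in submission order.
    private final ExecutorService worker = Executors.newSingleThreadExecutor();

    /** Enqueues a request; the callback runs on the worker thread when it completes. */
    void send(final String request, final Consumer<String> onComplete) {
        worker.execute(() -> onComplete.accept("response-to-" + request));
    }

    /** Stops accepting requests and waits for queued ones to finish. */
    boolean shutdown(final long timeoutMs) {
        worker.shutdown();
        try {
            return worker.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(final String[] args) {
        final List<String> responses = new CopyOnWriteArrayList<>();
        final SingleInFlightChannel channel = new SingleInFlightChannel();
        channel.send("a", responses::add);
        channel.send("b", responses::add);
        channel.shutdown(5_000L);
        System.out.println(responses);
    }
}
```

The trade-off noted in the class comment applies to the sketch as well: because requests are strictly serialized, one slow request delays everything queued behind it.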
[GitHub] [kafka] mjsax commented on pull request #9086: FIX: Remove staticmethod tag to be able to use logger of instance
mjsax commented on pull request #9086: URL: https://github.com/apache/kafka/pull/9086#issuecomment-664684265 Merged to `trunk` and cherry-picked to `2.6`, `2.5`, `2.4`, and `2.3` branches.
[GitHub] [kafka] huxihx edited a comment on pull request #8984: KAFKA-10227: Enforce cleanup policy to only contain compact or delete once
huxihx edited a comment on pull request #8984: URL: https://github.com/apache/kafka/pull/8984#issuecomment-664804260 Ping @omkreddy @mimaison @dajac for review.
[GitHub] [kafka] dajac commented on pull request #9091: MINOR; Make KafkaAdminClientTest.testDescribeLogDirsPartialFailure and KafkaAdminClientTest.testAlterReplicaLogDirsPartialFailure test more reli
dajac commented on pull request #9091: URL: https://github.com/apache/kafka/pull/9091#issuecomment-664848664 @chia7712 Ack. I reverted `testMetadataRetries` back to the original version prior to #8864. That test is specifically about retries, so it doesn't make sense to remove the retries config.
[GitHub] [kafka] cyrusv opened a new pull request #9093: Throw error on when keys not found in FileConfigProvider
cyrusv opened a new pull request #9093: URL: https://github.com/apache/kafka/pull/9093
[GitHub] [kafka] sasakitoa commented on pull request #9081: KAFKA-10309: KafkaProducer's sendOffsetsToTransaction should not block infinitively
sasakitoa commented on pull request #9081: URL: https://github.com/apache/kafka/pull/9081#issuecomment-664725165 Thank you for the kind comments. I updated the PR to improve the Javadoc and the related test code.
[GitHub] [kafka] tombentley commented on pull request #9007: KAFKA-10120: Deprecate DescribeLogDirsResult.all() and .values()
tombentley commented on pull request #9007: URL: https://github.com/apache/kafka/pull/9007#issuecomment-664836543 @mimaison done, thanks.
[GitHub] [kafka] dajac commented on pull request #9072: KAFKA-10162; Make the rate based quota behave more like a Token Bucket (KIP-599, Part III)
dajac commented on pull request #9072: URL: https://github.com/apache/kafka/pull/9072#issuecomment-664923691 @apovzner Thanks for your comments. I have updated the PR to incorporate your feedback. I have also fixed a bug in unrecord and added the javadoc.