[jira] [Updated] (KAFKA-10181) Redirect AlterConfig/IncrementalAlterConfig to the controller

2020-07-29 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10181?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen updated KAFKA-10181:

Description: In the new Admin client, the 
AlterConfig/IncrementalAlterConfig request should be redirected to the active 
controller.  (was: In the new Admin client, the request should always be routed 
towards the controller.)

> Redirect AlterConfig/IncrementalAlterConfig to the controller
> -
>
> Key: KAFKA-10181
> URL: https://issues.apache.org/jira/browse/KAFKA-10181
> Project: Kafka
>  Issue Type: Sub-task
>  Components: admin
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
> Fix For: 2.7.0
>
>
> In the new Admin client, the AlterConfig/IncrementalAlterConfig request 
> should be redirected to the active controller.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10181) Redirect AlterConfig/IncrementalAlterConfig to the controller

2020-07-29 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10181?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen updated KAFKA-10181:

Summary: Redirect AlterConfig/IncrementalAlterConfig to the controller  
(was: AlterConfig/IncrementalAlterConfig should go to controller)

> Redirect AlterConfig/IncrementalAlterConfig to the controller
> -
>
> Key: KAFKA-10181
> URL: https://issues.apache.org/jira/browse/KAFKA-10181
> Project: Kafka
>  Issue Type: Sub-task
>  Components: admin
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
> Fix For: 2.7.0
>
>
> In the new Admin client, the request should always be routed towards the 
> controller.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10270) Add a broker to controller channel manager to redirect AlterConfig

2020-07-29 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10270?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen updated KAFKA-10270:

Fix Version/s: 2.7.0

> Add a broker to controller channel manager to redirect AlterConfig
> --
>
> Key: KAFKA-10270
> URL: https://issues.apache.org/jira/browse/KAFKA-10270
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
> Fix For: 2.7.0
>
>
> Per the KIP-590 requirement, we need to have a dedicated communication 
> channel from the broker to the controller.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-10270) Add a broker to controller channel manager to redirect AlterConfig

2020-07-29 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10270?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen resolved KAFKA-10270.
-
Resolution: Fixed

> Add a broker to controller channel manager to redirect AlterConfig
> --
>
> Key: KAFKA-10270
> URL: https://issues.apache.org/jira/browse/KAFKA-10270
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> Per the KIP-590 requirement, we need to have a dedicated communication 
> channel from the broker to the controller.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10326) Both serializer and deserializer should be able to see the generated client id

2020-07-29 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-10326:
--

 Summary: Both serializer and deserializer should be able to see 
the generated client id
 Key: KAFKA-10326
 URL: https://issues.apache.org/jira/browse/KAFKA-10326
 Project: Kafka
  Issue Type: Bug
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


Producer and consumer generate a client id when users don't define one. The 
generated client id is passed to all configurable components (for example, the 
metrics reporter) except for the serializer/deserializer.
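
A minimal sketch of what this would enable, assuming (as requested here) that the 
generated id is exposed to the serde under the standard {{client.id}} config key; 
the class and field names below are illustrative only:
{code:java}
import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.apache.kafka.common.serialization.Deserializer;

// Illustrative only: a deserializer that picks up the (generated) client id from its
// configs. Today the generated id is not passed to serializers/deserializers; this
// sketch assumes the behaviour requested by this ticket.
public class ClientIdAwareDeserializer implements Deserializer<String> {

    private String clientId = "unknown";

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        Object id = configs.get("client.id"); // would contain the generated id
        if (id != null) {
            clientId = id.toString();
        }
    }

    @Override
    public String deserialize(String topic, byte[] data) {
        // Tag the decoded value with the client id, e.g. for debugging or tracing.
        return data == null ? null : clientId + ": " + new String(data, StandardCharsets.UTF_8);
    }
}
{code}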



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10325) Implement KIP-649: Dynamic Client Configuration

2020-07-29 Thread Ryan Dielhenn (Jira)
Ryan Dielhenn created KAFKA-10325:
-

 Summary: Implement KIP-649: Dynamic Client Configuration
 Key: KAFKA-10325
 URL: https://issues.apache.org/jira/browse/KAFKA-10325
 Project: Kafka
  Issue Type: New Feature
Reporter: Ryan Dielhenn


Implement KIP-649: Dynamic Client Configuration



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10324) Pre-0.11 consumers can get stuck when messages are downconverted from V2 format

2020-07-29 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17167488#comment-17167488
 ] 

Ismael Juma commented on KAFKA-10324:
-

Yeah, it's usually ok to have one smaller batch per segment. And in the common 
case where you are reading from the end of the log, there may not be another 
segment. Scanning ahead seems ok for down conversion, but it's not clear if 
it's worth it for the common case (there is a cost to scanning ahead).

> Pre-0.11 consumers can get stuck when messages are downconverted from V2 
> format
> ---
>
> Key: KAFKA-10324
> URL: https://issues.apache.org/jira/browse/KAFKA-10324
> Project: Kafka
>  Issue Type: Bug
>Reporter: Tommy Becker
>Priority: Major
>
> As noted in KAFKA-5443, The V2 message format preserves a batch's lastOffset 
> even if that offset gets removed due to log compaction. If a pre-0.11 
> consumer seeks to such an offset and issues a fetch, it will get an empty 
> batch, since offsets prior to the requested one are filtered out during 
> down-conversion. KAFKA-5443 added consumer-side logic to advance the fetch 
> offset in this case, but this leaves old consumers unable to consume these 
> topics.
> The exact behavior varies depending on consumer version. The 0.10.0.0 
> consumer throws RecordTooLargeException and dies, believing that the record 
> must not have been returned because it was too large. The 0.10.1.0 consumer 
> simply spins fetching the same empty batch over and over.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10307) Topology cycles in KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable

2020-07-29 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10307?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17167487#comment-17167487
 ] 

John Roesler commented on KAFKA-10307:
--

Hey [~bchen225242], it seems like this was just a misunderstanding. Can we 
close the ticket? Or have I missed something?

> Topology cycles in 
> KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable
> -
>
> Key: KAFKA-10307
> URL: https://issues.apache.org/jira/browse/KAFKA-10307
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.4.0, 2.5.0, 2.6.0
>Reporter: Boyang Chen
>Priority: Major
>
> We have spotted a cycled topology for the foreign-key join test 
> *shouldInnerJoinMultiPartitionQueryable*, not sure yet whether this is a bug 
> in the algorithm or the test only. Used 
> [https://zz85.github.io/kafka-streams-viz/] to visualize:
> {code:java}
> Sub-topology: 0
> Source: KTABLE-SOURCE-19 (topics: 
> [KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-17-topic])
>   --> KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-20
> Source: KTABLE-SOURCE-32 (topics: 
> [KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-30-topic])
>   --> KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-33
> Source: KSTREAM-SOURCE-01 (topics: [table1])
>   --> KTABLE-SOURCE-02
> Processor: 
> KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-20 (stores: 
> [table1-STATE-STORE-00])
>   --> KTABLE-FK-JOIN-OUTPUT-21
>   <-- KTABLE-SOURCE-19
> Processor: 
> KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-33 (stores: 
> [INNER-store1])
>   --> KTABLE-FK-JOIN-OUTPUT-34
>   <-- KTABLE-SOURCE-32
> Processor: KTABLE-FK-JOIN-OUTPUT-21 (stores: [INNER-store1])
>   --> KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-23
>   <-- KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-20
> Processor: KTABLE-FK-JOIN-OUTPUT-34 (stores: [INNER-store2])
>   --> KTABLE-TOSTREAM-35
>   <-- KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-RESOLVER-PROCESSOR-33
> Processor: KTABLE-SOURCE-02 (stores: 
> [table1-STATE-STORE-00])
>   --> KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-10
>   <-- KSTREAM-SOURCE-01
> Processor: KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-10 (stores: 
> [])
>   --> KTABLE-SINK-11
>   <-- KTABLE-SOURCE-02
> Processor: KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-23 (stores: 
> [])
>   --> KTABLE-SINK-24
>   <-- KTABLE-FK-JOIN-OUTPUT-21
> Processor: KTABLE-TOSTREAM-35 (stores: [])
>   --> KSTREAM-SINK-36
>   <-- KTABLE-FK-JOIN-OUTPUT-34
> Sink: KSTREAM-SINK-36 (topic: output-)
>   <-- KTABLE-TOSTREAM-35
> Sink: KTABLE-SINK-11 (topic: 
> KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-09-topic)
>   <-- KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-10
> Sink: KTABLE-SINK-24 (topic: 
> KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-22-topic)
>   <-- KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-23  Sub-topology: 1
> Source: KSTREAM-SOURCE-04 (topics: [table2])
>   --> KTABLE-SOURCE-05
> Source: KTABLE-SOURCE-12 (topics: 
> [KTABLE-FK-JOIN-SUBSCRIPTION-REGISTRATION-09-topic])
>   --> KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-14
> Processor: KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-14 (stores: 
> [KTABLE-FK-JOIN-SUBSCRIPTION-STATE-STORE-13])
>   --> KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-15
>   <-- KTABLE-SOURCE-12
> Processor: KTABLE-SOURCE-05 (stores: 
> [table2-STATE-STORE-03])
>   --> KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-16
>   <-- KSTREAM-SOURCE-04
> Processor: KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-15 (stores: 
> [table2-STATE-STORE-03])
>   --> KTABLE-SINK-18
>   <-- KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-14
> Processor: KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-16 (stores: 
> [KTABLE-FK-JOIN-SUBSCRIPTION-STATE-STORE-13])
>   --> KTABLE-SINK-18
>   <-- KTABLE-SOURCE-05
> Sink: KTABLE-SINK-18 (topic: 
> KTABLE-FK-JOIN-SUBSCRIPTION-RESPONSE-17-topic)
>   <-- KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-15, 
> KTABLE-FK-JOIN-SUBSCRIPTION-PROCESSOR-16  Sub-topology: 2
> Source: KSTREAM-SOURCE-07 (topics: [table3])
>   --> KTABLE-SOURCE-08
> 

[jira] [Updated] (KAFKA-3672) Introduce globally consistent checkpoint in Kafka Streams

2020-07-29 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3672?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-3672:

Issue Type: New Feature  (was: Bug)

> Introduce globally consistent checkpoint in Kafka Streams
> -
>
> Key: KAFKA-3672
> URL: https://issues.apache.org/jira/browse/KAFKA-3672
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Affects Versions: 0.10.0.0
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: user-experience
>
> This originates from the idea of rethinking the checkpoint file creation 
> condition:
> Today the checkpoint file containing the checkpointed offsets is written upon 
> clean stream task shutdown, and is read and deleted upon stream task 
> (re-)construction. The rationale is that if the checkpoint file is missing 
> upon task re-construction, it indicates that the state of the underlying 
> persistent state store (RocksDB, for example) may not be consistent with the 
> committed offsets, and hence we'd better wipe out the possibly broken state 
> store and rebuild from the earliest offset.
> However, we may be able to do better than this if we can fully control the 
> persistent store flush time to be aligned with committing; then, as long as 
> we commit, we are always guaranteed a clean checkpoint.
> This may be generalized to a "global state checkpoint" mechanism in Kafka 
> Streams, which may also subsume KAFKA-3184 for non-persistent stores.
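
To make the current rule concrete, here is a minimal sketch of the task-initialization 
decision described above; all names and helper bodies are illustrative placeholders, 
not the actual Kafka Streams internals:
{code:java}
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch of the checkpoint rule described above.
public class CheckpointDecisionSketch {

    void initializeStateStore(File checkpointFile) throws IOException {
        if (checkpointFile.exists()) {
            // Clean shutdown happened: the persistent store is consistent with these offsets.
            Map<String, Long> checkpointedOffsets = readOffsets(checkpointFile);
            Files.delete(checkpointFile.toPath()); // read and deleted upon (re-)construction
            restoreFrom(checkpointedOffsets);
        } else {
            // Missing checkpoint: the store may be inconsistent with the committed offsets,
            // so wipe it and rebuild from the earliest changelog offset.
            wipeStore();
            restoreFrom(Collections.emptyMap());
        }
    }

    // Placeholder helpers standing in for the real restoration logic.
    Map<String, Long> readOffsets(File checkpointFile) { return new HashMap<>(); }
    void restoreFrom(Map<String, Long> offsets) { }
    void wipeStore() { }
}
{code}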



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10324) Pre-0.11 consumers can get stuck when messages are downconverted from V2 format

2020-07-29 Thread Tommy Becker (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17167485#comment-17167485
 ] 

Tommy Becker commented on KAFKA-10324:
--

Understood, but it seems like as your fetch offset approaches the end of a segment 
you'll get a smaller batch. Just pointing out that scanning ahead to grab as 
many records as will fit into the max fetch size could be a general, albeit 
small, improvement across the board.

> Pre-0.11 consumers can get stuck when messages are downconverted from V2 
> format
> ---
>
> Key: KAFKA-10324
> URL: https://issues.apache.org/jira/browse/KAFKA-10324
> Project: Kafka
>  Issue Type: Bug
>Reporter: Tommy Becker
>Priority: Major
>
> As noted in KAFKA-5443, The V2 message format preserves a batch's lastOffset 
> even if that offset gets removed due to log compaction. If a pre-0.11 
> consumer seeks to such an offset and issues a fetch, it will get an empty 
> batch, since offsets prior to the requested one are filtered out during 
> down-conversion. KAFKA-5443 added consumer-side logic to advance the fetch 
> offset in this case, but this leaves old consumers unable to consume these 
> topics.
> The exact behavior varies depending on consumer version. The 0.10.0.0 
> consumer throws RecordTooLargeException and dies, believing that the record 
> must not have been returned because it was too large. The 0.10.1.0 consumer 
> simply spins fetching the same empty batch over and over.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9210) kafka stream loss data

2020-07-29 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9210?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-9210.
-
Resolution: Fixed

> kafka stream loss data
> --
>
> Key: KAFKA-9210
> URL: https://issues.apache.org/jira/browse/KAFKA-9210
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.0.1
>Reporter: panpan.liu
>Priority: Major
> Attachments: app.log, screenshot-1.png
>
>
> kafka broker: 2.0.1
> kafka stream client: 2.1.0
>  # two applications run at the same time
>  # after some days,I stop one application(in k8s)
>  # The following log occurred, and I checked the data and found that the 
> value is less than what I expected.
> {quote}Partitions 
> [flash-app-xmc-worker-share-store-minute-repartition-1]Partitions 
> [flash-app-xmc-worker-share-store-minute-repartition-1] for changelog 
> flash-app-xmc-worker-share-store-minute-changelog-12019-11-19 
> 05:50:49.816|WARN 
> |flash-client-xmc-StreamThread-3|o.a.k.s.p.i.StoreChangelogReader|101|stream-thread
>  [flash-client-xmc-StreamThread-3] Restoring StreamTasks failed. Deleting 
> StreamTasks stores to recreate from 
> scratch.org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets 
> out of range with no configured reset policy for partitions: 
> \{flash-app-xmc-worker-share-store-minute-changelog-1=6128684} at 
> org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:987)2019-11-19
>  05:50:49.817|INFO 
> |flash-client-xmc-StreamThread-3|o.a.k.s.p.i.StoreChangelogReader|105|stream-thread
>  [flash-client-xmc-StreamThread-3] Reinitializing StreamTask TaskId: 10_1 
> ProcessorTopology: KSTREAM-SOURCE-70: topics: 
> [flash-app-xmc-worker-share-store-minute-repartition] children: 
> [KSTREAM-AGGREGATE-67] KSTREAM-AGGREGATE-67: states: 
> [worker-share-store-minute] children: [KTABLE-TOSTREAM-71] 
> KTABLE-TOSTREAM-71: children: [KSTREAM-SINK-72] 
> KSTREAM-SINK-72: topic: 
> StaticTopicNameExtractor(xmc-worker-share-minute)Partitions 
> [flash-app-xmc-worker-share-store-minute-repartition-1] for changelog 
> flash-app-xmc-worker-share-store-minute-changelog-12019-11-19 
> 05:50:49.842|WARN 
> |flash-client-xmc-StreamThread-3|o.a.k.s.p.i.StoreChangelogReader|101|stream-thread
>  [flash-client-xmc-StreamThread-3] Restoring StreamTasks failed. Deleting 
> StreamTasks stores to recreate from 
> scratch.org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets 
> out of range with no configured reset policy for partitions: 
> \{flash-app-xmc-worker-share-store-minute-changelog-1=6128684} at 
> org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:987)2019-11-19
>  05:50:49.842|INFO 
> |flash-client-xmc-StreamThread-3|o.a.k.s.p.i.StoreChangelogReader|105|stream-thread
>  [flash-client-xmc-StreamThread-3] Reinitializing StreamTask TaskId: 10_1 
> ProcessorTopology: KSTREAM-SOURCE-70: topics: 
> [flash-app-xmc-worker-share-store-minute-repartition] children: 
> [KSTREAM-AGGREGATE-67] KSTREAM-AGGREGATE-67: states: 
> [worker-share-store-minute] children: [KTABLE-TOSTREAM-71] 
> KTABLE-TOSTREAM-71: children: [KSTREAM-SINK-72] 
> KSTREAM-SINK-72: topic: 
> StaticTopicNameExtractor(xmc-worker-share-minute)Partitions 
> [flash-app-xmc-worker-share-store-minute-repartition-1] for changelog 
> flash-app-xmc-worker-share-store-minute-changelog-12019-11-19 
> 05:50:49.905|WARN 
> |flash-client-xmc-StreamThread-3|o.a.k.s.p.i.StoreChangelogReader|101|stream-thread
>  [flash-client-xmc-StreamThread-3] Restoring StreamTasks failed. Deleting 
> StreamTasks stores to recreate from 
> scratch.org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets 
> out of range with no configured reset policy for partitions: 
> \{flash-app-xmc-worker-share-store-minute-changelog-1=6128684} at 
> org.apache.kafka.clients.consumer.internals.Fetcher.parseCompletedFetch(Fetcher.java:987)2019-11-19
>  05:50:49.906|INFO 
> |flash-client-xmc-StreamThread-3|o.a.k.s.p.i.StoreChangelogReader|105|stream-thread
>  [flash-client-xmc-StreamThread-3] Reinitializing StreamTask TaskId: 10_1 
> ProcessorTopology: KSTREAM-SOURCE-70: topics: 
> [flash-app-xmc-worker-share-store-minute-repartition] children: 
> [KSTREAM-AGGREGATE-67] KSTREAM-AGGREGATE-67: states: 
> [worker-share-store-minute] 
> {quote}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10324) Pre-0.11 consumers can get stuck when messages are downconverted from V2 format

2020-07-29 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17167481#comment-17167481
 ] 

Ismael Juma commented on KAFKA-10324:
-

Segments generally are larger than the fetch response size, so it's usually not 
an issue.

> Pre-0.11 consumers can get stuck when messages are downconverted from V2 
> format
> ---
>
> Key: KAFKA-10324
> URL: https://issues.apache.org/jira/browse/KAFKA-10324
> Project: Kafka
>  Issue Type: Bug
>Reporter: Tommy Becker
>Priority: Major
>
> As noted in KAFKA-5443, The V2 message format preserves a batch's lastOffset 
> even if that offset gets removed due to log compaction. If a pre-0.11 
> consumer seeks to such an offset and issues a fetch, it will get an empty 
> batch, since offsets prior to the requested one are filtered out during 
> down-conversion. KAFKA-5443 added consumer-side logic to advance the fetch 
> offset in this case, but this leaves old consumers unable to consume these 
> topics.
> The exact behavior varies depending on consumer version. The 0.10.0.0 
> consumer throws RecordTooLargeException and dies, believing that the record 
> must not have been returned because it was too large. The 0.10.1.0 consumer 
> simply spins fetching the same empty batch over and over.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-10324) Pre-0.11 consumers can get stuck when messages are downconverted from V2 format

2020-07-29 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17167481#comment-17167481
 ] 

Ismael Juma edited comment on KAFKA-10324 at 7/29/20, 8:00 PM:
---

Segments are typically significantly larger than the fetch response size, so 
it's usually not an issue.


was (Author: ijuma):
Segments generally are larger than the fetch response size, so it's usually not 
an issue.

> Pre-0.11 consumers can get stuck when messages are downconverted from V2 
> format
> ---
>
> Key: KAFKA-10324
> URL: https://issues.apache.org/jira/browse/KAFKA-10324
> Project: Kafka
>  Issue Type: Bug
>Reporter: Tommy Becker
>Priority: Major
>
> As noted in KAFKA-5443, The V2 message format preserves a batch's lastOffset 
> even if that offset gets removed due to log compaction. If a pre-0.11 
> consumer seeks to such an offset and issues a fetch, it will get an empty 
> batch, since offsets prior to the requested one are filtered out during 
> down-conversion. KAFKA-5443 added consumer-side logic to advance the fetch 
> offset in this case, but this leaves old consumers unable to consume these 
> topics.
> The exact behavior varies depending on consumer version. The 0.10.0.0 
> consumer throws RecordTooLargeException and dies, believing that the record 
> must not have been returned because it was too large. The 0.10.1.0 consumer 
> simply spins fetching the same empty batch over and over.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10324) Pre-0.11 consumers can get stuck when messages are downconverted from V2 format

2020-07-29 Thread Tommy Becker (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17167480#comment-17167480
 ] 

Tommy Becker commented on KAFKA-10324:
--

So is it always the case that records returned in a FetchResponse are all from 
a single segment? Might that not be inefficient?

> Pre-0.11 consumers can get stuck when messages are downconverted from V2 
> format
> ---
>
> Key: KAFKA-10324
> URL: https://issues.apache.org/jira/browse/KAFKA-10324
> Project: Kafka
>  Issue Type: Bug
>Reporter: Tommy Becker
>Priority: Major
>
> As noted in KAFKA-5443, The V2 message format preserves a batch's lastOffset 
> even if that offset gets removed due to log compaction. If a pre-0.11 
> consumer seeks to such an offset and issues a fetch, it will get an empty 
> batch, since offsets prior to the requested one are filtered out during 
> down-conversion. KAFKA-5443 added consumer-side logic to advance the fetch 
> offset in this case, but this leaves old consumers unable to consume these 
> topics.
> The exact behavior varies depending on consumer version. The 0.10.0.0 
> consumer throws RecordTooLargeException and dies, believing that the record 
> must not have been returned because it was too large. The 0.10.1.0 consumer 
> simply spins fetching the same empty batch over and over.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10324) Pre-0.11 consumers can get stuck when messages are downconverted from V2 format

2020-07-29 Thread Jason Gustafson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17167444#comment-17167444
 ] 

Jason Gustafson commented on KAFKA-10324:
-

Thanks. Unfortunately it's not super straightforward to fix, but we have some 
ideas. Basically we have to modify the down-conversion logic so that it can 
read across multiple segments until it finds a non-empty batch to return.
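
For illustration only, here is a rough sketch of that scan-ahead idea; {{Segment}}, 
{{Batch}} and {{downConvert}} below are stand-ins rather than the real broker 
internals:
{code:java}
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

// Illustrative only: keep scanning forward across segments until down-conversion
// yields at least one batch, instead of returning an empty result that old
// consumers cannot make progress on.
class DownConversionScanSketch {

    interface Batch { }

    interface Segment {
        List<Batch> batchesFrom(long offset); // batches at or after the given offset
    }

    static List<Batch> downConvertNonEmpty(Iterator<Segment> segments,
                                           long fetchOffset,
                                           byte toMagic) {
        while (segments.hasNext()) {
            List<Batch> converted = downConvert(segments.next().batchesFrom(fetchOffset), toMagic);
            if (!converted.isEmpty()) {
                return converted; // something a pre-0.11 consumer can actually read
            }
            // All records at or after fetchOffset in this segment were compacted away,
            // so move on to the next segment rather than returning an empty batch.
        }
        return Collections.emptyList();
    }

    // Placeholder for the real down-conversion step.
    static List<Batch> downConvert(List<Batch> batches, byte toMagic) {
        return batches;
    }
}
{code}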

> Pre-0.11 consumers can get stuck when messages are downconverted from V2 
> format
> ---
>
> Key: KAFKA-10324
> URL: https://issues.apache.org/jira/browse/KAFKA-10324
> Project: Kafka
>  Issue Type: Bug
>Reporter: Tommy Becker
>Priority: Major
>
> As noted in KAFKA-5443, The V2 message format preserves a batch's lastOffset 
> even if that offset gets removed due to log compaction. If a pre-0.11 
> consumer seeks to such an offset and issues a fetch, it will get an empty 
> batch, since offsets prior to the requested one are filtered out during 
> down-conversion. KAFKA-5443 added consumer-side logic to advance the fetch 
> offset in this case, but this leaves old consumers unable to consume these 
> topics.
> The exact behavior varies depending on consumer version. The 0.10.0.0 
> consumer throws RecordTooLargeException and dies, believing that the record 
> must not have been returned because it was too large. The 0.10.1.0 consumer 
> simply spins fetching the same empty batch over and over.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10324) Pre-0.11 consumers can get stuck when messages are downconverted from V2 format

2020-07-29 Thread Tommy Becker (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17167431#comment-17167431
 ] 

Tommy Becker commented on KAFKA-10324:
--

Yes. See my comment above.

> Pre-0.11 consumers can get stuck when messages are downconverted from V2 
> format
> ---
>
> Key: KAFKA-10324
> URL: https://issues.apache.org/jira/browse/KAFKA-10324
> Project: Kafka
>  Issue Type: Bug
>Reporter: Tommy Becker
>Priority: Major
>
> As noted in KAFKA-5443, The V2 message format preserves a batch's lastOffset 
> even if that offset gets removed due to log compaction. If a pre-0.11 
> consumer seeks to such an offset and issues a fetch, it will get an empty 
> batch, since offsets prior to the requested one are filtered out during 
> down-conversion. KAFKA-5443 added consumer-side logic to advance the fetch 
> offset in this case, but this leaves old consumers unable to consume these 
> topics.
> The exact behavior varies depending on consumer version. The 0.10.0.0 
> consumer throws RecordTooLargeException and dies, believing that the record 
> must not have been returned because it was too large. The 0.10.1.0 consumer 
> simply spins fetching the same empty batch over and over.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10324) Pre-0.11 consumers can get stuck when messages are downconverted from V2 format

2020-07-29 Thread Jason Gustafson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17167413#comment-17167413
 ] 

Jason Gustafson commented on KAFKA-10324:
-

Aha, it is because it is at the end of the segment. Was that true in the other 
cases?

> Pre-0.11 consumers can get stuck when messages are downconverted from V2 
> format
> ---
>
> Key: KAFKA-10324
> URL: https://issues.apache.org/jira/browse/KAFKA-10324
> Project: Kafka
>  Issue Type: Bug
>Reporter: Tommy Becker
>Priority: Major
>
> As noted in KAFKA-5443, The V2 message format preserves a batch's lastOffset 
> even if that offset gets removed due to log compaction. If a pre-0.11 
> consumer seeks to such an offset and issues a fetch, it will get an empty 
> batch, since offsets prior to the requested one are filtered out during 
> down-conversion. KAFKA-5443 added consumer-side logic to advance the fetch 
> offset in this case, but this leaves old consumers unable to consume these 
> topics.
> The exact behavior varies depending on consumer version. The 0.10.0.0 
> consumer throws RecordTooLargeException and dies, believing that the record 
> must not have been returned because it was too large. The 0.10.1.0 consumer 
> simply spins fetching the same empty batch over and over.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cmccabe commented on a change in pull request #9012: KAFKA-10270: A broker to controller channel manager

2020-07-29 Thread GitBox


cmccabe commented on a change in pull request #9012:
URL: https://github.com/apache/kafka/pull/9012#discussion_r462385360



##
File path: 
core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
##
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.concurrent.{LinkedBlockingDeque, TimeUnit}
+
+import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler}
+import kafka.utils.Logging
+import org.apache.kafka.clients._
+import org.apache.kafka.common.requests.AbstractRequest
+import org.apache.kafka.common.utils.{LogContext, Time}
+import org.apache.kafka.common.Node
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.network._
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.security.JaasContext
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+/**
+ * This class manages the connection between a broker and the controller. It 
runs a single
+ * {@link BrokerToControllerRequestThread} which uses the broker's metadata 
cache as its own metadata to find
+ * and connect to the controller. The channel is async and runs the network 
connection in the background.
+ * The maximum number of in-flight requests are set to one to ensure orderly 
response from the controller, therefore
+ * care must be taken to not block on outstanding requests for too long.
+ */
+class BrokerToControllerChannelManager(metadataCache: 
kafka.server.MetadataCache,
+   time: Time,
+   metrics: Metrics,
+   config: KafkaConfig,
+   threadNamePrefix: Option[String] = 
None) extends Logging {
+  private val requestQueue = new 
LinkedBlockingDeque[BrokerToControllerQueueItem]
+  private val logContext = new 
LogContext(s"[broker-${config.brokerId}-to-controller] ")
+  private val manualMetadataUpdater = new ManualMetadataUpdater()
+  private val requestThread = newRequestThread
+
+  def start(): Unit = {
+requestThread.start()
+  }
+
+  def shutdown(): Unit = {
+requestThread.shutdown()
+requestThread.awaitShutdown()
+  }
+
+  private[server] def newRequestThread = {
+val brokerToControllerListenerName = 
config.controlPlaneListenerName.getOrElse(config.interBrokerListenerName)
+val brokerToControllerSecurityProtocol = 
config.controlPlaneSecurityProtocol.getOrElse(config.interBrokerSecurityProtocol)
+
+val networkClient = {
+  val channelBuilder = ChannelBuilders.clientChannelBuilder(
+brokerToControllerSecurityProtocol,
+JaasContext.Type.SERVER,
+config,
+brokerToControllerListenerName,
+config.saslMechanismInterBrokerProtocol,
+time,
+config.saslInterBrokerHandshakeRequestEnable,
+logContext
+  )
+  val selector = new Selector(
+NetworkReceive.UNLIMITED,
+Selector.NO_IDLE_TIMEOUT_MS,
+metrics,
+time,
+"BrokerToControllerChannel",
+Map("BrokerId" -> config.brokerId.toString).asJava,
+false,
+channelBuilder,
+logContext
+  )
+  new NetworkClient(
+selector,
+manualMetadataUpdater,
+config.brokerId.toString,
+1,
+0,
+0,
+Selectable.USE_DEFAULT_BUFFER_SIZE,
+Selectable.USE_DEFAULT_BUFFER_SIZE,
+config.requestTimeoutMs,
+config.connectionSetupTimeoutMs,
+config.connectionSetupTimeoutMaxMs,
+ClientDnsLookup.USE_ALL_DNS_IPS,
+time,
+false,
+new ApiVersions,
+logContext
+  )
+}
+val threadName = threadNamePrefix match {
+  case None => s"broker-${config.brokerId}-to-controller-send-thread"
+  case Some(name) => 
s"$name:broker-${config.brokerId}-to-controller-send-thread"
+}
+
+new BrokerToControllerRequestThread(networkClient, manualMetadataUpdater, 
requestQueue, metadataCache, config,
+  brokerToControllerListenerName, time, threadName)
+  }
+
+  private[server] def sendRequest(request: AbstractRequest.Builder[_ <: 

[GitHub] [kafka] chia7712 commented on pull request #9097: KAFKA-10319: Skip unknown offsets when computing sum of changelog offsets

2020-07-29 Thread GitBox


chia7712 commented on pull request #9097:
URL: https://github.com/apache/kafka/pull/9097#issuecomment-665504521







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna opened a new pull request #9098: KAFKA-9924: Prepare RocksDB and metrics for RocksDB properties recording

2020-07-29 Thread GitBox


cadonna opened a new pull request #9098:
URL: https://github.com/apache/kafka/pull/9098


   This PR refactors the RocksDB store and the metrics infrastructure in Streams
   in preparation for recording the RocksDB properties specified in KIP-607.
   
   The refactoring includes:
   - a wrapper around `BlockBasedTableConfig` to make the cache accessible to
     the RocksDB metrics recorder
   - the RocksDB metrics recorder now also takes the DB instance and the cache
     in addition to the statistics
   - the value providers for the metrics are added to the RocksDB metrics
     recorder even if the recording level is INFO
   - the creation of the RocksDB metrics recording trigger is moved to
     `StreamsMetricsImpl`
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] badaiaqrandista edited a comment on pull request #4807: KAFKA-6733: Support of printing additional ConsumerRecord fields in DefaultMessageFormatter

2020-07-29 Thread GitBox


badaiaqrandista edited a comment on pull request #4807:
URL: https://github.com/apache/kafka/pull/4807#issuecomment-665660018


   This PR should be closed as it has been superseded by PR 9099 
(https://github.com/apache/kafka/pull/9099).



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (KAFKA-10324) Pre-0.11 consumers can get stuck when messages are downconverted from V2 format

2020-07-29 Thread Tommy Becker (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17167400#comment-17167400
 ] 

Tommy Becker edited comment on KAFKA-10324 at 7/29/20, 5:54 PM:


[~hachikuji] wrt why the broker does not seem to send subsequent batches, I'm 
not sure. But I can tell you I see this behavior even with 
max.partition.fetch.bytes set to Integer.MAX_VALUE. Maybe this has something to 
do with down conversion?  Anyway, here's an excerpt from a dump of the segment 
containing the problematic offset, which is 13920987:

 

baseOffset: 13920966 lastOffset: 13920987 count: 6 baseSequence: -1 
lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 49 
isTransactional: false isControl: false position: 98516844 CreateTime: 
1595224747691 size: 4407 magic: 2 compresscodec: NONE crc: 1598305187 isvalid: 
true
\| offset: 13920978 CreateTime: 1595224747691 keysize: 36 valuesize: 681 
sequence: -1 headerKeys: []
\| offset: 13920979 CreateTime: 1595224747691 keysize: 36 valuesize: 677 
sequence: -1 headerKeys: []
\| offset: 13920980 CreateTime: 1595224747691 keysize: 36 valuesize: 680 
sequence: -1 headerKeys: []
\| offset: 13920984 CreateTime: 1595224747691 keysize: 36 valuesize: 681 
sequence: -1 headerKeys: []
\| offset: 13920985 CreateTime: 1595224747691 keysize: 36 valuesize: 677 
sequence: -1 headerKeys: []
\| offset: 13920986 CreateTime: 1595224747691 keysize: 36 valuesize: 680 
sequence: -1 headerKeys: []
End of segment is here

 

 


was (Author: twbecker):
[~hachikuji] wrt why the broker does not seem to send subsequent batches, I'm 
not sure. But I can tell you I see this behavior even with 
max.partition.fetch.bytes set to Integer.MAX_VALUE. Maybe this has something to 
do with down conversion?  Anyway, here's an excerpt from a dump of the segment 
containing the problematic offset, which is 13920987:

 

baseOffset: 13920966 lastOffset: 13920987 count: 6 baseSequence: -1 
lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 49 
isTransactional: false isControl: false position: 98516844 CreateTime: 
1595224747691 size: 4407 magic: 2 compresscodec: NONE crc: 1598305187 isvalid: 
true
| offset: 13920978 CreateTime: 1595224747691 keysize: 36 valuesize: 681 
sequence: -1 headerKeys: []
| offset: 13920979 CreateTime: 1595224747691 keysize: 36 valuesize: 677 
sequence: -1 headerKeys: []
| offset: 13920980 CreateTime: 1595224747691 keysize: 36 valuesize: 680 
sequence: -1 headerKeys: []
| offset: 13920984 CreateTime: 1595224747691 keysize: 36 valuesize: 681 
sequence: -1 headerKeys: []
| offset: 13920985 CreateTime: 1595224747691 keysize: 36 valuesize: 677 
sequence: -1 headerKeys: []
| offset: 13920986 CreateTime: 1595224747691 keysize: 36 valuesize: 680 
sequence: -1 headerKeys: []
End of segment is here

 

 

> Pre-0.11 consumers can get stuck when messages are downconverted from V2 
> format
> ---
>
> Key: KAFKA-10324
> URL: https://issues.apache.org/jira/browse/KAFKA-10324
> Project: Kafka
>  Issue Type: Bug
>Reporter: Tommy Becker
>Priority: Major
>
> As noted in KAFKA-5443, The V2 message format preserves a batch's lastOffset 
> even if that offset gets removed due to log compaction. If a pre-0.11 
> consumer seeks to such an offset and issues a fetch, it will get an empty 
> batch, since offsets prior to the requested one are filtered out during 
> down-conversion. KAFKA-5443 added consumer-side logic to advance the fetch 
> offset in this case, but this leaves old consumers unable to consume these 
> topics.
> The exact behavior varies depending on consumer version. The 0.10.0.0 
> consumer throws RecordTooLargeException and dies, believing that the record 
> must not have been returned because it was too large. The 0.10.1.0 consumer 
> simply spins fetching the same empty batch over and over.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cadonna commented on pull request #9087: HOTFIX: Set session timeout and heartbeat interval to default to decrease flakiness

2020-07-29 Thread GitBox


cadonna commented on pull request #9087:
URL: https://github.com/apache/kafka/pull/9087#issuecomment-665526207


   I did some runs of the `RestoreIntegrationTest` and the runtime does not 
significantly change between runs with default values for session timeout and 
heartbeat interval and reduced values for session timeout and heartbeat 
interval. Hence, I set the two configs to their default values for the entire 
test.  



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #9092: KAFKA-10163; Define `controller_mutation_rate` as a Double instead of a Long

2020-07-29 Thread GitBox


chia7712 commented on a change in pull request #9092:
URL: https://github.com/apache/kafka/pull/9092#discussion_r462394461



##
File path: core/src/main/scala/kafka/server/DynamicConfig.scala
##
@@ -103,7 +103,7 @@ object DynamicConfig {
   .define(ProducerByteRateOverrideProp, LONG, DefaultProducerOverride, 
MEDIUM, ProducerOverrideDoc)
   .define(ConsumerByteRateOverrideProp, LONG, DefaultConsumerOverride, 
MEDIUM, ConsumerOverrideDoc)
   .define(RequestPercentageOverrideProp, DOUBLE, DefaultRequestOverride, 
MEDIUM, RequestOverrideDoc)
-  .define(ControllerMutationOverrideProp, LONG, DefaultConsumerOverride, 
MEDIUM, ControllerMutationOverrideDoc)
+  .define(ControllerMutationOverrideProp, DOUBLE, DefaultConsumerOverride, 
MEDIUM, ControllerMutationOverrideDoc)

Review comment:
   Why not also change the type of the default value from ```Long``` to ```Double```?
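
   For illustration, a sketch of the suggestion using the public `ConfigDef` API; the 
   default value and doc string below are assumptions, not the actual `DynamicConfig` 
   values:

```java
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;

// Sketch only: if the property is declared as Type.DOUBLE, its default should be a
// Double as well. The default and documentation here are illustrative.
public class ControllerMutationOverrideSketch {
    static final String CONTROLLER_MUTATION_OVERRIDE_PROP = "controller_mutation_rate";
    static final double DEFAULT_CONTROLLER_MUTATION_OVERRIDE = Double.MAX_VALUE; // i.e. unlimited

    static final ConfigDef CONFIG = new ConfigDef()
        .define(CONTROLLER_MUTATION_OVERRIDE_PROP,
                Type.DOUBLE,
                DEFAULT_CONTROLLER_MUTATION_OVERRIDE, // a Double default matching the DOUBLE type
                Importance.MEDIUM,
                "Per-user/client override for the controller mutation rate quota.");
}
```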





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #9099: KAFKA-6733: Printing additional ConsumerRecord fields in DefaultMessageFormatter

2020-07-29 Thread GitBox


dajac commented on a change in pull request #9099:
URL: https://github.com/apache/kafka/pull/9099#discussion_r462306541



##
File path: core/src/main/scala/kafka/tools/ConsoleConsumer.scala
##
@@ -459,48 +466,32 @@ class DefaultMessageFormatter extends MessageFormatter {
   var printKey = false
   var printValue = true
   var printPartition = false
-  var keySeparator = "\t".getBytes(StandardCharsets.UTF_8)
-  var lineSeparator = "\n".getBytes(StandardCharsets.UTF_8)
+  var printOffset = false
+  var printHeaders = false
+  var keySeparator = utfBytes("\t")
+  var lineSeparator = utfBytes("\n")
+  var headersSeparator = utfBytes(",")
+  var nullLiteral = utfBytes("null")
 
   var keyDeserializer: Option[Deserializer[_]] = None
   var valueDeserializer: Option[Deserializer[_]] = None
-
-  override def configure(configs: Map[String, _]): Unit = {
-val props = new java.util.Properties()
-configs.asScala.foreach { case (key, value) => props.put(key, 
value.toString) }
-if (props.containsKey("print.timestamp"))
-  printTimestamp = 
props.getProperty("print.timestamp").trim.equalsIgnoreCase("true")
-if (props.containsKey("print.key"))
-  printKey = props.getProperty("print.key").trim.equalsIgnoreCase("true")
-if (props.containsKey("print.value"))
-  printValue = 
props.getProperty("print.value").trim.equalsIgnoreCase("true")
-if (props.containsKey("print.partition"))
-  printPartition = 
props.getProperty("print.partition").trim.equalsIgnoreCase("true")
-if (props.containsKey("key.separator"))
-  keySeparator = 
props.getProperty("key.separator").getBytes(StandardCharsets.UTF_8)
-if (props.containsKey("line.separator"))
-  lineSeparator = 
props.getProperty("line.separator").getBytes(StandardCharsets.UTF_8)
-// Note that `toString` will be called on the instance returned by 
`Deserializer.deserialize`
-if (props.containsKey("key.deserializer")) {
-  keyDeserializer = 
Some(Class.forName(props.getProperty("key.deserializer")).getDeclaredConstructor()
-.newInstance().asInstanceOf[Deserializer[_]])
-  
keyDeserializer.get.configure(propertiesWithKeyPrefixStripped("key.deserializer.",
 props).asScala.asJava, true)
-}
-// Note that `toString` will be called on the instance returned by 
`Deserializer.deserialize`
-if (props.containsKey("value.deserializer")) {
-  valueDeserializer = 
Some(Class.forName(props.getProperty("value.deserializer")).getDeclaredConstructor()
-.newInstance().asInstanceOf[Deserializer[_]])
-  
valueDeserializer.get.configure(propertiesWithKeyPrefixStripped("value.deserializer.",
 props).asScala.asJava, false)
-}
-  }
-
-  private def propertiesWithKeyPrefixStripped(prefix: String, props: 
Properties): Properties = {
-val newProps = new Properties()
-props.asScala.foreach { case (key, value) =>
-  if (key.startsWith(prefix) && key.length > prefix.length)
-newProps.put(key.substring(prefix.length), value)
-}
-newProps
+  var headersDeserializer: Option[Deserializer[_]] = None
+
+  override def init(props: Properties): Unit = {

Review comment:
   `init(props: Properties)` has been deprecated. It would be great if we 
could keep using `configure(configs: Map[String, _])` as before. I think that 
we should also try to directly extract the values from the `Map` instead of 
using a `Properties`.

##
File path: core/src/test/scala/kafka/tools/DefaultMessageFormatterTest.scala
##
@@ -0,0 +1,235 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  * http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package unit.kafka.tools
+
+import java.io.{ByteArrayOutputStream, Closeable, PrintStream}
+import java.nio.charset.StandardCharsets
+import java.util
+import java.util.Properties
+
+import kafka.tools.DefaultMessageFormatter
+import org.apache.kafka.clients.consumer.ConsumerRecord
+import org.apache.kafka.common.header.Header
+import org.apache.kafka.common.header.internals.{RecordHeader, RecordHeaders}
+import org.apache.kafka.common.record.TimestampType
+import org.apache.kafka.common.serialization.Deserializer
+import org.junit.Assert._
+import org.junit.Test
+import org.junit.runner.RunWith
+import 

[GitHub] [kafka] cadonna commented on pull request #9098: KAFKA-9924: Prepare RocksDB and metrics for RocksDB properties recording

2020-07-29 Thread GitBox


cadonna commented on pull request #9098:
URL: https://github.com/apache/kafka/pull/9098#issuecomment-665616943


   Call for review: @vvcephei @guozhangwang 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (KAFKA-10324) Pre-0.11 consumers can get stuck when messages are downconverted from V2 format

2020-07-29 Thread Tommy Becker (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17167400#comment-17167400
 ] 

Tommy Becker edited comment on KAFKA-10324 at 7/29/20, 5:46 PM:


[~hachikuji] wrt why the broker does not seem to send subsequent batches, I'm 
not sure. But I can tell you I see this behavior even with 
max.partition.fetch.bytes set to Integer.MAX_VALUE. Maybe this has something to 
do with down conversion?  Anyway, here's an excerpt from a dump of the segment 
containing the problematic offset, which is 13920987:

 

baseOffset: 13920966 lastOffset: 13920987 count: 6 baseSequence: -1 
lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 49 
isTransactional: false isControl: false position: 98516844 CreateTime: 
1595224747691 size: 4407 magic: 2 compresscodec: NONE crc: 1598305187 isvalid: 
true
| offset: 13920978 CreateTime: 1595224747691 keysize: 36 valuesize: 681 
sequence: -1 headerKeys: []
| offset: 13920979 CreateTime: 1595224747691 keysize: 36 valuesize: 677 
sequence: -1 headerKeys: []
| offset: 13920980 CreateTime: 1595224747691 keysize: 36 valuesize: 680 
sequence: -1 headerKeys: []
| offset: 13920984 CreateTime: 1595224747691 keysize: 36 valuesize: 681 
sequence: -1 headerKeys: []
| offset: 13920985 CreateTime: 1595224747691 keysize: 36 valuesize: 677 
sequence: -1 headerKeys: []
| offset: 13920986 CreateTime: 1595224747691 keysize: 36 valuesize: 680 
sequence: -1 headerKeys: []
End of segment is here

 

 


was (Author: twbecker):
[~hachikuji] wrt why the broker does not seem to send subsequent batches, I'm 
not sure. But I can tell you I see this behavior even with 
max.partition.fetch.bytes set to Integer.MAX_VALUE. Maybe this has something to 
do with down conversion?  Anyway, here's an excerpt from a dump of the segment 
containing the problematic offset, which is 13920987:

 

baseOffset: 13920966 lastOffset: 13920987 count: 6 baseSequence: -1 
lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 49 
isTransactional: false isControl: false position: 98516844 CreateTime: 
1595224747691 size: 4407 magic: 2 compresscodec: NONE crc: 1598305187 isvalid: 
true
| offset: 13920978 CreateTime: 1595224747691 keysize: 36 valuesize: 681 
sequence: -1 headerKeys: []
| offset: 13920979 CreateTime: 1595224747691 keysize: 36 valuesize: 677 
sequence: -1 headerKeys: []
| offset: 13920980 CreateTime: 1595224747691 keysize: 36 valuesize: 680 
sequence: -1 headerKeys: []
| offset: 13920984 CreateTime: 1595224747691 keysize: 36 valuesize: 681 
sequence: -1 headerKeys: []
| offset: 13920985 CreateTime: 1595224747691 keysize: 36 valuesize: 677 
sequence: -1 headerKeys: []
| offset: 13920986 CreateTime: 1595224747691 keysize: 36 valuesize: 680 
sequence: -1 headerKeys: []

### End of segment is here

 

 

> Pre-0.11 consumers can get stuck when messages are downconverted from V2 
> format
> ---
>
> Key: KAFKA-10324
> URL: https://issues.apache.org/jira/browse/KAFKA-10324
> Project: Kafka
>  Issue Type: Bug
>Reporter: Tommy Becker
>Priority: Major
>
> As noted in KAFKA-5443, The V2 message format preserves a batch's lastOffset 
> even if that offset gets removed due to log compaction. If a pre-0.11 
> consumer seeks to such an offset and issues a fetch, it will get an empty 
> batch, since offsets prior to the requested one are filtered out during 
> down-conversion. KAFKA-5443 added consumer-side logic to advance the fetch 
> offset in this case, but this leaves old consumers unable to consume these 
> topics.
> The exact behavior varies depending on consumer version. The 0.10.0.0 
> consumer throws RecordTooLargeException and dies, believing that the record 
> must not have been returned because it was too large. The 0.10.1.0 consumer 
> simply spins fetching the same empty batch over and over.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mumrah commented on pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

2020-07-29 Thread GitBox


mumrah commented on pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#issuecomment-665795947


   I ran a console consumer perf test (at @hachikuji's suggestion) and took a 
profile. 
   
   
![image](https://user-images.githubusercontent.com/55116/88832229-81be6d00-d19e-11ea-9ee9-51b6054a6731.png)
   
   Zoomed in a bit on the records part:
   
   
![image](https://user-images.githubusercontent.com/55116/88832276-93a01000-d19e-11ea-9293-a138c38f6ed3.png)
   
   This was with only a handful of partitions on a single broker (on my 
laptop), but it confirms that the new FetchResponse serialization is hitting 
the same sendfile path as the previous code.
   
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10324) Pre-0.11 consumers can get stuck when messages are downconverted from V2 format

2020-07-29 Thread Tommy Becker (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17167400#comment-17167400
 ] 

Tommy Becker commented on KAFKA-10324:
--

[~hachikuji] wrt why the broker does not seem to send subsequent batches, I'm 
not sure. But I can tell you I see this behavior even with 
max.partition.fetch.bytes set to Integer.MAX_VALUE. Maybe this has something to 
do with down conversion?  Anyway, here's an excerpt from a dump of the segment 
containing the problematic offset, which is 13920987:

 

baseOffset: 13920966 lastOffset: 13920987 count: 6 baseSequence: -1 
lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 49 
isTransactional: false isControl: false position: 98516844 CreateTime: 
1595224747691 size: 4407 magic: 2 compresscodec: NONE crc: 1598305187 isvalid: 
true
| offset: 13920978 CreateTime: 1595224747691 keysize: 36 valuesize: 681 
sequence: -1 headerKeys: []
| offset: 13920979 CreateTime: 1595224747691 keysize: 36 valuesize: 677 
sequence: -1 headerKeys: []
| offset: 13920980 CreateTime: 1595224747691 keysize: 36 valuesize: 680 
sequence: -1 headerKeys: []
| offset: 13920984 CreateTime: 1595224747691 keysize: 36 valuesize: 681 
sequence: -1 headerKeys: []
| offset: 13920985 CreateTime: 1595224747691 keysize: 36 valuesize: 677 
sequence: -1 headerKeys: []
| offset: 13920986 CreateTime: 1595224747691 keysize: 36 valuesize: 680 
sequence: -1 headerKeys: []

### End of segment is here

 

 

> Pre-0.11 consumers can get stuck when messages are downconverted from V2 
> format
> ---
>
> Key: KAFKA-10324
> URL: https://issues.apache.org/jira/browse/KAFKA-10324
> Project: Kafka
>  Issue Type: Bug
>Reporter: Tommy Becker
>Priority: Major
>
> As noted in KAFKA-5443, The V2 message format preserves a batch's lastOffset 
> even if that offset gets removed due to log compaction. If a pre-0.11 
> consumer seeks to such an offset and issues a fetch, it will get an empty 
> batch, since offsets prior to the requested one are filtered out during 
> down-conversion. KAFKA-5443 added consumer-side logic to advance the fetch 
> offset in this case, but this leaves old consumers unable to consume these 
> topics.
> The exact behavior varies depending on consumer version. The 0.10.0.0 
> consumer throws RecordTooLargeException and dies, believing that the record 
> must not have been returned because it was too large. The 0.10.1.0 consumer 
> simply spins fetching the same empty batch over and over.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cadonna commented on a change in pull request #9094: KAFKA-10054: KIP-613, add TRACE-level e2e latency metrics

2020-07-29 Thread GitBox


cadonna commented on a change in pull request #9094:
URL: https://github.com/apache/kafka/pull/9094#discussion_r462327050



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##
@@ -227,6 +234,14 @@ protected Bytes keyBytes(final K key) {
 return byteEntries;
 }
 
+private void maybeRecordE2ELatency() {
+if (e2eLatencySensor.shouldRecord() && e2eLatencySensor.hasMetrics()) {

Review comment:
   I think you do not need to check for metrics with 
`e2eLatencySensor.hasMetrics()`. There should always be metrics within this 
sensor.
   `hasMetrics()` is used in `StreamsMetricsImpl#maybeMeasureLatency()` because 
some sensors may not contain any metrics due to the built-in metrics version. 
For instance, the destroy sensor exists for built-in metrics versions 0.10.0-2.4 
but not for latest. To avoid version checks in the record processing code, we 
just create an empty sensor and call record on it, effectively not recording any 
metrics for this sensor for version latest.
   We do not hide newly added metrics if the built-in version is set to an 
older version.
   The same applies to the other uses of `hasMetrics()` introduced in this PR.
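
   A minimal sketch of the simplification suggested above; the field and method names 
   are illustrative, not the actual `MeteredKeyValueStore` code:

```java
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.utils.Time;

// Sketch only: since the e2e latency sensor always has its metrics registered,
// the recording-level check alone is sufficient.
class E2ELatencyRecordingSketch {

    private final Sensor e2eLatencySensor;
    private final Time time;

    E2ELatencyRecordingSketch(final Sensor e2eLatencySensor, final Time time) {
        this.e2eLatencySensor = e2eLatencySensor;
        this.time = time;
    }

    void maybeRecordE2ELatency(final long recordTimestamp) {
        // No hasMetrics() guard needed here, unlike sensors that may be empty
        // depending on the built-in metrics version.
        if (e2eLatencySensor.shouldRecord()) {
            final long now = time.milliseconds();
            e2eLatencySensor.record(now - recordTimestamp, now);
        }
    }
}
```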

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
##
@@ -248,4 +253,12 @@ public void close() {
 private Bytes keyBytes(final K key) {
 return Bytes.wrap(serdes.rawKey(key));
 }
+
+private void maybeRecordE2ELatency() {
+if (e2eLatencySensor.shouldRecord() && e2eLatencySensor.hasMetrics()) {

Review comment:
   Your approach makes sense to me. I agree that the latency should refer 
to the update in the state store and not to record itself. If a record updates 
the state more than once then latency should be measured each time. 

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/metrics/StateStoreMetrics.java
##
@@ -443,6 +447,25 @@ public static Sensor suppressionBufferSizeSensor(final 
String threadId,
 );
 }
 
+public static Sensor e2ELatencySensor(final String threadId,
+  final String taskId,
+  final String storeType,
+  final String storeName,
+  final StreamsMetricsImpl 
streamsMetrics) {
+final Sensor sensor = streamsMetrics.storeLevelSensor(threadId, 
taskId, storeName, RECORD_E2E_LATENCY, RecordingLevel.TRACE);
+final Map tagMap = 
streamsMetrics.storeLevelTagMap(threadId, taskId, storeType, storeName);
+addAvgAndMinAndMaxToSensor(
+sensor,
+STATE_STORE_LEVEL_GROUP,

Review comment:
   You need to use the `stateStoreLevelGroup()` here instead of 
`STATE_STORE_LEVEL_GROUP` because the group name depends on the version and the 
store type.

##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
##
@@ -668,6 +671,9 @@ private void checkKeyValueStoreMetrics(final String 
group0100To24,
 checkMetricByName(listMetricStore, SUPPRESSION_BUFFER_SIZE_CURRENT, 0);
 checkMetricByName(listMetricStore, SUPPRESSION_BUFFER_SIZE_AVG, 0);
 checkMetricByName(listMetricStore, SUPPRESSION_BUFFER_SIZE_MAX, 0);
+checkMetricByName(listMetricStore, RECORD_E2E_LATENCY_AVG, 
expectedNumberofE2ELatencyMetrics);
+checkMetricByName(listMetricStore, RECORD_E2E_LATENCY_MIN, 
expectedNumberofE2ELatencyMetrics);
+checkMetricByName(listMetricStore, RECORD_E2E_LATENCY_MAX, 
expectedNumberofE2ELatencyMetrics);

Review comment:
   I agree with you, it should always be 1. The problem is the group name of the 
metrics; see my comment in `StateStoreMetrics`. I am glad this test served its 
purpose, because I did not notice this in the unit tests! 

##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamTaskTest.java
##
@@ -468,38 +468,44 @@ public void 
shouldRecordE2ELatencyOnProcessForSourceNodes() {
 }
 
 @Test
-public void shouldRecordE2ELatencyMinAndMax() {
+public void shouldRecordE2ELatencyAvgAndMinAndMax() {
 time = new MockTime(0L, 0L, 0L);
 metrics = new Metrics(new 
MetricConfig().recordLevel(Sensor.RecordingLevel.INFO), time);
 task = createStatelessTask(createConfig(false, "0"), 
StreamsConfig.METRICS_LATEST);
 
 final String sourceNode = source1.name();
 
-final Metric maxMetric = getProcessorMetric("record-e2e-latency", 
"%s-max", task.id().toString(), sourceNode, StreamsConfig.METRICS_LATEST);
+final Metric avgMetric = getProcessorMetric("record-e2e-latency", 
"%s-avg", task.id().toString(), sourceNode, StreamsConfig.METRICS_LATEST);
 final Metric minMetric = getProcessorMetric("record-e2e-latency", 
"%s-min", 

[GitHub] [kafka] badaiaqrandista opened a new pull request #9099: KAFKA-6733: Printing additional ConsumerRecord fields in DefaultMessageFormatter

2020-07-29 Thread GitBox


badaiaqrandista opened a new pull request #9099:
URL: https://github.com/apache/kafka/pull/9099


   Implementation of KIP-431 - Support of printing additional ConsumerRecord 
fields in DefaultMessageFormatter
   
   
https://cwiki.apache.org/confluence/display/KAFKA/KIP-431%3A+Support+of+printing+additional+ConsumerRecord+fields+in+DefaultMessageFormatter
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on pull request #9087: HOTFIX: Set session timeout and heartbeat interval to default to decrease flakiness

2020-07-29 Thread GitBox


guozhangwang commented on pull request #9087:
URL: https://github.com/apache/kafka/pull/9087#issuecomment-665779451







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on pull request #9012: KAFKA-10270: A broker to controller channel manager

2020-07-29 Thread GitBox


abbccdda commented on pull request #9012:
URL: https://github.com/apache/kafka/pull/9012#issuecomment-665757647







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mumrah commented on a change in pull request #9012: KAFKA-10270: A broker to controller channel manager

2020-07-29 Thread GitBox


mumrah commented on a change in pull request #9012:
URL: https://github.com/apache/kafka/pull/9012#discussion_r462352682



##
File path: 
core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
##
@@ -0,0 +1,188 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.concurrent.{LinkedBlockingDeque, TimeUnit}
+
+import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler}
+import kafka.utils.Logging
+import org.apache.kafka.clients._
+import org.apache.kafka.common.requests.AbstractRequest
+import org.apache.kafka.common.utils.{LogContext, Time}
+import org.apache.kafka.common.Node
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.network._
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.security.JaasContext
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+/**
+ * This class manages the connection between a broker and the controller. It 
runs a single
+ * {@link BrokerToControllerRequestThread} which uses the broker's metadata 
cache as its own metadata to find
+ * and connect to the controller. The channel is async and runs the network 
connection in the background.
+ * The maximum number of in-flight requests are set to one to ensure orderly 
response from the controller, therefore
+ * care must be taken to not block on outstanding requests for too long.
+ */
+class BrokerToControllerChannelManager(metadataCache: 
kafka.server.MetadataCache,
+   time: Time,
+   metrics: Metrics,
+   config: KafkaConfig,
+   threadNamePrefix: Option[String] = 
None) extends Logging {
+  private val requestQueue = new 
LinkedBlockingDeque[BrokerToControllerQueueItem]
+  private val logContext = new 
LogContext(s"[broker-${config.brokerId}-to-controller] ")
+  private val manualMetadataUpdater = new ManualMetadataUpdater()
+  private val requestThread = newRequestThread
+
+  def start(): Unit = {
+requestThread.start()
+  }
+
+  def shutdown(): Unit = {
+requestThread.shutdown()
+requestThread.awaitShutdown()
+  }
+
+  private[server] def newRequestThread = {
+val brokerToControllerListenerName = 
config.controlPlaneListenerName.getOrElse(config.interBrokerListenerName)
+val brokerToControllerSecurityProtocol = 
config.controlPlaneSecurityProtocol.getOrElse(config.interBrokerSecurityProtocol)
+
+val networkClient = {
+  val channelBuilder = ChannelBuilders.clientChannelBuilder(
+brokerToControllerSecurityProtocol,
+JaasContext.Type.SERVER,
+config,
+brokerToControllerListenerName,
+config.saslMechanismInterBrokerProtocol,
+time,
+config.saslInterBrokerHandshakeRequestEnable,
+logContext
+  )
+  val selector = new Selector(
+NetworkReceive.UNLIMITED,
+Selector.NO_IDLE_TIMEOUT_MS,
+metrics,
+time,
+"BrokerToControllerChannel",
+Map("BrokerId" -> config.brokerId.toString).asJava,
+false,
+channelBuilder,
+logContext
+  )
+  new NetworkClient(
+selector,
+manualMetadataUpdater,
+config.brokerId.toString,
+1,
+0,
+0,
+Selectable.USE_DEFAULT_BUFFER_SIZE,
+Selectable.USE_DEFAULT_BUFFER_SIZE,
+config.requestTimeoutMs,
+config.connectionSetupTimeoutMs,
+config.connectionSetupTimeoutMaxMs,
+ClientDnsLookup.USE_ALL_DNS_IPS,
+time,
+false,
+new ApiVersions,
+logContext
+  )
+}
+val threadName = threadNamePrefix match {
+  case None => s"broker-${config.brokerId}-to-controller-send-thread"
+  case Some(name) => 
s"$name:broker-${config.brokerId}-to-controller-send-thread"
+}
+
+new BrokerToControllerRequestThread(networkClient, manualMetadataUpdater, 
requestQueue, metadataCache, config,
+  brokerToControllerListenerName, time, threadName)
+  }
+
+  private[server] def sendRequest(request: AbstractRequest.Builder[_ <: 

[GitHub] [kafka] mumrah commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

2020-07-29 Thread GitBox


mumrah commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r462364719



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/FetchResponse.java
##
@@ -366,225 +255,128 @@ public FetchResponse(Errors error,
  LinkedHashMap> 
responseData,
  int throttleTimeMs,
  int sessionId) {
-this.error = error;
-this.responseData = responseData;
-this.throttleTimeMs = throttleTimeMs;
-this.sessionId = sessionId;
+this.data = toMessage(throttleTimeMs, error, 
responseData.entrySet().iterator(), sessionId);
+this.responseDataMap = responseData;
 }
 
-public static FetchResponse parse(Struct struct) {
-LinkedHashMap> 
responseData = new LinkedHashMap<>();
-for (Object topicResponseObj : struct.getArray(RESPONSES_KEY_NAME)) {
-Struct topicResponse = (Struct) topicResponseObj;
-String topic = topicResponse.get(TOPIC_NAME);
-for (Object partitionResponseObj : 
topicResponse.getArray(PARTITIONS_KEY_NAME)) {
-Struct partitionResponse = (Struct) partitionResponseObj;
-Struct partitionResponseHeader = 
partitionResponse.getStruct(PARTITION_HEADER_KEY_NAME);
-int partition = partitionResponseHeader.get(PARTITION_ID);
-Errors error = 
Errors.forCode(partitionResponseHeader.get(ERROR_CODE));
-long highWatermark = 
partitionResponseHeader.get(HIGH_WATERMARK);
-long lastStableOffset = 
partitionResponseHeader.getOrElse(LAST_STABLE_OFFSET, 
INVALID_LAST_STABLE_OFFSET);
-long logStartOffset = 
partitionResponseHeader.getOrElse(LOG_START_OFFSET, INVALID_LOG_START_OFFSET);
-Optional preferredReadReplica = Optional.of(
-partitionResponseHeader.getOrElse(PREFERRED_READ_REPLICA, 
INVALID_PREFERRED_REPLICA_ID)
-
).filter(Predicate.isEqual(INVALID_PREFERRED_REPLICA_ID).negate());
-
-BaseRecords baseRecords = 
partitionResponse.getRecords(RECORD_SET_KEY_NAME);
-if (!(baseRecords instanceof MemoryRecords))
-throw new IllegalStateException("Unknown records type 
found: " + baseRecords.getClass());
-MemoryRecords records = (MemoryRecords) baseRecords;
-
-List abortedTransactions = null;
-if 
(partitionResponseHeader.hasField(ABORTED_TRANSACTIONS_KEY_NAME)) {
-Object[] abortedTransactionsArray = 
partitionResponseHeader.getArray(ABORTED_TRANSACTIONS_KEY_NAME);
-if (abortedTransactionsArray != null) {
-abortedTransactions = new 
ArrayList<>(abortedTransactionsArray.length);
-for (Object abortedTransactionObj : 
abortedTransactionsArray) {
-Struct abortedTransactionStruct = (Struct) 
abortedTransactionObj;
-long producerId = 
abortedTransactionStruct.get(PRODUCER_ID);
-long firstOffset = 
abortedTransactionStruct.get(FIRST_OFFSET);
-abortedTransactions.add(new 
AbortedTransaction(producerId, firstOffset));
-}
-}
-}
-
-PartitionData partitionData = new 
PartitionData<>(error, highWatermark, lastStableOffset,
-logStartOffset, preferredReadReplica, 
abortedTransactions, records);
-responseData.put(new TopicPartition(topic, partition), 
partitionData);
-}
-}
-return new FetchResponse<>(Errors.forCode(struct.getOrElse(ERROR_CODE, 
(short) 0)), responseData,
-struct.getOrElse(THROTTLE_TIME_MS, DEFAULT_THROTTLE_TIME), 
struct.getOrElse(SESSION_ID, INVALID_SESSION_ID));
+public FetchResponse(FetchResponseData fetchResponseData) {
+this.data = fetchResponseData;
+this.responseDataMap = toResponseDataMap(fetchResponseData);
 }
 
 @Override
 public Struct toStruct(short version) {
-return toStruct(version, throttleTimeMs, error, 
responseData.entrySet().iterator(), sessionId);
+return data.toStruct(version);
 }
 
 @Override
-protected Send toSend(String dest, ResponseHeader responseHeader, short 
apiVersion) {
-Struct responseHeaderStruct = responseHeader.toStruct();
-Struct responseBodyStruct = toStruct(apiVersion);
-
-// write the total size and the response header
-ByteBuffer buffer = ByteBuffer.allocate(responseHeaderStruct.sizeOf() 
+ 4);
-buffer.putInt(responseHeaderStruct.sizeOf() + 
responseBodyStruct.sizeOf());
-responseHeaderStruct.writeTo(buffer);
+public Send toSend(String dest, ResponseHeader responseHeader, short 
apiVersion) {
+// Generate the Sends for 

[GitHub] [kafka] chia7712 commented on pull request #9096: MINOR: Add comments to constrainedAssign and generalAssign method

2020-07-29 Thread GitBox


chia7712 commented on pull request #9096:
URL: https://github.com/apache/kafka/pull/9096#issuecomment-665504255


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] badaiaqrandista commented on pull request #4807: KAFKA-6733: Support of printing additional ConsumerRecord fields in DefaultMessageFormatter

2020-07-29 Thread GitBox


badaiaqrandista commented on pull request #4807:
URL: https://github.com/apache/kafka/pull/4807#issuecomment-665660018


   Closing this PR as it has been superseded by PR 9099 
(https://github.com/apache/kafka/pull/9099).



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] soarez commented on pull request #9000: KAFKA-10036 Improve handling and documentation of Suppliers

2020-07-29 Thread GitBox


soarez commented on pull request #9000:
URL: https://github.com/apache/kafka/pull/9000#issuecomment-665668720


   @mjsax what can we do to proceed?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] gwenshap commented on pull request #9054: KAFKA-10282: Remove Log metrics immediately when deleting log

2020-07-29 Thread GitBox


gwenshap commented on pull request #9054:
URL: https://github.com/apache/kafka/pull/9054#issuecomment-665773125


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10324) Pre-0.11 consumers can get stuck when messages are downconverted from V2 format

2020-07-29 Thread Jason Gustafson (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17167387#comment-17167387
 ] 

Jason Gustafson commented on KAFKA-10324:
-

[~twbecker] Thanks for the report. I'm trying to understand why the fetch is 
not including batches beyond the one with the last offset removed. Is that 
because the batch itself is already satisfying the fetch max bytes? It would be 
helpful to include a snippet from a dump of the log with the batch that is 
causing problems.

> Pre-0.11 consumers can get stuck when messages are downconverted from V2 
> format
> ---
>
> Key: KAFKA-10324
> URL: https://issues.apache.org/jira/browse/KAFKA-10324
> Project: Kafka
>  Issue Type: Bug
>Reporter: Tommy Becker
>Priority: Major
>
> As noted in KAFKA-5443, The V2 message format preserves a batch's lastOffset 
> even if that offset gets removed due to log compaction. If a pre-0.11 
> consumer seeks to such an offset and issues a fetch, it will get an empty 
> batch, since offsets prior to the requested one are filtered out during 
> down-conversion. KAFKA-5443 added consumer-side logic to advance the fetch 
> offset in this case, but this leaves old consumers unable to consume these 
> topics.
> The exact behavior varies depending on consumer version. The 0.10.0.0 
> consumer throws RecordTooLargeException and dies, believing that the record 
> must not have been returned because it was too large. The 0.10.1.0 consumer 
> simply spins fetching the same empty batch over and over.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10324) Pre-0.11 consumers can get stuck when messages are downconverted from V2 format

2020-07-29 Thread Tommy Becker (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17167371#comment-17167371
 ] 

Tommy Becker commented on KAFKA-10324:
--

Thanks for the response [~ijuma]. Yes, these are Java consumers, and I agree 
it's odd that this has not been found before now. I have limited familiarity 
with the code base, so it's possible I'm missing something but I believe the 
issue is as I described. In my tests I'm trying to consume a 25GB topic and 
have found 2 distinct offsets which the consumer cannot advance beyond, and 
they are both cases where the offset:
 # Is the lastOffset in the last batch of its log segment
 # Does not actually exist, presumably due to log compaction.

 

> Pre-0.11 consumers can get stuck when messages are downconverted from V2 
> format
> ---
>
> Key: KAFKA-10324
> URL: https://issues.apache.org/jira/browse/KAFKA-10324
> Project: Kafka
>  Issue Type: Bug
>Reporter: Tommy Becker
>Priority: Major
>
> As noted in KAFKA-5443, The V2 message format preserves a batch's lastOffset 
> even if that offset gets removed due to log compaction. If a pre-0.11 
> consumer seeks to such an offset and issues a fetch, it will get an empty 
> batch, since offsets prior to the requested one are filtered out during 
> down-conversion. KAFKA-5443 added consumer-side logic to advance the fetch 
> offset in this case, but this leaves old consumers unable to consume these 
> topics.
> The exact behavior varies depending on consumer version. The 0.10.0.0 
> consumer throws RecordTooLargeException and dies, believing that the record 
> must not have been returned because it was too large. The 0.10.1.0 consumer 
> simply spins fetching the same empty batch over and over.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10324) Pre-0.11 consumers can get stuck when messages are downconverted from V2 format

2020-07-29 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17167369#comment-17167369
 ] 

Ismael Juma commented on KAFKA-10324:
-

Thanks for the report. Looks like this issue was introduced in 0.11.0.0, would 
you agree? Interesting that no-one reported it for so long. Out of curiosity, 
the old consumers are Java consumers or other languages?

> Pre-0.11 consumers can get stuck when messages are downconverted from V2 
> format
> ---
>
> Key: KAFKA-10324
> URL: https://issues.apache.org/jira/browse/KAFKA-10324
> Project: Kafka
>  Issue Type: Bug
>Reporter: Tommy Becker
>Priority: Major
>
> As noted in KAFKA-5443, The V2 message format preserves a batch's lastOffset 
> even if that offset gets removed due to log compaction. If a pre-0.11 
> consumer seeks to such an offset and issues a fetch, it will get an empty 
> batch, since offsets prior to the requested one are filtered out during 
> down-conversion. KAFKA-5443 added consumer-side logic to advance the fetch 
> offset in this case, but this leaves old consumers unable to consume these 
> topics.
> The exact behavior varies depending on consumer version. The 0.10.0.0 
> consumer throws RecordTooLargeException and dies, believing that the record 
> must not have been returned because it was too large. The 0.10.1.0 consumer 
> simply spins fetching the same empty batch over and over.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10324) Pre-0.11 consumers can get stuck when messages are downconverted from V2 format

2020-07-29 Thread Tommy Becker (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17167357#comment-17167357
 ] 

Tommy Becker commented on KAFKA-10324:
--

We have some legacy applications whose consumer versions are not easily 
upgraded hitting this issue, and it's hard to diagnose since the consumers do 
not give a proper message (or indeed any message in the case of the 0.10.1.0 
consumer) and since it is dependent on the way messages are batched, which is 
opaque to clients.

> Pre-0.11 consumers can get stuck when messages are downconverted from V2 
> format
> ---
>
> Key: KAFKA-10324
> URL: https://issues.apache.org/jira/browse/KAFKA-10324
> Project: Kafka
>  Issue Type: Bug
>Reporter: Tommy Becker
>Priority: Major
>
> As noted in KAFKA-5443, The V2 message format preserves a batch's lastOffset 
> even if that offset gets removed due to log compaction. If a pre-0.11 
> consumer seeks to such an offset and issues a fetch, it will get an empty 
> batch, since offsets prior to the requested one are filtered out during 
> down-conversion. KAFKA-5443 added consumer-side logic to advance the fetch 
> offset in this case, but this leaves old consumers unable to consume these 
> topics.
> The exact behavior varies depending on consumer version. The 0.10.0.0 
> consumer throws RecordTooLargeException and dies, believing that the record 
> must not have been returned because it was too large. The 0.10.1.0 consumer 
> simply spins fetching the same empty batch over and over.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10324) Pre-0.11 consumers can get stuck when messages are downconverted from V2 format

2020-07-29 Thread Tommy Becker (Jira)
Tommy Becker created KAFKA-10324:


 Summary: Pre-0.11 consumers can get stuck when messages are 
downconverted from V2 format
 Key: KAFKA-10324
 URL: https://issues.apache.org/jira/browse/KAFKA-10324
 Project: Kafka
  Issue Type: Bug
Reporter: Tommy Becker


As noted in KAFKA-5443, The V2 message format preserves a batch's lastOffset 
even if that offset gets removed due to log compaction. If a pre-0.11 consumer 
seeks to such an offset and issues a fetch, it will get an empty batch, since 
offsets prior to the requested one are filtered out during down-conversion. 
KAFKA-5443 added consumer-side logic to advance the fetch offset in this case, 
but this leaves old consumers unable to consume these topics.

The exact behavior varies depending on consumer version. The 0.10.0.0 consumer 
throws RecordTooLargeException and dies, believing that the record must not 
have been returned because it was too large. The 0.10.1.0 consumer simply spins 
fetching the same empty batch over and over.
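
For context, a rough sketch (not the actual client code) of the consumer-side 
advance logic that KAFKA-5443 added to the Java consumer, and which the pre-0.11 
consumers are missing:

{code:java}
import org.apache.kafka.common.record.MemoryRecords;
import org.apache.kafka.common.record.RecordBatch;

public class CompactedBatchSkipSketch {
    // If a fetched V2 batch comes back with no records because its offsets were
    // compacted away, the batch header still carries a lastOffset, so the consumer
    // can advance its position past the batch instead of re-fetching the same
    // offset forever. Pre-0.11 consumers have no equivalent of this logic.
    public static long nextFetchPosition(final MemoryRecords fetched, final long currentPosition) {
        long position = currentPosition;
        for (final RecordBatch batch : fetched.batches()) {
            if (!batch.iterator().hasNext() && batch.lastOffset() >= position) {
                position = batch.lastOffset() + 1;
            }
        }
        return position;
    }
}
{code}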



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9273) Refactor AbstractJoinIntegrationTest and Sub-classes

2020-07-29 Thread Bill Bejeck (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17167349#comment-17167349
 ] 

Bill Bejeck commented on KAFKA-9273:


Hi [~albert02lowis],

 

Thanks for your interest. It looks like [~sujayopensource] has not started 
work on this ticket yet. I'd give them another day or so to respond; if you 
don't hear anything back, feel free to pick this ticket up.

I've taken the liberty of adding you to the contributors list, so you should be 
able to self-assign this ticket and any others in the future.

 

-Bill

> Refactor AbstractJoinIntegrationTest and Sub-classes
> 
>
> Key: KAFKA-9273
> URL: https://issues.apache.org/jira/browse/KAFKA-9273
> Project: Kafka
>  Issue Type: Test
>  Components: streams
>Reporter: Bill Bejeck
>Assignee: Sujay Hegde
>Priority: Major
>  Labels: newbie
>
> The AbstractJoinIntegrationTest uses an embedded broker, but not all the 
> sub-classes require the use of an embedded broker anymore. Additionally, 
> there are two tests remaining that require an embedded broker, but they don't 
> perform joins; they are tests validating other conditions, so ideally those 
> tests should move into a separate test



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9273) Refactor AbstractJoinIntegrationTest and Sub-classes

2020-07-29 Thread Albert Lowis (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17167321#comment-17167321
 ] 

Albert Lowis commented on KAFKA-9273:
-

Hi [~bbejeck], [~sujayopensource], I am a newbie looking to contribute.

Can I take up this task? I see that it has been some time since the last 
activity.

 

Thank you,

Albert

> Refactor AbstractJoinIntegrationTest and Sub-classes
> 
>
> Key: KAFKA-9273
> URL: https://issues.apache.org/jira/browse/KAFKA-9273
> Project: Kafka
>  Issue Type: Test
>  Components: streams
>Reporter: Bill Bejeck
>Assignee: Sujay Hegde
>Priority: Major
>  Labels: newbie
>
> The AbstractJoinIntegrationTest uses an embedded broker, but not all the 
> sub-classes require the use of an embedded broker anymore. Additionally, 
> there are two tests remaining that require an embedded broker, but they don't 
> perform joins; they are tests validating other conditions, so ideally those 
> tests should move into a separate test



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10323) NullPointerException during rebalance

2020-07-29 Thread yazgoo (Jira)
yazgoo created KAFKA-10323:
--

 Summary: NullPointerException during rebalance
 Key: KAFKA-10323
 URL: https://issues.apache.org/jira/browse/KAFKA-10323
 Project: Kafka
  Issue Type: Bug
  Components: KafkaConnect
Affects Versions: 2.5.0
Reporter: yazgoo


*confluent platform version: 5.5.0-ccs*

connector used: s3

Connector stops after rebalancing:

ERROR [Worker clientId=connect-1, groupId=connect] Couldn't instantiate task 
 because it has an invalid task configuration. This task will not 
execute until reconfigured.

java.lang.NullPointerException
 at org.apache.kafka.connect.runtime.Worker.startTask(Worker.java:427)
 at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder.startTask(DistributedHerder.java:1147)
 at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder.access$1600(DistributedHerder.java:126)
 at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder$12.call(DistributedHerder.java:1162)
 at 
org.apache.kafka.connect.runtime.distributed.DistributedHerder$12.call(DistributedHerder.java:1158)
 at java.util.concurrent.FutureTask.run(FutureTask.java:266)
 at 
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
 at 
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
 at java.lang.Thread.run(Thread.java:748)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-6733) Support of printing additional ConsumerRecord fields in DefaultMessageFormatter

2020-07-29 Thread Badai Aqrandista (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6733?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17167207#comment-17167207
 ] 

Badai Aqrandista commented on KAFKA-6733:
-

I did a bad merge on PR 8909. So I've closed it.

I created a new PR 9099 that contains the changes against the latest trunk. This is 
ready for review: https://github.com/apache/kafka/pull/9099

> Support of printing additional ConsumerRecord fields in 
> DefaultMessageFormatter
> ---
>
> Key: KAFKA-6733
> URL: https://issues.apache.org/jira/browse/KAFKA-6733
> Project: Kafka
>  Issue Type: Improvement
>  Components: tools
>Reporter: Mateusz Zakarczemny
>Assignee: Badai Aqrandista
>Priority: Minor
>
> It would be useful to have possibility of printing headers, partition and 
> offset in ConsoleConsumer. Especially support of headers seems to be missing.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10322) InMemoryWindowStore restore keys format incompatibility (lack of sequenceNumber in keys on topic)

2020-07-29 Thread Jira
Tomasz Bradło created KAFKA-10322:
-

 Summary: InMemoryWindowStore restore keys format incompatibility 
(lack of sequenceNumber in keys on topic)
 Key: KAFKA-10322
 URL: https://issues.apache.org/jira/browse/KAFKA-10322
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.5.0
 Environment: windows/linux
Reporter: Tomasz Bradło


I have a regular groupBy stream configuration:
{code:java}
   
fun addStream(kStreamBuilder: StreamsBuilder) {
val storeSupplier = Stores.inMemoryWindowStore("count-store",
Duration.ofDays(10),
Duration.ofDays(1),
false)
val storeBuilder: StoreBuilder<WindowStore<CountableEvent, Long>> = 
Stores
.windowStoreBuilder(storeSupplier, 
JsonSerde(CountableEvent::class.java), Serdes.Long())

kStreamBuilder
.stream("input-topic", Consumed.with(Serdes.String(), 
Serdes.String()))
.map {_, jsonRepresentation -> 
KeyValue(eventsCountingDeserializer.deserialize(jsonRepresentation), null)}
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofDays(1)))
.count(Materialized.with(JsonSerde(CountableEvent::class.java), 
Serdes.Long()))
.toStream()
.to("topic1-count")

val storeConsumed = 
Consumed.with(WindowedSerdes.TimeWindowedSerde(JsonSerde(CountableEvent::class.java),
 Duration.ofDays(1).toMillis()), Serdes.Long())
kStreamBuilder.addGlobalStore(storeBuilder, "topic1-count", 
storeConsumed, passThroughProcessorSupplier)
}{code}
While sending to "topic1-count", the key is serialized with 
[TimeWindowedSerializer|https://github.com/apache/kafka/blob/b1aa1912f08765d9914e3d036deee6b71ea009dd/streams/src/main/java/org/apache/kafka/streams/kstream/TimeWindowedSerializer.java],
 which uses 
[WindowKeySchema.toBinary|https://github.com/apache/kafka/blob/b1aa1912f08765d9914e3d036deee6b71ea009dd/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java#L112],
 so the message key format is:
{code:java}
real_grouping_key + timestamp(8bytes){code}
 

Everything works; I can get correct values from the state store. But in the 
recovery scenario, when 
[GlobalStateManagerImpl|https://github.com/apache/kafka/blob/2.5/streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java#L317]
 enters the offset < highWatermark loop, 
[InMemoryWindowStore's stateRestoreCallback|https://github.com/apache/kafka/blob/2.5/streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryWindowStore.java#L105]
 reads from "topic1-count" and fails to extract a valid key and timestamp using 
[WindowKeySchema.extractStoreKeyBytes|https://github.com/apache/kafka/blob/1ff25663a8beb8d7a67e6b58bc7ee0020651b75a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java#L188]
 and 
[WindowKeySchema.extractStoreTimestamp|https://github.com/apache/kafka/blob/1ff25663a8beb8d7a67e6b58bc7ee0020651b75a/streams/src/main/java/org/apache/kafka/streams/state/internals/WindowKeySchema.java#L201].
 It fails because it expects the format:
{code:java}
real_grouping_key + timestamp(8bytes) + sequence_number(4bytes) {code}
How is this supposed to work in this case?
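
For illustration, a small stand-alone sketch of the two key layouts involved 
(the helper names are made up for the sketch, this is not the Streams-internal 
code):

{code:java}
import java.nio.ByteBuffer;

public class WindowKeyLayoutSketch {

    // Layout produced by TimeWindowedSerializer / WindowKeySchema.toBinary when
    // writing to "topic1-count": [ raw grouping key | window start (8 bytes) ]
    static byte[] topicKey(final byte[] groupingKey, final long windowStartMs) {
        return ByteBuffer.allocate(groupingKey.length + 8)
            .put(groupingKey)
            .putLong(windowStartMs)
            .array();
    }

    // Layout the restore callback expects via WindowKeySchema.extractStoreKeyBytes /
    // extractStoreTimestamp: [ raw grouping key | window start (8 bytes) | sequence number (4 bytes) ]
    static byte[] storeKey(final byte[] groupingKey, final long windowStartMs, final int sequence) {
        return ByteBuffer.allocate(groupingKey.length + 8 + 4)
            .put(groupingKey)
            .putLong(windowStartMs)
            .putInt(sequence)
            .array();
    }

    public static void main(final String[] args) {
        final byte[] key = "some-grouping-key".getBytes();
        // the trailing 4 bytes are exactly what the restore path mis-parses
        System.out.println(topicKey(key, 0L).length);     // key length + 8
        System.out.println(storeKey(key, 0L, 0).length);  // key length + 12
    }
}
{code}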



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] junrao commented on pull request #8936: KAFKA-10207: Fixed padded timeindex causing premature data deletion

2020-07-29 Thread GitBox


junrao commented on pull request #8936:
URL: https://github.com/apache/kafka/pull/8936#issuecomment-665148070


   I think we agreed to change the logic to do the index sanity check on index 
opening.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on pull request #8993: KAFKA-10173: Use SmokeTest for upgrade system tests (#8938)

2020-07-29 Thread GitBox


guozhangwang commented on pull request #8993:
URL: https://github.com/apache/kafka/pull/8993#issuecomment-665190912


   Sorry for the long delay @vvcephei , the PR lgtm!



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #9094: KAFKA-10054: add TRACE-level e2e latency metrics

2020-07-29 Thread GitBox


ableegoldman commented on a change in pull request #9094:
URL: https://github.com/apache/kafka/pull/9094#discussion_r461993688



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/AdminClientConfig.java
##
@@ -198,7 +198,7 @@
 .define(METRICS_RECORDING_LEVEL_CONFIG,
 Type.STRING,
 Sensor.RecordingLevel.INFO.toString(),
-
in(Sensor.RecordingLevel.INFO.toString(), 
Sensor.RecordingLevel.DEBUG.toString()),
+
in(Sensor.RecordingLevel.INFO.toString(), 
Sensor.RecordingLevel.DEBUG.toString(), Sensor.RecordingLevel.TRACE.toString()),

Review comment:
   It's kind of a bummer that we can't just add the new TRACE level for 
Streams only; we have to add it to all the clients that Streams passes its 
configs down to. We could check for the new TRACE level and strip it off before 
passing the configs on to the clients, but that just seems like asking for 
trouble.
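
   For reference, a minimal sketch of how an application would opt in once this 
lands (assuming the TRACE recording level added by this PR; the application id 
and broker address are placeholders). The setting flows from `StreamsConfig` 
down to the embedded clients, which is why the client-side config validators 
have to accept `TRACE` as well:

```java
import java.util.Properties;
import org.apache.kafka.streams.StreamsConfig;

public class TraceMetricsConfigSketch {
    public static Properties streamsProps() {
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "e2e-latency-demo");   // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");  // placeholder
        // opt in to the most detailed (and most expensive) recording level
        props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, "TRACE");
        return props;
    }
}
```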

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/TaskMetrics.java
##
@@ -88,13 +92,6 @@ private TaskMetrics() {}
 private static final String NUM_BUFFERED_RECORDS_DESCRIPTION = "The count 
of buffered records that are polled " +
 "from consumer and not yet processed for this active task";
 
-private static final String RECORD_E2E_LATENCY = "record-e2e-latency";

Review comment:
   Moved the common descriptions to StreamsMetricsImpl

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##
@@ -227,6 +234,14 @@ protected Bytes keyBytes(final K key) {
 return byteEntries;
 }
 
+private void maybeRecordE2ELatency() {
+if (e2eLatencySensor.shouldRecord() && e2eLatencySensor.hasMetrics()) {

Review comment:
   For KV stores, we just compare the current time with the current 
record's timestamp

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredSessionStore.java
##
@@ -248,4 +253,12 @@ public void close() {
 private Bytes keyBytes(final K key) {
 return Bytes.wrap(serdes.rawKey(key));
 }
+
+private void maybeRecordE2ELatency() {
+if (e2eLatencySensor.shouldRecord() && e2eLatencySensor.hasMetrics()) {

Review comment:
   For session and window stores, we also just compare the current time 
with the current record's timestamp when `put` is called. This can mean the e2e 
latency is measured several times on the same record, for example in a windowed 
aggregation.
   At first I thought that didn't make sense, but now I think it's actually 
exactly what we want. First of all, it means we can actually account for the 
latency between calls to `put` within a processor. For simple point inserts 
this might not be a huge increase on the scale of ms, but more complex 
processing may benefit from seeing this granularity of information. If they 
don't want it, well, that's why we introduced `TRACE`
   
   Second, while it might seem like we're over-weighting some records by 
measuring the e2e latency on them more than others, I'm starting to think this 
actually makes more sense than not: the big picture benefit/use case for the 
e2e latency metric is less "how long for this record to get sent downstream" 
and more "how long for this record to be reflected in the state store/IQ 
results". Given that, each record should be weighted by its actual proportion 
of the state store. You aren't querying individual records (in a window store), 
you're querying the windows themselves
   
   I toyed around with the idea of measuring the e2e latency relative to the 
window time, instead of the record timestamp, but ultimately couldn't find any 
sense in that. 
   Thoughts?

##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/MetricsIntegrationTest.java
##
@@ -668,6 +671,9 @@ private void checkKeyValueStoreMetrics(final String 
group0100To24,
 checkMetricByName(listMetricStore, SUPPRESSION_BUFFER_SIZE_CURRENT, 0);
 checkMetricByName(listMetricStore, SUPPRESSION_BUFFER_SIZE_AVG, 0);
 checkMetricByName(listMetricStore, SUPPRESSION_BUFFER_SIZE_MAX, 0);
+checkMetricByName(listMetricStore, RECORD_E2E_LATENCY_AVG, 
expectedNumberofE2ELatencyMetrics);
+checkMetricByName(listMetricStore, RECORD_E2E_LATENCY_MIN, 
expectedNumberofE2ELatencyMetrics);
+checkMetricByName(listMetricStore, RECORD_E2E_LATENCY_MAX, 
expectedNumberofE2ELatencyMetrics);

Review comment:
   Ok there's something I'm not understanding about this test and/or the 
built-in metrics version. For some reason, the KV-store metrics are 0 when 
`METRICS_0100_TO_24` is used, and 1 (as expected) when the latest version is 
used. I feel like this is wrong, and it should 

[GitHub] [kafka] pan3793 commented on pull request #8575: KAFKA-8713 KIP-581 Add accept.optional.null to solve optional null

2020-07-29 Thread GitBox


pan3793 commented on pull request #8575:
URL: https://github.com/apache/kafka/pull/8575#issuecomment-665405076


   Fixed the mail thread, do you have any comments on the discussion and KIP 
doc? @rhauch 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda opened a new pull request #9095: KAFKA-10321: fix infinite blocking for global stream thread startup

2020-07-29 Thread GitBox


abbccdda opened a new pull request #9095:
URL: https://github.com/apache/kafka/pull/9095


   In the unit test `shouldDieOnInvalidOffsetExceptionDuringStartup` for JDK 
11, we spotted a case where a global stream thread startup would stall if it 
fails immediately upon the first poll. The reason is that the `start()` function 
only checks whether the thread is *not running*, as it needs to block until the 
thread finishes its initialization. However, if the thread transitions to `DEAD` 
immediately, the `start()` call blocks forever.
   
   The previously failing unit test is used to verify the fix.
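
   A minimal, self-contained sketch of the idea (the state model and names are 
simplified assumptions, not the actual `GlobalStreamThread` code):

```java
public class StartupWaitSketch {
    enum State { CREATED, RUNNING, DEAD }

    private final Object stateLock = new Object();
    private State state = State.CREATED;

    // called by the background thread when it finishes (or fails) initialization
    void setState(final State newState) {
        synchronized (stateLock) {
            state = newState;
            stateLock.notifyAll();
        }
    }

    // wait until the thread is either RUNNING or DEAD -- waiting only for
    // RUNNING is what made the caller block forever on an immediate failure
    public void awaitStartup() throws InterruptedException {
        synchronized (stateLock) {
            while (state != State.RUNNING && state != State.DEAD) {
                stateLock.wait();
            }
            if (state == State.DEAD) {
                throw new IllegalStateException("Global stream thread died during startup");
            }
        }
    }
}
```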
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #9047: KAFKA-9274: Remove `retries` for global task

2020-07-29 Thread GitBox


mjsax commented on a change in pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#discussion_r461302625



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##
@@ -275,31 +259,70 @@ private void restoreState(final StateRestoreCallback 
stateRestoreCallback,
   final RecordConverter recordConverter) {
 for (final TopicPartition topicPartition : topicPartitions) {
 globalConsumer.assign(Collections.singletonList(topicPartition));
+long offset;
 final Long checkpoint = checkpointFileCache.get(topicPartition);
 if (checkpoint != null) {
 globalConsumer.seek(topicPartition, checkpoint);
+offset = checkpoint;
 } else {
 
globalConsumer.seekToBeginning(Collections.singletonList(topicPartition));
+final AtomicLong position = new AtomicLong();
+retryUntilSuccessOrThrowOnTaskTimeout(
+() -> 
position.set(globalConsumer.position(topicPartition)),
+String.format(
+"Failed to get position for partition %s. The broker 
may be transiently unavailable at the moment.",
+topicPartition
+)
+);
+offset = position.get();
 }
 
-long offset = globalConsumer.position(topicPartition);
 final Long highWatermark = highWatermarks.get(topicPartition);
 final RecordBatchingStateRestoreCallback stateRestoreAdapter =
 StateRestoreCallbackAdapter.adapt(stateRestoreCallback);
 
 stateRestoreListener.onRestoreStart(topicPartition, storeName, 
offset, highWatermark);
 long restoreCount = 0L;
 
+long deadlineMs = NO_DEADLINE;
 while (offset < highWatermark) {
 try {
 final ConsumerRecords records = 
globalConsumer.poll(pollTime);
+if (records.isEmpty()) {
+if (taskTimeoutMs == 0L) {
+deadlineMs = maybeUpdateDeadlineOrThrow(

Review comment:
   Thanks for digging into it @vvcephei -- The question about `pollTimeout` 
is fair. I guess for default configs it might not matter much, as the default 
is 100ms IIRC.
   
   At the same time, we apply the same `pollTimeout` during regular processing, 
and as a matter of fact, for this case, a user might want to use a longer 
poll-timeout, as otherwise, the thread would just "busy wait" anyway (only the 
responsiveness for a shutdown of the app should be considered).
   
   Thus, it might actually make sense to exclude `pollTimeout` completely and 
only use `requestTimeout + taskTimeout`. Again, using `taskTimeout` in `poll()` 
reduces the responsiveness of a shutdown -- however, atm during bootstrapping 
we ignore a shutdown signal anyway, hence, for now we don't make the situation 
worse. I created a ticket to fix this: 
https://issues.apache.org/jira/browse/KAFKA-10317 and will add a comment for 
now.
   
   Short related note: actually using `requestTimeout` seems to be a 
conservative upper bound for poll(). A request could fail with a different 
error before `requestTimeout` hits and would be retried internally for this 
case -- if this happens, we might want to start the `taskTimeout` earlier. 
However, we don't have any means atm to detect this case. Thus, using 
`requestTimeout` is the best option we have right now (because triggering 
`taskTimeout` too early seems to be worse than triggering it too late). I 
created a ticket though that might allow us to improve the code later: 
https://issues.apache.org/jira/browse/KAFKA-10315
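
   To make the timeout interplay concrete, a rough stand-alone sketch of the 
deadline bookkeeping being discussed (names and the exception type are 
assumptions, not the actual `GlobalStateManagerImpl` code): the `task.timeout.ms` 
clock only starts after the first empty poll, and each poll itself is given 
`request.timeout.ms + task.timeout.ms`:

```java
import org.apache.kafka.common.errors.TimeoutException;

public class TaskTimeoutDeadlineSketch {
    private static final long NO_DEADLINE = -1L;

    // returns the (possibly newly started) deadline, or throws once it has passed
    static long maybeUpdateDeadlineOrThrow(final long currentDeadlineMs,
                                           final long nowMs,
                                           final long taskTimeoutMs) {
        if (currentDeadlineMs == NO_DEADLINE) {
            return nowMs + taskTimeoutMs;  // first failure: start the task.timeout.ms clock
        }
        if (nowMs >= currentDeadlineMs) {
            throw new TimeoutException("Global state restore made no progress within task.timeout.ms");
        }
        return currentDeadlineMs;
    }
}
```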





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on pull request #9081: KAFKA-10309: KafkaProducer's sendOffsetsToTransaction should not block infinitively

2020-07-29 Thread GitBox


guozhangwang commented on pull request #9081:
URL: https://github.com/apache/kafka/pull/9081#issuecomment-664690277


   Any blocking API should be covered by `max.block.ms`, so I think this is 
rather a bug fix.
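
   For illustration, a sketch of the caller-side behaviour this enables (broker 
address, topic, and group id are placeholders): with the fix, 
`sendOffsetsToTransaction()` surfaces a `TimeoutException` after `max.block.ms` 
instead of blocking indefinitely when the brokers are unreachable:

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.errors.TimeoutException;
import org.apache.kafka.common.serialization.ByteArraySerializer;

public class SendOffsetsTimeoutSketch {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");     // placeholder
        props.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, "transactionProducer"); // placeholder
        props.put(ProducerConfig.MAX_BLOCK_MS_CONFIG, 1000);
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, ByteArraySerializer.class);

        try (KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props)) {
            producer.initTransactions();
            producer.beginTransaction();
            try {
                producer.sendOffsetsToTransaction(
                    Collections.singletonMap(new TopicPartition("topic1", 0), new OffsetAndMetadata(0L)),
                    "test-group");
                producer.commitTransaction();
            } catch (final TimeoutException e) {
                // with the fix this fires after max.block.ms instead of hanging forever
                producer.abortTransaction();
            }
        }
    }
}
```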



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on pull request #6592: KAFKA-8326: Introduce List Serde

2020-07-29 Thread GitBox


mjsax commented on pull request #6592:
URL: https://github.com/apache/kafka/pull/6592#issuecomment-665304810


   @yeralin -- your PR is in my review queue. Not sure how quickly I will find 
time to have a look though atm -- maybe next week, but I can't promise.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rajinisivaram commented on pull request #9092: KAFKA-10163; Define `controller_mutation_rate` a Double instead of a Long

2020-07-29 Thread GitBox


rajinisivaram commented on pull request #9092:
URL: https://github.com/apache/kafka/pull/9092#issuecomment-664990134


   ok to test



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] sasakitoa commented on a change in pull request #9081: KAFKA-10309: KafkaProducer's sendOffsetsToTransaction should not block infinitively

2020-07-29 Thread GitBox


sasakitoa commented on a change in pull request #9081:
URL: https://github.com/apache/kafka/pull/9081#discussion_r461264833



##
File path: core/src/test/scala/integration/kafka/api/TransactionsTest.scala
##
@@ -406,6 +406,26 @@ class TransactionsTest extends KafkaServerTestHarness {
 TestUtils.waitUntilTrue(() => 
offsetAndMetadata.equals(consumer.committed(Set(tp).asJava).get(tp)), "cannot 
read committed offset")
   }
 
+  @Test(expected = classOf[TimeoutException])
+  def testSendOffsetsToTransactionTimeout(): Unit = {
+val producer = createTransactionalProducer("transactionProducer", 
maxBlockMs = 1000)
+producer.initTransactions()
+producer.beginTransaction()
+producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic1, 
"foo".getBytes, "bar".getBytes))
+
+for (i <- 0 until servers.size)
+  killBroker(i)
+
+val offsets = new mutable.HashMap[TopicPartition, 
OffsetAndMetadata]().asJava
+offsets.put(new TopicPartition(topic1, 0), new OffsetAndMetadata(0))
+try {
+  producer.sendOffsetsToTransaction(offsets, "test-group")

Review comment:
Replaced `mutable.HashMap` with `Map`

##
File path: 
clients/src/main/java/org/apache/kafka/clients/producer/KafkaProducer.java
##
@@ -687,7 +687,7 @@ public void sendOffsetsToTransaction(Map offs
 throwIfProducerClosed();
 TransactionalRequestResult result = 
transactionManager.sendOffsetsToTransaction(offsets, groupMetadata);
 sender.wakeup();
-result.await();
+result.await(maxBlockTimeMs, TimeUnit.MILLISECONDS);

Review comment:
   I wrote some description related to TimeoutException and 
InterruptedException

##
File path: core/src/test/scala/integration/kafka/api/TransactionsTest.scala
##
@@ -406,6 +406,26 @@ class TransactionsTest extends KafkaServerTestHarness {
 TestUtils.waitUntilTrue(() => 
offsetAndMetadata.equals(consumer.committed(Set(tp).asJava).get(tp)), "cannot 
read committed offset")
   }
 
+  @Test(expected = classOf[TimeoutException])
+  def testSendOffsetsToTransactionTimeout(): Unit = {
+val producer = createTransactionalProducer("transactionProducer", 
maxBlockMs = 1000)
+producer.initTransactions()
+producer.beginTransaction()
+producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic1, 
"foo".getBytes, "bar".getBytes))
+
+for (i <- 0 until servers.size)
+  killBroker(i)
+
+val offsets = new mutable.HashMap[TopicPartition, 
OffsetAndMetadata]().asJava
+offsets.put(new TopicPartition(topic1, 0), new OffsetAndMetadata(0))
+try {
+  producer.sendOffsetsToTransaction(offsets, "test-group")

Review comment:
Replaced `mutable.HashMap` with `Map`

##
File path: core/src/test/scala/integration/kafka/api/TransactionsTest.scala
##
@@ -406,6 +406,26 @@ class TransactionsTest extends KafkaServerTestHarness {
 TestUtils.waitUntilTrue(() => 
offsetAndMetadata.equals(consumer.committed(Set(tp).asJava).get(tp)), "cannot 
read committed offset")
   }
 
+  @Test(expected = classOf[TimeoutException])
+  def testSendOffsetsToTransactionTimeout(): Unit = {
+val producer = createTransactionalProducer("transactionProducer", 
maxBlockMs = 1000)
+producer.initTransactions()
+producer.beginTransaction()
+producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic1, 
"foo".getBytes, "bar".getBytes))
+
+for (i <- 0 until servers.size)

Review comment:
Modified to use `indices` instead of `size`, thanks.

##
File path: core/src/test/scala/integration/kafka/api/TransactionsTest.scala
##
@@ -406,6 +406,26 @@ class TransactionsTest extends KafkaServerTestHarness {
 TestUtils.waitUntilTrue(() => 
offsetAndMetadata.equals(consumer.committed(Set(tp).asJava).get(tp)), "cannot 
read committed offset")
   }
 
+  @Test(expected = classOf[TimeoutException])
+  def testSendOffsetsToTransactionTimeout(): Unit = {
+val producer = createTransactionalProducer("transactionProducer", 
maxBlockMs = 1000)
+producer.initTransactions()
+producer.beginTransaction()
+producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic1, 
"foo".getBytes, "bar".getBytes))
+
+for (i <- 0 until servers.size)
+  killBroker(i)
+
+try {
+  producer.sendOffsetsToTransaction(Map(

Review comment:
   I added some timeout tests for initTransactions, commitTransaction, and 
abortTransaction using the same base method.
   Is this implementation what you intended?

##
File path: core/src/test/scala/integration/kafka/api/TransactionsTest.scala
##
@@ -406,6 +406,26 @@ class TransactionsTest extends KafkaServerTestHarness {
 TestUtils.waitUntilTrue(() => 
offsetAndMetadata.equals(consumer.committed(Set(tp).asJava).get(tp)), "cannot 
read committed offset")
   }
 
+  @Test(expected = classOf[TimeoutException])
+  def testSendOffsetsToTransactionTimeout(): Unit = {
+val producer = 

[GitHub] [kafka] yeralin commented on pull request #6592: KAFKA-8326: Introduce List Serde

2020-07-29 Thread GitBox


yeralin commented on pull request #6592:
URL: https://github.com/apache/kafka/pull/6592#issuecomment-665078937


   Any updates?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #9066: KAFKA-10287: Skip unknown offsets when computing sum of changelog offsets

2020-07-29 Thread GitBox


vvcephei commented on pull request #9066:
URL: https://github.com/apache/kafka/pull/9066#issuecomment-665126186







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on a change in pull request #9047: KAFKA-9274: Remove `retries` for global task

2020-07-29 Thread GitBox


abbccdda commented on a change in pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#discussion_r462047068



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##
@@ -274,30 +252,74 @@ private void restoreState(final StateRestoreCallback 
stateRestoreCallback,
   final RecordConverter recordConverter) {
 for (final TopicPartition topicPartition : topicPartitions) {
 globalConsumer.assign(Collections.singletonList(topicPartition));
+long offset;
 final Long checkpoint = checkpointFileCache.get(topicPartition);
 if (checkpoint != null) {
 globalConsumer.seek(topicPartition, checkpoint);
+offset = checkpoint;
 } else {
 
globalConsumer.seekToBeginning(Collections.singletonList(topicPartition));
+offset = retryUntilSuccessOrThrowOnTaskTimeout(
+() -> globalConsumer.position(topicPartition),
+String.format(
+"Failed to get position for partition %s. The broker 
may be transiently unavailable at the moment.",
+topicPartition
+)
+);
 }
 
-long offset = globalConsumer.position(topicPartition);
 final Long highWatermark = highWatermarks.get(topicPartition);
 final RecordBatchingStateRestoreCallback stateRestoreAdapter =
 StateRestoreCallbackAdapter.adapt(stateRestoreCallback);
 
 stateRestoreListener.onRestoreStart(topicPartition, storeName, 
offset, highWatermark);
 long restoreCount = 0L;
 
-while (offset < highWatermark) {
-final ConsumerRecords records = 
globalConsumer.poll(pollTime);
+while (offset < highWatermark) { // when we "fix" this loop 
(KAFKA-7380 / KAFKA-10317)
+ // we should update the `poll()` 
timeout below
+
+// we ignore `poll.ms` config during bootstrapping phase and
+// apply `request.timeout.ms` plus `task.timeout.ms` instead
+//
+// the reason is, that `poll.ms` might be too short to give a 
fetch request a fair chance
+// to actually complete and we don't want to start 
`task.timeout.ms` too early
+//
+// we also pass `task.timeout.ms` into `poll()` directly right 
now as it simplifies our own code:
+// if we don't pass it in, we would just track the timeout 
ourselves and call `poll()` again
+// in our own retry loop; by passing the timeout we can reuse 
the consumer's internal retry loop instead
+//
+// note that using `request.timeout.ms` provides a 
conservative upper bound for the timeout;
+// this implies that we might start `task.timeout.ms` 
"delayed" -- however, starting the timeout
+// delayed is preferable (as it's more robust) than starting 
it too early
+//
+// TODO https://issues.apache.org/jira/browse/KAFKA-10315
+//   -> do a more precise timeout handling if `poll` would 
throw an exception if a fetch request fails
+//  (instead of letting the consumer retry fetch requests 
silently)
+//
+// TODO https://issues.apache.org/jira/browse/KAFKA-10317 and
+//  https://issues.apache.org/jira/browse/KAFKA-7380
+//  -> don't pass in `task.timeout.ms` to stay responsive if 
`KafkaStreams#close` gets called
+final ConsumerRecords records = 
globalConsumer.poll(requestTimeoutPlusTaskTimeout);
+if (records.isEmpty()) {
+// this will always throw
+maybeUpdateDeadlineOrThrow(time.milliseconds());

Review comment:
   Could we just throw here?

##
File path: docs/streams/developer-guide/config-streams.html
##
@@ -326,13 +321,18 @@ bootstrap.serversstate.cleanup.delay.ms
 Low
 The amount of time in milliseconds to wait before 
deleting state when a partition has migrated.
-60 milliseconds
+60 milliseconds (10 minutes)
   
   state.dir
 High
 Directory location for state stores.
 /tmp/kafka-streams
   
+  task.timeout.ms
+Medium

Review comment:
   @mjsax could we do a screenshot to make sure it looks good on the 
web-page?

##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImplTest.java
##
@@ -671,19 +1211,21 @@ private void writeCorruptCheckpoint() throws IOException 
{
 }
 }
 
-private void 

[GitHub] [kafka] guozhangwang commented on pull request #9083: KAFKA-9450: Follow-up; Forbid process after closed [WIP]

2020-07-29 Thread GitBox


guozhangwang commented on pull request #9083:
URL: https://github.com/apache/kafka/pull/9083#issuecomment-664699699


   cc @ableegoldman @mjsax 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bob-barrett commented on a change in pull request #9054: KAFKA-10282: Remove Log metrics immediately when deleting log

2020-07-29 Thread GitBox


bob-barrett commented on a change in pull request #9054:
URL: https://github.com/apache/kafka/pull/9054#discussion_r461721737



##
File path: core/src/main/scala/kafka/log/LogManager.scala
##
@@ -932,6 +927,7 @@ class LogManager(logDirs: Seq[File],
 val logsToCheckpoint = logsInDir(logDir)
 checkpointRecoveryOffsetsAndCleanSnapshotsInDir(logDir, 
logsToCheckpoint, ArrayBuffer.empty)
 checkpointLogStartOffsetsInDir(logDir, logsToCheckpoint)
+sourceLog.removeLogMetrics()

Review comment:
   Good catch. Opened https://issues.apache.org/jira/browse/KAFKA-10320 to 
track it.









[GitHub] [kafka] abbccdda commented on pull request #9001: KAFKA-10028: Implement write path for feature versioning system (KIP-584)

2020-07-29 Thread GitBox


abbccdda commented on pull request #9001:
URL: https://github.com/apache/kafka/pull/9001#issuecomment-665189199











[GitHub] [kafka] hachikuji commented on a change in pull request #9008: KAFKA-9629 Use generated protocol for Fetch API

2020-07-29 Thread GitBox


hachikuji commented on a change in pull request #9008:
URL: https://github.com/apache/kafka/pull/9008#discussion_r461173162



##
File path: clients/src/main/resources/common/message/FetchRequest.json
##
@@ -55,35 +55,35 @@
   "about": "The minimum bytes to accumulate in the response." },
 { "name": "MaxBytes", "type": "int32", "versions": "3+", "default": 
"0x7fff", "ignorable": true,
   "about": "The maximum bytes to fetch.  See KIP-74 for cases where this 
limit may not be honored." },
-{ "name": "IsolationLevel", "type": "int8", "versions": "4+", "default": 
"0", "ignorable": false,
+{ "name": "IsolationLevel", "type": "int8", "versions": "4+", "default": 
"0", "ignorable": true,

Review comment:
   I guess the implicit expectation is that if the protocol does not 
support the `read_committed` isolation level, then it wouldn't have 
transactional data anyway, so reverting to `read_uncommitted` is safe. Can't 
find a fault with that.

##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/AbstractRequest.java
##
@@ -146,7 +147,7 @@ public static AbstractRequest parseRequest(ApiKeys apiKey, 
short apiVersion, Str
 case PRODUCE:
 return new ProduceRequest(struct, apiVersion);
 case FETCH:
-return new FetchRequest(struct, apiVersion);
+return new FetchRequest(new FetchRequestData(struct, apiVersion), apiVersion);

Review comment:
   nit: any reason not to stick with the same constructor convention as the 
other requests?

##
File path: 
clients/src/main/java/org/apache/kafka/common/protocol/RecordsWriter.java
##
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.common.protocol;
+
+import org.apache.kafka.common.network.ByteBufferSend;
+import org.apache.kafka.common.network.Send;
+import org.apache.kafka.common.record.BaseRecords;
+import org.apache.kafka.common.utils.ByteUtils;
+
+import java.io.DataOutput;
+import java.nio.ByteBuffer;
+import java.util.function.Consumer;
+
+/**
+ * Implementation of Writable which produces a sequence of {@link Send} objects. This allows for deferring the transfer
+ * of data from a record-set's file channel to the eventual socket channel.
+ *
+ * Excepting {@link #writeRecords(BaseRecords)}, calls to the write methods on this class will append to a byte array
+ * according to the format specified in {@link DataOutput}. When a call is made to writeRecords, any previously written
+ * bytes will be flushed as a new {@link ByteBufferSend} to the given Send consumer. After flushing the pending bytes,
+ * another Send is passed to the consumer which wraps the underlying record-set's transfer logic.
+ *
+ * For example,
+ *
+ * 
+ * recordsWritable.writeInt(10);
+ * recordsWritable.writeRecords(records1);
+ * recordsWritable.writeInt(20);
+ * recordsWritable.writeRecords(records2);
+ * recordsWritable.writeInt(30);
+ * recordsWritable.flush();
+ * 
+ *
+ * Will pass 5 Send objects to the consumer given in the constructor. Care must be taken by callers to flush any
+ * pending bytes at the end of the writing sequence to ensure everything is flushed to the consumer. This class is
+ * intended to be used with {@link org.apache.kafka.common.record.MultiRecordsSend}.
+ *
+ * @see org.apache.kafka.common.requests.FetchResponse
+ */
+public class RecordsWriter implements Writable {
+private final String dest;
+private final Consumer<Send> sendConsumer;
+private final ByteBuffer buffer;
+private int mark;
+
+public RecordsWriter(String dest, int totalSize, Consumer<Send> sendConsumer) {

Review comment:
   Could we rename `totalSize` so that it is clear that it does not cover the record sizes? Maybe `totalOverheadSize` or `totalNonRecordSize`, or something like that.
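
   For readers following along, a hedged usage sketch based only on the javadoc and constructor shown above (this is in-flight PR code, so the exact names and signatures may still change):

```java
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.common.network.Send;
import org.apache.kafka.common.protocol.RecordsWriter;
import org.apache.kafka.common.record.MemoryRecords;

public final class RecordsWriterSketch {
    public static void main(final String[] args) {
        // Collect the Sends produced by the writer: buffered non-record bytes are
        // flushed as ByteBufferSends, and each record-set becomes its own Send.
        final List<Send> sends = new ArrayList<>();
        final RecordsWriter writer = new RecordsWriter("broker-1", 2 * Integer.BYTES, sends::add);
        writer.writeInt(10);                      // buffered
        writer.writeRecords(MemoryRecords.EMPTY); // flushes the buffered int, then the record-set Send
        writer.writeInt(20);                      // buffered again
        writer.flush();                           // flushes the trailing bytes
        System.out.println("sends produced: " + sends.size()); // 3 in this example
    }
}
```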

##
File path: 
clients/src/main/java/org/apache/kafka/common/protocol/RecordsWriter.java
##
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information 

[GitHub] [kafka] chia7712 commented on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-07-29 Thread GitBox


chia7712 commented on pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#issuecomment-665149860


   ```streams_standby_replica_test```  -> 
https://issues.apache.org/jira/browse/KAFKA-10287
   
   I will take a look at ```streams_broker_bounce_test```







[GitHub] [kafka] dajac opened a new pull request #9092: KAFKA-10163; Define `controller_mutation_rate` a Double instead of a Long

2020-07-29 Thread GitBox


dajac opened a new pull request #9092:
URL: https://github.com/apache/kafka/pull/9092


   First tests have shown that `controller_mutation_rate` can be quite low (e.g. around 1) in clusters with multiple tenants. At the moment, the rate is defined as a Long, which makes low fractional values impossible to express. Using a Double seems more appropriate.
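
   As an illustration, a hedged sketch of setting a fractional rate through the client-quota Admin API (this assumes the KIP-599 quota name and that it is managed like other client quotas; not necessarily the final mechanism):

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity;

public final class FractionalMutationQuotaSketch {
    public static void main(final String[] args) throws Exception {
        final Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // default <user> entity; a rate such as 1.5 is only expressible as a Double
            final ClientQuotaEntity entity = new ClientQuotaEntity(
                Collections.singletonMap(ClientQuotaEntity.USER, null));
            final ClientQuotaAlteration alteration = new ClientQuotaAlteration(
                entity,
                Collections.singletonList(
                    new ClientQuotaAlteration.Op("controller_mutation_rate", 1.5)));
            admin.alterClientQuotas(Collections.singletonList(alteration)).all().get();
        }
    }
}
```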
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] chia7712 commented on pull request #9066: KAFKA-10287: Skip unknown offsets when computing sum of changelog offsets

2020-07-29 Thread GitBox


chia7712 commented on pull request #9066:
URL: https://github.com/apache/kafka/pull/9066#issuecomment-664764918











[GitHub] [kafka] showuon commented on pull request #9096: Add comments to constrainedAssign and generalAssign method

2020-07-29 Thread GitBox


showuon commented on pull request #9096:
URL: https://github.com/apache/kafka/pull/9096#issuecomment-665472409


   @ableegoldman @vahidhashemian , please help review this PR to add comments 
to the assign methods. Thanks.







[GitHub] [kafka] cyrusv closed pull request #9093: Throw error on when keys not found in FileConfigProvider

2020-07-29 Thread GitBox


cyrusv closed pull request #9093:
URL: https://github.com/apache/kafka/pull/9093


   







[GitHub] [kafka] vvcephei commented on a change in pull request #9004: KAFKA-10261: Introduce the KIP-478 apis with shims

2020-07-29 Thread GitBox


vvcephei commented on a change in pull request #9004:
URL: https://github.com/apache/kafka/pull/9004#discussion_r461271276



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##
@@ -880,40 +880,41 @@ public synchronized ProcessorTopology 
buildGlobalStateTopology() {
 return globalGroups;
 }
 
+@SuppressWarnings("unchecked")
 private ProcessorTopology build(final Set nodeGroup) {
 Objects.requireNonNull(applicationId, "topology has not completed 
optimization");
 
-final Map> processorMap = new 
LinkedHashMap<>();
-final Map> topicSourceMap = new HashMap<>();
-final Map> topicSinkMap = new HashMap<>();
+final Map> processorMap = new 
LinkedHashMap<>();
+final Map> topicSourceMap = new 
HashMap<>();
+final Map> topicSinkMap = new HashMap<>();
 final Map stateStoreMap = new LinkedHashMap<>();
 final Set repartitionTopics = new HashSet<>();
 
 // create processor nodes in a topological order ("nodeFactories" is 
already topologically sorted)
 // also make sure the state store map values following the insertion 
ordering
-for (final NodeFactory factory : nodeFactories.values()) {
+for (final NodeFactory factory : nodeFactories.values()) {
 if (nodeGroup == null || nodeGroup.contains(factory.name)) {
-final ProcessorNode node = factory.build();
+final ProcessorNode node = factory.build();
 processorMap.put(node.name(), node);
 
 if (factory instanceof ProcessorNodeFactory) {
 buildProcessorNode(processorMap,
stateStoreMap,
-   (ProcessorNodeFactory) factory,
-   node);
+   (ProcessorNodeFactory) 
factory,
+   (ProcessorNode) node);
 
 } else if (factory instanceof SourceNodeFactory) {
 buildSourceNode(topicSourceMap,
 repartitionTopics,
-(SourceNodeFactory) factory,
-(SourceNode) node);
+(SourceNodeFactory) factory,
+(SourceNode) node);
 
 } else if (factory instanceof SinkNodeFactory) {
 buildSinkNode(processorMap,
   topicSinkMap,
   repartitionTopics,
-  (SinkNodeFactory) factory,
-  (SinkNode) node);
+  (SinkNodeFactory) factory,
+  (SinkNode) 
node);

Review comment:
   They have subtly different meanings, which I'm not 100% clear on all the time. I'm not sure if I had to change this one, or if it was an accident. I'll give it a closer look.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorShim.java
##
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.internals;
+
+
+import org.apache.kafka.streams.processor.api.Processor;
+import org.apache.kafka.streams.processor.api.ProcessorContext;
+
+public final class ProcessorShim implements 
Processor {

Review comment:
   I think "adapter" is the standard design pattern name for this type of 
thing. Not sure why I thought "shim" was a good choice in the heat of the 
moment. Maybe because I'm kind of slipping these classes in the middle to make 
everything line up? I can change them to "adapter".

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##
@@ -880,40 +880,41 @@ public synchronized ProcessorTopology 
buildGlobalStateTopology() {
 return globalGroups;
 }
 
+@SuppressWarnings("unchecked")
 private ProcessorTopology build(final Set nodeGroup) {
 

[GitHub] [kafka] mjsax commented on pull request #9047: KAFKA-9274: Remove `retries` for global task

2020-07-29 Thread GitBox


mjsax commented on pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#issuecomment-664773350


   @vvcephei @abbccdda @guozhangwang @hachikuji -- I updated this PR according 
to your discussions. Needed to squash for rebasing to resolve conflicts.
   
   







[GitHub] [kafka] abbccdda commented on a change in pull request #9081: KAFKA-10309: KafkaProducer's sendOffsetsToTransaction should not block infinitively

2020-07-29 Thread GitBox


abbccdda commented on a change in pull request #9081:
URL: https://github.com/apache/kafka/pull/9081#discussion_r461313377



##
File path: core/src/test/scala/integration/kafka/api/TransactionsTest.scala
##
@@ -406,6 +406,26 @@ class TransactionsTest extends KafkaServerTestHarness {
 TestUtils.waitUntilTrue(() => 
offsetAndMetadata.equals(consumer.committed(Set(tp).asJava).get(tp)), "cannot 
read committed offset")
   }
 
+  @Test(expected = classOf[TimeoutException])
+  def testSendOffsetsToTransactionTimeout(): Unit = {
+val producer = createTransactionalProducer("transactionProducer", 
maxBlockMs = 1000)
+producer.initTransactions()
+producer.beginTransaction()
+producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic1, 
"foo".getBytes, "bar".getBytes))
+
+for (i <- 0 until servers.size)

Review comment:
   nit: could use servers.indices

##
File path: core/src/test/scala/integration/kafka/api/TransactionsTest.scala
##
@@ -406,6 +406,26 @@ class TransactionsTest extends KafkaServerTestHarness {
 TestUtils.waitUntilTrue(() => 
offsetAndMetadata.equals(consumer.committed(Set(tp).asJava).get(tp)), "cannot 
read committed offset")
   }
 
+  @Test(expected = classOf[TimeoutException])
+  def testSendOffsetsToTransactionTimeout(): Unit = {
+val producer = createTransactionalProducer("transactionProducer", 
maxBlockMs = 1000)
+producer.initTransactions()
+producer.beginTransaction()
+producer.send(new ProducerRecord[Array[Byte], Array[Byte]](topic1, 
"foo".getBytes, "bar".getBytes))
+
+for (i <- 0 until servers.size)
+  killBroker(i)
+
+try {
+  producer.sendOffsetsToTransaction(Map(

Review comment:
   Do we have unit test coverage for the max-blocking behavior of the other transaction APIs as well? Do you mind adding them as separate tests and sharing the same module?









[GitHub] [kafka] abbccdda commented on pull request #9081: KAFKA-10309: KafkaProducer's sendOffsetsToTransaction should not block infinitively

2020-07-29 Thread GitBox


abbccdda commented on pull request #9081:
URL: https://github.com/apache/kafka/pull/9081#issuecomment-665157437











[GitHub] [kafka] abbccdda edited a comment on pull request #9012: KAFKA-10270: A broker to controller channel manager

2020-07-29 Thread GitBox


abbccdda edited a comment on pull request #9012:
URL: https://github.com/apache/kafka/pull/9012#issuecomment-665410633


   Got 2/3 green, with one jenkins job terminated unexpectedly. 
https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/7602/console







[GitHub] [kafka] ableegoldman opened a new pull request #9094: KAFKA-10054: add TRACE-level e2e latency metrics

2020-07-29 Thread GitBox


ableegoldman opened a new pull request #9094:
URL: https://github.com/apache/kafka/pull/9094


   Adds avg, min, and max e2e latency metrics at the new TRACE level. Also adds 
the missing `avg` task-level metric at the INFO level.







[GitHub] [kafka] cmccabe commented on pull request #9012: KAFKA-10270: A broker to controller channel manager

2020-07-29 Thread GitBox


cmccabe commented on pull request #9012:
URL: https://github.com/apache/kafka/pull/9012#issuecomment-665288947











[GitHub] [kafka] abbccdda commented on a change in pull request #9004: KAFKA-10261: Introduce the KIP-478 apis with adapters

2020-07-29 Thread GitBox


abbccdda commented on a change in pull request #9004:
URL: https://github.com/apache/kafka/pull/9004#discussion_r462057744



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java
##
@@ -764,12 +764,12 @@ private void connectProcessorAndStateStore(final String 
processorName,
 
 if (!sourceTopics.isEmpty()) {
 stateStoreNameToSourceTopics.put(stateStoreName,
-Collections.unmodifiableSet(sourceTopics));
+ 
Collections.unmodifiableSet(sourceTopics));

Review comment:
   format looks weird, maybe just do 4 spaces

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalProcessorContextImpl.java
##
@@ -57,12 +57,12 @@ public StateStore getStateStore(final String name) {
 
 @SuppressWarnings("unchecked")
 @Override
-public  void forward(final K key, final V value) {
-final ProcessorNode previousNode = currentNode();
+public  void forward(final KIn key, final VIn value) {
+final ProcessorNode previousNode = currentNode();
 try {
-for (final ProcessorNode child : currentNode().children()) {
+for (final ProcessorNode child : 
currentNode().children()) {
 setCurrentNode(child);
-((ProcessorNode) child).process(key, value);
+((ProcessorNode) child).process(key, 
value); // FIXME

Review comment:
   Could we leave a clearer comment on what needs to be fixed?

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/api/ProcessorContext.java
##
@@ -0,0 +1,240 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.processor.api;
+
+import org.apache.kafka.common.header.Headers;
+import org.apache.kafka.common.serialization.Serde;
+import org.apache.kafka.streams.StreamsMetrics;
+import org.apache.kafka.streams.errors.StreamsException;
+import org.apache.kafka.streams.processor.Cancellable;
+import org.apache.kafka.streams.processor.Processor;
+import org.apache.kafka.streams.processor.PunctuationType;
+import org.apache.kafka.streams.processor.Punctuator;
+import org.apache.kafka.streams.processor.StateRestoreCallback;
+import org.apache.kafka.streams.processor.StateStore;
+import org.apache.kafka.streams.processor.TaskId;
+import org.apache.kafka.streams.processor.TimestampExtractor;
+import org.apache.kafka.streams.processor.To;
+
+import java.io.File;
+import java.time.Duration;
+import java.util.Map;
+
+/**
+ * Processor context interface.
+ *
+ * @param  a bound on the types of keys that may be forwarded
+ * @param  a bound on the types of values that may be forwarded
+ */
+public interface ProcessorContext {
+
+/**
+ * Returns the application id.
+ *
+ * @return the application id
+ */
+String applicationId();
+
+/**
+ * Returns the task id.
+ *
+ * @return the task id
+ */
+TaskId taskId();
+
+/**
+ * Returns the default key serde.
+ *
+ * @return the key serializer
+ */
+Serde keySerde();
+
+/**
+ * Returns the default value serde.
+ *
+ * @return the value serializer
+ */
+Serde valueSerde();
+
+/**
+ * Returns the state directory for the partition.
+ *
+ * @return the state directory
+ */
+File stateDir();
+
+/**
+ * Returns Metrics instance.
+ *
+ * @return StreamsMetrics
+ */
+StreamsMetrics metrics();
+
+/**
+ * Registers and possibly restores the specified storage engine.
+ *
+ * @param store the storage engine
+ * @param stateRestoreCallback the restoration callback logic for 
log-backed state stores upon restart
+ *
+ * @throws IllegalStateException If store gets registered after 
initialized is already finished
+ * @throws StreamsException if the store's change log does not contain the 
partition
+ */
+void register(final StateStore store,
+  final StateRestoreCallback stateRestoreCallback);
+
+/**
+ * Get the state store given the store name.
+ *
+ * 

[GitHub] [kafka] abbccdda commented on pull request #9012: KAFKA-10270: A broker to controller channel manager

2020-07-29 Thread GitBox


abbccdda commented on pull request #9012:
URL: https://github.com/apache/kafka/pull/9012#issuecomment-665410633











[GitHub] [kafka] cadonna commented on pull request #9087: HOTFIX: Set session timeout and heartbeat interval to default to decrease flakiness

2020-07-29 Thread GitBox


cadonna commented on pull request #9087:
URL: https://github.com/apache/kafka/pull/9087#issuecomment-664621717


   @abbccdda Looking at this test class again, I think we could set the session timeout and heartbeat interval to their defaults for all tests. The main motivation for reducing them was to avoid having to change the application ID for each test (see https://github.com/apache/kafka/pull/8530#discussion_r413170584). However, in the meantime each test has gotten its own application ID, so I guess it would be fine to use the defaults again here. Is this correct, @guozhangwang?







[GitHub] [kafka] chia7712 edited a comment on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-07-29 Thread GitBox


chia7712 edited a comment on pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#issuecomment-665149860


   ```streams_standby_replica_test```  -> 
https://issues.apache.org/jira/browse/KAFKA-10287
   
   I will take a look at ```streams_broker_bounce_test``` 
(https://issues.apache.org/jira/browse/KAFKA-10292)







[GitHub] [kafka] vvcephei commented on a change in pull request #9047: KAFKA-9274: Remove `retries` for global task

2020-07-29 Thread GitBox


vvcephei commented on a change in pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#discussion_r461143961



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##
@@ -275,31 +259,70 @@ private void restoreState(final StateRestoreCallback 
stateRestoreCallback,
   final RecordConverter recordConverter) {
 for (final TopicPartition topicPartition : topicPartitions) {
 globalConsumer.assign(Collections.singletonList(topicPartition));
+long offset;
 final Long checkpoint = checkpointFileCache.get(topicPartition);
 if (checkpoint != null) {
 globalConsumer.seek(topicPartition, checkpoint);
+offset = checkpoint;
 } else {
 
globalConsumer.seekToBeginning(Collections.singletonList(topicPartition));
+final AtomicLong position = new AtomicLong();
+retryUntilSuccessOrThrowOnTaskTimeout(
+() -> 
position.set(globalConsumer.position(topicPartition)),
+String.format(
+"Failed to get position for partition %s. The broker 
may be transiently unavailable at the moment.",
+topicPartition
+)
+);
+offset = position.get();
 }
 
-long offset = globalConsumer.position(topicPartition);
 final Long highWatermark = highWatermarks.get(topicPartition);
 final RecordBatchingStateRestoreCallback stateRestoreAdapter =
 StateRestoreCallbackAdapter.adapt(stateRestoreCallback);
 
 stateRestoreListener.onRestoreStart(topicPartition, storeName, 
offset, highWatermark);
 long restoreCount = 0L;
 
+long deadlineMs = NO_DEADLINE;
 while (offset < highWatermark) {
 try {
 final ConsumerRecords records = 
globalConsumer.poll(pollTime);
+if (records.isEmpty()) {
+if (taskTimeoutMs == 0L) {
+deadlineMs = maybeUpdateDeadlineOrThrow(

Review comment:
   Ok, thanks @mjsax. I just traced through the consumer code again, and have finally been able to see what you already knew: that `request.timeout.ms` is indeed the correct amount of time to wait.
   
   Namely, we send a fetch here: 
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L1285
   
   Which calls through to client.send here: 
https://github.com/apache/kafka/blob/659ca8f089c6b1c3a53036a8f96de7f763971fd1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L263
   
   Which fills in the `request.timeout.ms` config value here: 
https://github.com/apache/kafka/blob/659ca8f089c6b1c3a53036a8f96de7f763971fd1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L106
   
   Which uses it to construct a ClientRequest here: 
https://github.com/apache/kafka/blob/659ca8f089c6b1c3a53036a8f96de7f763971fd1/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerNetworkClient.java#L129-L130
   
   Which then gets used to create an InFlightRequest when it gets sent here: 
https://github.com/apache/kafka/blob/659ca8f089c6b1c3a53036a8f96de7f763971fd1/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L1248
   
   Which is later used to detect expired requests here: 
https://github.com/apache/kafka/blob/659ca8f089c6b1c3a53036a8f96de7f763971fd1/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java#L162
   
   Which is used to list nodes (brokers) for which there is an expired request 
here: 
https://github.com/apache/kafka/blob/659ca8f089c6b1c3a53036a8f96de7f763971fd1/clients/src/main/java/org/apache/kafka/clients/InFlightRequests.java#L179
   
   Which is then processed as a "disconnection" here: 
https://github.com/apache/kafka/blob/659ca8f089c6b1c3a53036a8f96de7f763971fd1/clients/src/main/java/org/apache/kafka/clients/NetworkClient.java#L803
   
   It also looks like the KafkaClient just does a tight-loop checking for a 
network response, so we don't really need any extra time to account for 
sampling errors. Also, it still seems like using the sum as the poll duration 
is just as good as using your retry logic, so I think the duration parameter is 
fine.
   
   My only remaining question, which maybe doesn't really matter one way or 
another, is whether `poll.ms` really belongs here or not. It seems like the 
desired semantics are accomplished by just waiting `request.timeout.ms` for the 
initial failure, and then an extra `task.timeout.ms` for any retries.
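
   To illustrate the "sum as the poll duration" idea in isolation, a simplified sketch (hypothetical helper with made-up parameters, not the actual `GlobalStateManagerImpl` code):

```java
import java.time.Duration;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.common.errors.TimeoutException;

final class BootstrapLoopSketch {
    // A single poll() gets request.timeout.ms (initial fetch attempt) plus
    // task.timeout.ms (the consumer's internal retries) before we fail.
    static long restoreUpTo(final Consumer<byte[], byte[]> globalConsumer,
                            final long highWatermark,
                            final long requestTimeoutMs,
                            final long taskTimeoutMs,
                            long offset) {
        final Duration pollTimeout = Duration.ofMillis(requestTimeoutMs + taskTimeoutMs);
        while (offset < highWatermark) {
            final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(pollTimeout);
            if (records.isEmpty()) {
                // no progress within the combined timeout -> surface a timeout to the task
                throw new TimeoutException("Global store bootstrap made no progress within task.timeout.ms");
            }
            // ... apply the records to the state store here ...
            offset += records.count(); // simplification: the real code tracks the consumer position
        }
        return offset;
    }
}
```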

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##
@@ -292,21 +278,36 

[GitHub] [kafka] vvcephei commented on pull request #9004: KAFKA-10261: Introduce the KIP-478 apis with shims

2020-07-29 Thread GitBox


vvcephei commented on pull request #9004:
URL: https://github.com/apache/kafka/pull/9004#issuecomment-664740837


   Thanks for the review, @abbccdda ! I've addressed your feedback.







[GitHub] [kafka] dajac commented on pull request #9092: KAFKA-10163; Define `controller_mutation_rate` a Double instead of a Long

2020-07-29 Thread GitBox


dajac commented on pull request #9092:
URL: https://github.com/apache/kafka/pull/9092#issuecomment-664996150


   @rajinisivaram I have updated an existing test to use 1.5 instead of 2 to 
verify this. 
https://github.com/apache/kafka/pull/9092/files#diff-296d7b93103356535b8b891f019e4651R113.







[GitHub] [kafka] showuon commented on pull request #9062: KAFKA-8098: fix the flaky test by disabling the auto commit to avoid member rejoining

2020-07-29 Thread GitBox


showuon commented on pull request #9062:
URL: https://github.com/apache/kafka/pull/9062#issuecomment-664801932


   hi @feyman2016  @huxihx , could you help review this small PR? Thanks.







[GitHub] [kafka] rgibaiev commented on pull request #8575: KAFKA-8713 KIP-581 Add accept.optional.null to solve optional null

2020-07-29 Thread GitBox


rgibaiev commented on pull request #8575:
URL: https://github.com/apache/kafka/pull/8575#issuecomment-665225469


   Any update?







[GitHub] [kafka] hachikuji commented on a change in pull request #9047: KAFKA-9274: Remove `retries` for global task

2020-07-29 Thread GitBox


hachikuji commented on a change in pull request #9047:
URL: https://github.com/apache/kafka/pull/9047#discussion_r461114945



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##
@@ -292,21 +278,36 @@ private void restoreState(final StateRestoreCallback 
stateRestoreCallback,
 
 while (offset < highWatermark) {
 try {
-final ConsumerRecords<byte[], byte[]> records = globalConsumer.poll(pollTime);
+final ConsumerRecords<byte[], byte[]> records =
+    globalConsumer.poll(pollTimePlusRequestTimeoutPlusTaskTimeout);

Review comment:
   It does seem a bit weird here to add in the request timeout. Not sure I 
follow the reasoning behind that...









[GitHub] [kafka] codelabor opened a new pull request #9090: toString pattern change like ProducerRecord.java

2020-07-29 Thread GitBox


codelabor opened a new pull request #9090:
URL: https://github.com/apache/kafka/pull/9090


   
   In most `toString()` methods, '=' is used instead of ' = ', so I followed that convention.
   Currently, printing a 'ProducerRecord' and a 'ConsumerRecord' produces inconsistently formatted output, which looks strange:
   -
   producerRecord: ProducerRecord(topic=, partition=null, headers= ...)
   consumerRecord: ConsumerRecord(topic = , partition = 0, ..., headers = ...)
   
   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] rhauch commented on pull request #9089: KAFKA-10224: Update jersey license from CDDL to EPLv2

2020-07-29 Thread GitBox


rhauch commented on pull request #9089:
URL: https://github.com/apache/kafka/pull/9089#issuecomment-665168618











[GitHub] [kafka] ableegoldman commented on pull request #9094: KAFKA-10054: KIP-613, add TRACE-level e2e latency metrics

2020-07-29 Thread GitBox


ableegoldman commented on pull request #9094:
URL: https://github.com/apache/kafka/pull/9094#issuecomment-665394762


   call for review @vvcephei @cadonna @guozhangwang 







[GitHub] [kafka] rhauch commented on pull request #9051: KAFKA-10268: dynamic config like "--delete-config log.retention.ms" doesn't work

2020-07-29 Thread GitBox


rhauch commented on pull request #9051:
URL: https://github.com/apache/kafka/pull/9051#issuecomment-665124035


   This was also cherry-picked to `2.6`, but that branch has been frozen while 
we try to release AK 2.6.0. However, given that this is low-risk, I'll leave it 
on `2.6` and update 
[KAFKA-10268](https://issues.apache.org/jira/browse/KAFKA-10268) instead.
   
   @huxihx, next time please refrain from cherry-picking to frozen branches. 
Thanks!







[GitHub] [kafka] cmccabe commented on a change in pull request #9012: KAFKA-10270: A broker to controller channel manager

2020-07-29 Thread GitBox


cmccabe commented on a change in pull request #9012:
URL: https://github.com/apache/kafka/pull/9012#discussion_r461884961



##
File path: 
core/src/main/scala/kafka/server/BrokerToControllerChannelManager.scala
##
@@ -0,0 +1,211 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.server
+
+import java.util.concurrent.{BlockingQueue, LinkedBlockingQueue, TimeUnit}
+
+import kafka.common.{InterBrokerSendThread, RequestAndCompletionHandler}
+import kafka.network.RequestChannel
+import kafka.utils.Logging
+import org.apache.kafka.clients._
+import org.apache.kafka.common.requests.AbstractRequest
+import org.apache.kafka.common.utils.{LogContext, Time}
+import org.apache.kafka.common.Node
+import org.apache.kafka.common.metrics.Metrics
+import org.apache.kafka.common.network._
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.requests.AbstractRequest.NoOpRequestBuilder
+import org.apache.kafka.common.security.JaasContext
+
+import scala.collection.mutable
+import scala.jdk.CollectionConverters._
+
+/**
+ * This class manages the connection between a broker and the controller. It 
runs a single
+ * {@link BrokerToControllerRequestThread} which uses the broker's metadata 
cache as its own metadata to find
+ * and connect to the controller. The channel is async and runs the network 
connection in the background.
+ * The maximum number of in-flight requests are set to one to ensure orderly 
response from the controller, therefore
+ * care must be taken to not block on outstanding requests for too long.
+ */
+class BrokerToControllerChannelManager(metadataCache: 
kafka.server.MetadataCache,
+   time: Time,
+   metrics: Metrics,
+   config: KafkaConfig,
+   threadNamePrefix: Option[String] = 
None) extends Logging {
+  private val requestQueue = new 
LinkedBlockingQueue[BrokerToControllerQueueItem]
+  private val logContext = new 
LogContext(s"[broker-${config.brokerId}-to-controller] ")
+  private val manualMetadataUpdater = new ManualMetadataUpdater()
+  private val requestThread = newRequestThread
+
+  def start(): Unit = {
+requestThread.start()
+  }
+
+  def shutdown(): Unit = {
+requestThread.shutdown()
+requestThread.awaitShutdown()
+  }
+
+  private[server] def newRequestThread = {
+val brokerToControllerListenerName = 
config.controlPlaneListenerName.getOrElse(config.interBrokerListenerName)
+val brokerToControllerSecurityProtocol = 
config.controlPlaneSecurityProtocol.getOrElse(config.interBrokerSecurityProtocol)
+
+val networkClient = {
+  val channelBuilder = ChannelBuilders.clientChannelBuilder(
+brokerToControllerSecurityProtocol,
+JaasContext.Type.SERVER,
+config,
+brokerToControllerListenerName,
+config.saslMechanismInterBrokerProtocol,
+time,
+config.saslInterBrokerHandshakeRequestEnable,
+logContext
+  )
+  val selector = new Selector(
+NetworkReceive.UNLIMITED,
+Selector.NO_IDLE_TIMEOUT_MS,
+metrics,
+time,
+"BrokerToControllerChannel",
+Map("BrokerId" -> config.brokerId.toString).asJava,
+false,
+channelBuilder,
+logContext
+  )
+  new NetworkClient(
+selector,
+manualMetadataUpdater,
+config.brokerId.toString,
+1,
+0,
+0,
+Selectable.USE_DEFAULT_BUFFER_SIZE,
+Selectable.USE_DEFAULT_BUFFER_SIZE,
+config.requestTimeoutMs,
+config.connectionSetupTimeoutMs,
+config.connectionSetupTimeoutMaxMs,
+ClientDnsLookup.DEFAULT,

Review comment:
   @abbccdda : Please switch this to `ClientDnsLookup.USE_ALL_DNS_IPS` to 
be consistent with the other NetworkClients, such as the one in 
`ControllerChannelManager`, etc.

##
File path: core/src/main/scala/kafka/server/KafkaServer.scala
##
@@ -168,6 +168,8 @@ class KafkaServer(val config: KafkaConfig, time: Time = 
Time.SYSTEM, threadNameP
 
   var kafkaController: KafkaController = null
 
+  var 

[GitHub] [kafka] mjsax commented on pull request #9086: FIX: Remove staticmethod tag to be able to use logger of instance

2020-07-29 Thread GitBox


mjsax commented on pull request #9086:
URL: https://github.com/apache/kafka/pull/9086#issuecomment-664684265


   Merged to `trunk` and cherry-picked to `2.6`, `2.5`, `2.4`, and `2.3` 
branches.







[GitHub] [kafka] huxihx edited a comment on pull request #8984: KAFKA-10227: Enforce cleanup policy to only contain compact or delete once

2020-07-29 Thread GitBox


huxihx edited a comment on pull request #8984:
URL: https://github.com/apache/kafka/pull/8984#issuecomment-664804260


   Ping @omkreddy @mimaison @dajac  for review.







[GitHub] [kafka] dajac commented on pull request #9091: MINOR; Make KafkaAdminClientTest.testDescribeLogDirsPartialFailure and KafkaAdminClientTest.testAlterReplicaLogDirsPartialFailure test more reli

2020-07-29 Thread GitBox


dajac commented on pull request #9091:
URL: https://github.com/apache/kafka/pull/9091#issuecomment-664848664


   @chia7712 Ack. I reverted `testMetadataRetries` back to the original version prior to #8864. That test is specifically about the retries, so it doesn't make sense to remove the retries config.







[GitHub] [kafka] cyrusv opened a new pull request #9093: Throw error on when keys not found in FileConfigProvider

2020-07-29 Thread GitBox


cyrusv opened a new pull request #9093:
URL: https://github.com/apache/kafka/pull/9093


   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] sasakitoa commented on pull request #9081: KAFKA-10309: KafkaProducer's sendOffsetsToTransaction should not block infinitively

2020-07-29 Thread GitBox


sasakitoa commented on pull request #9081:
URL: https://github.com/apache/kafka/pull/9081#issuecomment-664725165


   Thank you for the kind comments. I updated the PR to improve the Javadoc and the related test code.







[GitHub] [kafka] tombentley commented on pull request #9007: KAFKA-10120: Deprecate DescribeLogDirsResult.all() and .values()

2020-07-29 Thread GitBox


tombentley commented on pull request #9007:
URL: https://github.com/apache/kafka/pull/9007#issuecomment-664836543


   @mimaison done, thanks.







[GitHub] [kafka] dajac commented on pull request #9072: KAFKA-10162; Make the rate based quota behave more like a Token Bucket (KIP-599, Part III)

2020-07-29 Thread GitBox


dajac commented on pull request #9072:
URL: https://github.com/apache/kafka/pull/9072#issuecomment-664923691


   @apovzner Thanks for your comments. I have updated the PR to incorporate 
your feedback. I have also fixed a bug in unrecord and added the javadoc.






