[jira] [Commented] (KAFKA-7223) KIP-328: Add in-memory Suppression
[ https://issues.apache.org/jira/browse/KAFKA-7223?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651091#comment-16651091 ] ASF GitHub Bot commented on KAFKA-7223: --- mjsax closed pull request #5787: KAFKA-7223: Suppression documentation URL: https://github.com/apache/kafka/pull/5787 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/docs/ops.html b/docs/ops.html index d57f1cf20ff..158602b96a1 100644 --- a/docs/ops.html +++ b/docs/ops.html @@ -257,7 +257,7 @@ Managing C - Please note, that out of range offsets will be adjusted to available offset end. For example, if offset end is at 10 and offset shift request is + Please note, that out of range offsets will be adjusted to available offset end. For example, if offset end is at 10 and offset shift request is of 15, then, offset at 10 will actually be selected. @@ -1546,6 +1546,16 @@ The total number of commit calls. kafka.streams:type=stream-task-metrics,client-id=([-.\w]+),task-id=([-.\w]+) + +record-lateness-avg +The average observed lateness of records. + kafka.streams:type=stream-task-metrics,client-id=([-.\w]+),task-id=([-.\w]+) + + +record-lateness-max +The max observed lateness of records. 
+ kafka.streams:type=stream-task-metrics,client-id=([-.\w]+),task-id=([-.\w]+) + diff --git a/docs/streams/developer-guide/dsl-api.html b/docs/streams/developer-guide/dsl-api.html index 1622702c540..0ff28a8db69 100644 --- a/docs/streams/developer-guide/dsl-api.html +++ b/docs/streams/developer-guide/dsl-api.html @@ -58,6 +58,7 @@ Hopping time windows Sliding time windows Session Windows +Window Final Results @@ -65,6 +66,7 @@ Applying processors and transformers (Processor API integration) +Controlling KTable update rate Writing streams back to Kafka Testing a Streams application Kafka Streams DSL for Scala @@ -2969,6 +2971,73 @@ t=5 (blue), which lead to a merge of sessions and an extension of a session, respectively. + + Window Final Results + In Kafka Streams, windowed computations update their results continuously. + As new data arrives for a window, freshly computed results are emitted downstream. + For many applications, this is ideal, since fresh results are always available. + and Kafka Streams is designed to make programming continuous computations seamless. + However, some applications need to take action only on the final result of a windowed computation. + Common examples of this are sending alerts or delivering results to a system that doesn't support updates. + + Suppose that you have an hourly windowed count of events per user. + If you want to send an alert when a user has less than three events in an hour, you have a real challange. + All users would match this condition at first, until they accrue enough events, so you cannot simply + send an alert when someone matches the condition; you have to wait until you know you won't see any more events for a particular window + and then send the alert. + +Kafka Streams offers a clean way to define this logic: after defining your windowed computation, you can + suppress the intermediate results, emitting the final count for each user when the window is closed. 
+ + For example: + + +KGroupedStream<UserId, Event> grouped = ...; +grouped +.windowedBy(TimeWindows.of(Duration.ofHours(1)).grace(ofMinutes(10))) +.count() +.suppress(Suppressed.untilWindowCloses(unbounded())) +.filter((windowedUserId, count) -> count < 3) +.toStream() +.foreach((windowedUserId, count) -> sendAlert(windowedUserId.window(), windowedUserId.key(), count)); + + + The key parts of this program are: +
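As a rough illustration of the behaviour this documentation diff describes (intermediate updates held back, one final count emitted per window), the plain-Java sketch below imitates it without any Kafka dependency. Everything in it is hypothetical: the class `FinalResultsSketch`, the method `onEvent`, and the simplification that a window counts as closed once the highest timestamp seen so far reaches window end plus grace. It is not how `suppress()` is implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical stand-in for "count per user per window, emit only final results".
final class FinalResultsSketch {
    private final long windowSizeMs;
    private final long graceMs;
    // window start -> (user -> count), for windows that are still open
    private final TreeMap<Long, Map<String, Long>> open = new TreeMap<>();
    // Highest event timestamp observed so far (assumed notion of "stream time").
    private long streamTime = Long.MIN_VALUE;

    FinalResultsSketch(long windowSizeMs, long graceMs) {
        this.windowSizeMs = windowSizeMs;
        this.graceMs = graceMs;
    }

    // Assumes non-negative timestamps. Returns the final results of every
    // window that this event caused to close, formatted as "user@windowStart=count".
    List<String> onEvent(String user, long timestampMs) {
        streamTime = Math.max(streamTime, timestampMs);
        long windowStart = timestampMs - (timestampMs % windowSizeMs);
        // Count the event only if its window is still open; late events are dropped.
        if (windowStart + windowSizeMs + graceMs > streamTime) {
            open.computeIfAbsent(windowStart, k -> new TreeMap<>()).merge(user, 1L, Long::sum);
        }
        List<String> finalResults = new ArrayList<>();
        // Emit and forget every window whose end plus grace has been reached.
        while (!open.isEmpty() && open.firstKey() + windowSizeMs + graceMs <= streamTime) {
            Map.Entry<Long, Map<String, Long>> closed = open.pollFirstEntry();
            for (Map.Entry<String, Long> e : closed.getValue().entrySet()) {
                finalResults.add(e.getKey() + "@" + closed.getKey() + "=" + e.getValue());
            }
        }
        return finalResults;
    }

    public static void main(String[] args) {
        FinalResultsSketch sketch = new FinalResultsSketch(10, 2);
        System.out.println(sketch.onEvent("alice", 1));
        System.out.println(sketch.onEvent("alice", 5));
        System.out.println(sketch.onEvent("bob", 11));
        System.out.println(sketch.onEvent("bob", 12));
    }
}
```

An alerting filter such as "fewer than three events" would then be applied to the returned final results only, so a user is never flagged while the window is still filling up.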
[jira] [Commented] (KAFKA-7080) WindowStoreBuilder incorrectly initializes CachingWindowStore
[ https://issues.apache.org/jira/browse/KAFKA-7080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651087#comment-16651087 ] ASF GitHub Bot commented on KAFKA-7080: --- mjsax closed pull request #5804: KAFKA-7080 and KAFKA-7222: Cleanup overlapping KIP changes URL: https://github.com/apache/kafka/pull/5804 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html index b26f3c339cf..c0f28efe98b 100644 --- a/docs/streams/upgrade-guide.html +++ b/docs/streams/upgrade-guide.html @@ -134,6 +134,18 @@ Streams API see https://cwiki.apache.org/confluence/display/KAFKA/KIP-324%3A+Add+method+to+get+metrics%28%29+in+AdminClient;>KIP-324 + +We deprecated the notion of segments in window stores as those are intended to be an implementation details. +Thus, method Windows#segments() and variable Windows#segments were deprecated. +If you implement custom windows, you should update your code accordingly. +Similarly, WindowBytesStoreSupplier#segments() was deprecated and replaced with WindowBytesStoreSupplier#segmentInterval(). +If you implement custom window store, you need to update your code accordingly. + Finally, Stores#persistentWindowStore(...) were deprecated and replaced with a new overload that does not allow to specify the number of segments any longer. 
+For more details, see https://cwiki.apache.org/confluence/display/KAFKA/KIP-319%3A+Replace+segments+with+segmentInterval+in+WindowBytesStoreSupplier;>KIP-319 +(note: https://cwiki.apache.org/confluence/display/KAFKA/KIP-328%3A+Ability+to+suppress+updates+for+KTables;>KIP-328 and + https://cwiki.apache.org/confluence/display/KAFKA/KIP-358%3A+Migrate+Streams+API+to+Duration+instead+of+long+ms+times;>KIP-358 'overlap' with KIP-319). + + Streams API changes in 2.0.0 In 2.0.0 we have added a few new APIs on the ReadOnlyWindowStore interface (for details please read Streams API changes below). diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java index 4dfba2306cb..feaee1e1336 100644 --- a/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java +++ b/streams/src/main/java/org/apache/kafka/streams/kstream/Windows.java @@ -83,21 +83,6 @@ public long maintainMs() { return maintainDurationMs; } -/** - * Return the segment interval in milliseconds. - * - * @return the segment interval - * @deprecated since 2.1. Instead, directly configure the segment interval in a store supplier and use {@link Materialized#as(WindowBytesStoreSupplier)}. - */ -@Deprecated -public long segmentInterval() { -// Pinned arbitrarily to a minimum of 60 seconds. Profiling may indicate a different value is more efficient. -final long minimumSegmentInterval = 60_000L; -// Scaled to the (possibly overridden) retention period -return Math.max(maintainMs() / (segments - 1), minimumSegmentInterval); -} - - /** * Set the number of segments to be used for rolling the window store. * This function is not exposed to users but can be called by developers that extend this class. 
diff --git a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java index 30e51403714..f7a182472be 100644 --- a/streams/src/main/java/org/apache/kafka/streams/state/Stores.java +++ b/streams/src/main/java/org/apache/kafka/streams/state/Stores.java @@ -16,7 +16,6 @@ */ package org.apache.kafka.streams.state; -import java.time.Duration; import org.apache.kafka.common.annotation.InterfaceStability; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; @@ -32,6 +31,7 @@ import org.apache.kafka.streams.state.internals.SessionStoreBuilder; import org.apache.kafka.streams.state.internals.WindowStoreBuilder; +import java.time.Duration; import java.util.Objects; /** @@ -155,7 +155,7 @@ public String metricsScope() { * careful to set it the same as the windowed keys you're actually storing. * @param retainDuplicates whether or not to retain duplicates. * @return an instance of {@link WindowBytesStoreSupplier} - * @deprecated since 2.1 Use {@link Stores#persistentWindowStore(String, long, long, boolean, long)} instead + * @deprecated since 2.1 Use {@link Stores#persistentWindowStore(String, Duration, Duration, boolean)} instead */
[jira] [Commented] (KAFKA-6764) ConsoleConsumer behavior inconsistent when specifying --partition with --from-beginning
[ https://issues.apache.org/jira/browse/KAFKA-6764?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651075#comment-16651075 ] ASF GitHub Bot commented on KAFKA-6764: --- cmccabe closed pull request #5637: MINOR : Fixed KAFKA-6764; Update usage for console-consumer whitelist option URL: https://github.com/apache/kafka/pull/5637 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala index 365652a75b5..06705d59219 100755 --- a/core/src/main/scala/kafka/tools/ConsoleConsumer.scala +++ b/core/src/main/scala/kafka/tools/ConsoleConsumer.scala @@ -65,6 +65,7 @@ object ConsoleConsumer extends Logging { def run(conf: ConsumerConfig) { val timeoutMs = if (conf.timeoutMs >= 0) conf.timeoutMs else Long.MaxValue val consumer = new KafkaConsumer(consumerProps(conf), new ByteArrayDeserializer, new ByteArrayDeserializer) + val consumerWrapper = if (conf.partitionArg.isDefined) new ConsumerWrapper(Option(conf.topicArg), conf.partitionArg, Option(conf.offsetArg), None, consumer, timeoutMs) @@ -194,7 +195,7 @@ object ConsoleConsumer extends Logging { .withRequiredArg .describedAs("topic") .ofType(classOf[String]) -val whitelistOpt = parser.accepts("whitelist", "Whitelist of topics to include for consumption.") +val whitelistOpt = parser.accepts("whitelist", "Regular expression specifying whitelist of topics to include for consumption.") .withRequiredArg .describedAs("whitelist") .ofType(classOf[String]) @@ -355,7 +356,7 @@ object ConsoleConsumer extends Logging { val groupIdsProvided = Set( Option(options.valueOf(groupIdOpt)), // via --group Option(consumerProps.get(ConsumerConfig.GROUP_ID_CONFIG)), // via --consumer-property - 
Option(extraConsumerProps.get(ConsumerConfig.GROUP_ID_CONFIG)) // via --cosumer.config + Option(extraConsumerProps.get(ConsumerConfig.GROUP_ID_CONFIG)) // via --consumer.config ).flatten if (groupIdsProvided.size > 1) { @@ -376,6 +377,9 @@ object ConsoleConsumer extends Logging { groupIdPassed = false } +if (groupIdPassed && partitionArg.isDefined) + CommandLineUtils.printUsageAndDie(parser, "Options group and partition cannot be specified together.") + def tryParse(parser: OptionParser, args: Array[String]): OptionSet = { try parser.parse(args: _*) diff --git a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala index 47b7fae3d9b..cdc146f3666 100644 --- a/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala +++ b/core/src/test/scala/unit/kafka/tools/ConsoleConsumerTest.scala @@ -432,4 +432,67 @@ class ConsoleConsumerTest { assertTrue(formatter.keyDeserializer.get.asInstanceOf[MockDeserializer].isKey) } + @Test + def shouldParseGroupIdFromBeginningGivenTogether() { +// Start from earliest +var args: Array[String] = Array( + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--group", "test-group", + "--from-beginning") + +var config = new ConsoleConsumer.ConsumerConfig(args) +assertEquals("localhost:9092", config.bootstrapServer) +assertEquals("test", config.topicArg) +assertEquals(-2, config.offsetArg) +assertEquals(true, config.fromBeginning) + +// Start from latest +args = Array( + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--group", "test-group" +) + +config = new ConsoleConsumer.ConsumerConfig(args) +assertEquals("localhost:9092", config.bootstrapServer) +assertEquals("test", config.topicArg) +assertEquals(-1, config.offsetArg) +assertEquals(false, config.fromBeginning) + } + + @Test(expected = classOf[IllegalArgumentException]) + def shouldExitOnGroupIdAndPartitionGivenTogether() { +Exit.setExitProcedure((_, message) => throw new 
IllegalArgumentException(message.orNull)) +//Given +val args: Array[String] = Array( + "--bootstrap-server", "localhost:9092", + "--topic", "test", + "--group", "test-group", + "--partition", "0") + +//When +try { + new ConsoleConsumer.ConsumerConfig(args) +} finally { + Exit.resetExitProcedure() +} + } + + @Test(expected = classOf[IllegalArgumentException]) + def shouldExitOnOffsetWithoutPartition() { +Exit.setExitProcedure((_, message) => throw new IllegalArgumentException(message.orNull)) +//Given +val args: Array[String] = Array( + "--bootstrap-server", "localhost:9092", +
[jira] [Commented] (KAFKA-7396) KIP-365: Materialized, Serialized, Joined, Consumed and Produced with implicit Serde
[ https://issues.apache.org/jira/browse/KAFKA-7396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651023#comment-16651023 ] ASF GitHub Bot commented on KAFKA-7396: --- vvcephei closed pull request #5803: KAFKA-7396: document implicit Grouped URL: https://github.com/apache/kafka/pull/5803 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/docs/streams/developer-guide/dsl-api.html b/docs/streams/developer-guide/dsl-api.html index 1622702c540..37b6947a4dc 100644 --- a/docs/streams/developer-guide/dsl-api.html +++ b/docs/streams/developer-guide/dsl-api.html @@ -3355,18 +3355,18 @@ } } - In the above code snippet, we don't have to provide any SerDes, Serialized, Produced, Consumed or Joined explicitly. They will also not be dependent on any SerDes specified in the config. In fact all SerDes specified in the config will be ignored by the Scala APIs. All SerDes and Serialized, Produced, Consumed or Joined will be handled through implicit SerDes as discussed later in the Implicit SerDes section. The complete independence from configuration based SerDes is what makes this library completely typesafe. Any missing instances of SerDes, Serialized, Produced, Consumed or Joined will be flagged as a compile time error. + In the above code snippet, we don't have to provide any SerDes, Grouped, Produced, Consumed or Joined explicitly. They will also not be dependent on any SerDes specified in the config. In fact all SerDes specified in the config will be ignored by the Scala APIs. All SerDes and Grouped, Produced, Consumed or Joined will be handled through implicit SerDes as discussed later in the Implicit SerDes section. The complete independence from configuration based SerDes is what makes this library completely typesafe. 
Any missing instances of SerDes, Grouped, Produced, Consumed or Joined will be flagged as a compile time error. Implicit SerDes - One of the common complaints of Scala users with the Java API has been the repetitive usage of the SerDes in API invocations. Many of the APIs need to take the SerDes through abstractions like Serialized, Produced, Consumed or Joined. And the user has to supply them every time through the with function of these classes. - The library uses the power of https://docs.scala-lang.org/tour/implicit-parameters.html;>Scala implicit parameters to alleviate this concern. As a user you can provide implicit SerDes or implicit values of Serialized, Produced, Consumed or Joined once and make your code less verbose. In fact you can just have the implicit SerDes in scope and the library will make the instances of Serialized, Produced, Consumed or Joined available in scope. + One of the common complaints of Scala users with the Java API has been the repetitive usage of the SerDes in API invocations. Many of the APIs need to take the SerDes through abstractions like Grouped, Produced, Consumed or Joined. And the user has to supply them every time through the with function of these classes. + The library uses the power of https://docs.scala-lang.org/tour/implicit-parameters.html;>Scala implicit parameters to alleviate this concern. As a user you can provide implicit SerDes or implicit values of Grouped, Produced, Consumed or Joined once and make your code less verbose. In fact you can just have the implicit SerDes in scope and the library will make the instances of Grouped, Produced, Consumed or Joined available in scope. The library also bundles all implicit SerDes of the commonly used primitive types in a Scala module - so just import the module vals and have all SerDes in scope. A similar strategy of modular implicits can be adopted for any user-defined SerDes as well (User-defined SerDes are discussed in the next section). 
Here's an example: // DefaultSerdes brings into scope implicit SerDes (mostly for primitives) -// that will set up all Serialized, Produced, Consumed and Joined instances. -// So all APIs below that accept Serialized, Produced, Consumed or Joined will +// that will set up all Grouped, Produced, Consumed and Joined instances. +// So all APIs below that accept Grouped, Produced, Consumed or Joined will // get these instances automatically import Serdes._ @@ -3376,7 +3376,7 @@ val userRegionsTable: KTable[String, String] = builder.table(userRegionsTopic) -// The following code fragment does not have a single instance of Serialized, +// The following code fragment does not have a single instance of Grouped, // Produced, Consumed or
[jira] [Commented] (KAFKA-7080) WindowStoreBuilder incorrectly initializes CachingWindowStore
[ https://issues.apache.org/jira/browse/KAFKA-7080?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16651008#comment-16651008 ]

ASF GitHub Bot commented on KAFKA-7080:
---

mjsax opened a new pull request #5804: KAFKA-7080 and KAFKA-7222: Cleanup overlapping KIP changes
URL: https://github.com/apache/kafka/pull/5804

- KIP-319 and KIP-328 overlap and we can remove non-released deprecated methods
- add upgrade docs for KIP-319

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> WindowStoreBuilder incorrectly initializes CachingWindowStore
> -
>
> Key: KAFKA-7080
> URL: https://issues.apache.org/jira/browse/KAFKA-7080
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 1.0.0, 1.0.1, 1.1.0, 2.0.0
> Reporter: John Roesler
> Assignee: John Roesler
> Priority: Major
> Fix For: 2.1.0
>
> When caching is enabled on the WindowStoreBuilder, it creates a
> CachingWindowStore. However, it incorrectly passes storeSupplier.segments()
> (the number of segments) to the segmentInterval argument.
>
> The impact is low, since any valid number of segments is also a valid segment
> size, but it likely results in much smaller segments than intended. For
> example, the segments may be sized 3ms instead of 60,000ms.
>
> Ideally the WindowBytesStoreSupplier interface would allow suppliers to
> advertise their segment size instead of segment count. I plan to create a KIP
> to propose this.

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
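To make the "3ms instead of 60,000ms" example from the issue concrete, here is a small sketch. The formula is copied from the `Windows#segmentInterval()` method shown as removed in the PR #5804 diff earlier in this archive; the class name `SegmentIntervalSketch`, the parameter names, and the 120,000 ms retention value are assumptions chosen only for illustration.

```java
// Illustrative only: contrasts the intended segment interval with the value
// that results when the segment *count* is passed where an interval is expected.
final class SegmentIntervalSketch {
    // Minimum taken from the removed Windows#segmentInterval() in the diff.
    static final long MIN_SEGMENT_INTERVAL_MS = 60_000L;

    // Same arithmetic as the removed method: retention scaled by (segments - 1),
    // pinned to the minimum interval.
    static long segmentInterval(long retentionMs, int segments) {
        return Math.max(retentionMs / (segments - 1), MIN_SEGMENT_INTERVAL_MS);
    }

    public static void main(String[] args) {
        long retentionMs = 120_000L; // assumed retention period
        int segments = 3;            // assumed segment count

        long intended = segmentInterval(retentionMs, segments);
        long mistaken = segments;    // the count silently widens to a long "interval"

        System.out.println("intended interval ms: " + intended);
        System.out.println("mistaken interval ms: " + mistaken);
    }
}
```

Because an `int` count widens to a `long` interval without any compiler complaint, a mix-up of this kind is easy to miss, which is consistent with the issue's wish for suppliers to advertise a segment size directly.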
[jira] [Created] (KAFKA-7510) KStreams RecordCollectorImpl leaks data to logs on error
Mr Kafka created KAFKA-7510:
---

Summary: KStreams RecordCollectorImpl leaks data to logs on error
Key: KAFKA-7510
URL: https://issues.apache.org/jira/browse/KAFKA-7510
Project: Kafka
Issue Type: Bug
Components: streams
Reporter: Mr Kafka

org.apache.kafka.streams.processor.internals.RecordCollectorImpl leaks data on error, as it dumps the *value* / message payload to the logs.

This is problematic because it may write personally identifiable information (PII) or other secret information to plain-text log files, which can then be propagated to other log systems, e.g. Splunk.

I suggest that the *key* and *value* fields be moved to debug level, as they are useful for some people, while the error level contains the *errorMessage, timestamp, topic* and *stackTrace*.

{code:java}
private void recordSendError(
    final K key,
    final V value,
    final Long timestamp,
    final String topic,
    final Exception exception
) {
    String errorLogMessage = LOG_MESSAGE;
    String errorMessage = EXCEPTION_MESSAGE;
    if (exception instanceof RetriableException) {
        errorLogMessage += PARAMETER_HINT;
        errorMessage += PARAMETER_HINT;
    }
    log.error(errorLogMessage, key, value, timestamp, topic, exception.toString());
    sendException = new StreamsException(
        String.format(
            errorMessage,
            logPrefix,
            "an error caught",
            key,
            value,
            timestamp,
            topic,
            exception.toString()
        ),
        exception);
}{code}
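One possible shape of the split suggested in this issue, sketched with `java.util.logging` rather than Kafka's own logging setup. The class `SendErrorLogSketch`, its method names, and the message wording are all hypothetical; this is not the change that was eventually made to `RecordCollectorImpl`, only an illustration of keeping key and value out of the error-level line.

```java
import java.util.logging.Level;
import java.util.logging.Logger;

// Hypothetical sketch: error level carries topic, timestamp and exception;
// key and value appear only at a debug-like level (FINE).
final class SendErrorLogSketch {

    // Error-level text: deliberately contains neither key nor value.
    static String errorLine(Long timestamp, String topic, Exception exception) {
        return String.format("Error sending record to topic %s with timestamp %s: %s",
                topic, timestamp, exception);
    }

    // Debug-level text: the only place where the payload is rendered.
    static String debugLine(Object key, Object value) {
        return String.format("Failed record had key %s and value %s", key, value);
    }

    static void recordSendError(Logger log, Object key, Object value,
                                Long timestamp, String topic, Exception exception) {
        // Passing the exception lets the handler print the stack trace.
        log.log(Level.SEVERE, errorLine(timestamp, topic, exception), exception);
        // Skipped entirely unless FINE logging is enabled for this logger.
        if (log.isLoggable(Level.FINE)) {
            log.log(Level.FINE, debugLine(key, value));
        }
    }

    public static void main(String[] args) {
        Logger log = Logger.getLogger("send-error-sketch");
        recordSendError(log, "user-1", "secret-payload", 42L, "orders",
                new RuntimeException("broker unavailable"));
    }
}
```

With default logging levels only the error line is written, so the payload stays out of plain-text logs unless an operator explicitly turns on the finer level.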
[jira] [Commented] (KAFKA-7396) KIP-365: Materialized, Serialized, Joined, Consumed and Produced with implicit Serde
[ https://issues.apache.org/jira/browse/KAFKA-7396?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16650897#comment-16650897 ] ASF GitHub Bot commented on KAFKA-7396: --- vvcephei opened a new pull request #5803: KAFKA-7396: document implicit Grouped URL: https://github.com/apache/kafka/pull/5803 Updates the KIP-365 documentation in light of the replacement of Serialized with Grouped. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > KIP-365: Materialized, Serialized, Joined, Consumed and Produced with > implicit Serde > > > Key: KAFKA-7396 > URL: https://issues.apache.org/jira/browse/KAFKA-7396 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Joan Goyeau >Assignee: Joan Goyeau >Priority: Major > Fix For: 2.1.0 > > > See KIP: > [https://cwiki.apache.org/confluence/display/KAFKA/KIP-365%3A+Materialized%2C+Serialized%2C+Joined%2C+Consumed+and+Produced+with+implicit+Serde] > Github PR: https://github.com/apache/kafka/pull/5551 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7509) Kafka Connect logs unnecessary warnings about unused configurations
[ https://issues.apache.org/jira/browse/KAFKA-7509?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16650896#comment-16650896 ] Randall Hauch commented on KAFKA-7509: -- Created a patch with a proposed approach to prevent these warning messages: https://github.com/apache/kafka/pull/5802 > Kafka Connect logs unnecessary warnings about unused configurations > --- > > Key: KAFKA-7509 > URL: https://issues.apache.org/jira/browse/KAFKA-7509 > Project: Kafka > Issue Type: Improvement > Components: KafkaConnect >Affects Versions: 0.10.2.0 >Reporter: Randall Hauch >Assignee: Randall Hauch >Priority: Major > > When running Connect, the logs contain quite a few warnings about "The > configuration '{}' was supplied but isn't a known config." This occurs when > Connect creates producers, consumers, and admin clients, because the > AbstractConfig is logging unused configuration properties upon construction. > It's complicated by the fact that the Producer, Consumer, and AdminClient all > create their own AbstractConfig instances within the constructor, so we can't > even call its {{ignore(String key)}} method. > See also KAFKA-6793 for a similar issue with Streams. > There are no arguments in the Producer, Consumer, or AdminClient constructors > to control whether the configs log these warnings, so a simpler workaround > is to only pass those configuration properties to the Producer, Consumer, and > AdminClient that the ProducerConfig, ConsumerConfig, and AdminClientConfig > configdefs know about. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7509) Kafka Connect logs unnecessary warnings about unused configurations
Randall Hauch created KAFKA-7509: Summary: Kafka Connect logs unnecessary warnings about unused configurations Key: KAFKA-7509 URL: https://issues.apache.org/jira/browse/KAFKA-7509 Project: Kafka Issue Type: Improvement Components: KafkaConnect Affects Versions: 0.10.2.0 Reporter: Randall Hauch Assignee: Randall Hauch When running Connect, the logs contain quite a few warnings about "The configuration '{}' was supplied but isn't a known config." This occurs when Connect creates producers, consumers, and admin clients, because the AbstractConfig is logging unused configuration properties upon construction. It's complicated by the fact that the Producer, Consumer, and AdminClient all create their own AbstractConfig instances within the constructor, so we can't even call its {{ignore(String key)}} method. See also KAFKA-6793 for a similar issue with Streams. There are no arguments in the Producer, Consumer, or AdminClient constructors to control whether the configs log these warnings, so a simpler workaround is to only pass those configuration properties to the Producer, Consumer, and AdminClient that the ProducerConfig, ConsumerConfig, and AdminClientConfig configdefs know about. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
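The workaround described above (hand each client only the properties its config definition knows about) amounts to a key filter. A minimal sketch follows, assuming the set of known names is supplied by the caller; the class `KnownConfigFilter`, the method `retainKnown`, and the sample property names in `main` are illustrative and not taken from the proposed patch.

```java
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical helper: keeps only the entries whose key is a known config name.
final class KnownConfigFilter {

    static Map<String, Object> retainKnown(Map<String, ?> supplied, Set<String> knownNames) {
        Map<String, Object> filtered = new LinkedHashMap<>();
        for (Map.Entry<String, ?> entry : supplied.entrySet()) {
            if (knownNames.contains(entry.getKey())) {
                filtered.put(entry.getKey(), entry.getValue());
            }
        }
        return filtered;
    }

    public static void main(String[] args) {
        // Assumed inputs: a worker-level config mixing client and non-client settings.
        Map<String, Object> workerProps = new LinkedHashMap<>();
        workerProps.put("bootstrap.servers", "localhost:9092");
        workerProps.put("some.worker.only.setting", "x");

        // Assumed stand-in for the names a client's config definition declares.
        Set<String> producerNames = new LinkedHashSet<>();
        producerNames.add("bootstrap.servers");

        System.out.println(retainKnown(workerProps, producerNames));
    }
}
```

Filtering before construction sidesteps the problem that each client builds its own `AbstractConfig` internally, at the cost of silently dropping properties that a custom interceptor or plugin might have wanted to read.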
[jira] [Commented] (KAFKA-7504) Broker performance degradation caused by call of sendfile reading disk in network thread
[ https://issues.apache.org/jira/browse/KAFKA-7504?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16650874#comment-16650874 ]

Allen Wang commented on KAFKA-7504:
---

[~junrao] Does that mean the patch does not immediately address our issue and further work is needed? I was about to give it a try.

> Broker performance degradation caused by call of sendfile reading disk in
> network thread
> 
>
> Key: KAFKA-7504
> URL: https://issues.apache.org/jira/browse/KAFKA-7504
> Project: Kafka
> Issue Type: Improvement
> Components: core
> Affects Versions: 0.10.2.1
> Reporter: Yuto Kawamura
> Assignee: Yuto Kawamura
> Priority: Major
> Labels: latency, performance
> Attachments: image-2018-10-14-14-18-38-149.png,
> image-2018-10-14-14-18-57-429.png, image-2018-10-14-14-19-17-395.png,
> image-2018-10-14-14-19-27-059.png, image-2018-10-14-14-19-41-397.png,
> image-2018-10-14-14-19-51-823.png, image-2018-10-14-14-20-09-822.png,
> image-2018-10-14-14-20-19-217.png, image-2018-10-14-14-20-33-500.png,
> image-2018-10-14-14-20-46-566.png, image-2018-10-14-14-20-57-233.png
>
> h2. Environment
> OS: CentOS6
> Kernel version: 2.6.32-XX
> Kafka version: 0.10.2.1, 0.11.1.2 (but reproduces with latest build from
> trunk (2.2.0-SNAPSHOT))
>
> h2. Phenomenon
> Response time of Produce request (99th ~ 99.9th %ile) degrading to 50x ~ 100x
> more than usual.
> Normally 99th %ile is lower than 20ms, but when this issue occurs it marks
> 50ms to 200ms.
> At the same time we could see two more things in metrics:
> 1. Disk read coincidence from the volume assigned to log.dirs.
> 2. Raise in network threads utilization (by
> `kafka.network:type=SocketServer,name=NetworkProcessorAvgIdlePercent`)
> As we didn't see increase of requests in metrics, we suspected blocking in
> event loop ran by network thread as the cause of raising network thread
> utilization.
> Reading through Kafka broker source code, we understand that the only disk
> IO performed in network thread is reading log data through calling
> sendfile(2) (via FileChannel#transferTo).
> To probe that the calls of sendfile(2) are blocking network thread for some
> moments, I ran following SystemTap script to inspect duration of sendfile
> syscalls.
> {code:java}
> # Systemtap script to measure syscall duration
> global s
> global records
> probe syscall.$1 {
>     s[tid()] = gettimeofday_us()
> }
> probe syscall.$1.return {
>     elapsed = gettimeofday_us() - s[tid()]
>     delete s[tid()]
>     records <<< elapsed
> }
> probe end {
>     print(@hist_log(records))
> }{code}
> {code:java}
> $ stap -v syscall-duration.stp sendfile
> # value (us)
> value | count
>     0 | 0
>     1 | 71
>     2 |@@@ 6171
>    16 |@@@ 29472
>    32 |@@@ 3418
>  2048 | 0
>   ...
>  8192 | 3{code}
> As you can see there were some cases taking more than few milliseconds,
> implies that it blocks network thread for that long and applying the same
> latency for all other request/response processing.
>
> h2. Hypothesis
> Gathering the above observations, I made the following hypothesis.
> Let's say network-thread-1 multiplexing 3 connections.
> - producer-A
> - follower-B (broker replica fetch)
> - consumer-C
> Broker receives requests from each of those clients, [Produce, FetchFollower,
> FetchConsumer].
> They are processed well by request handler threads, and now the response
> queue of the network-thread contains 3 responses in following order:
> [FetchConsumer, Produce, FetchFollower].
> network-thread-1 takes 3 responses and processes them sequentially
> ([https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/network/SocketServer.scala#L632]).
> Ideally processing of these 3 responses completes in microseconds as in it
> just copies ready responses into client socket's buffer with non-blocking
> manner.
> However, Kafka uses sendfile(2) for transferring log data to client sockets.
> The target data might be in page cache, but old data which has written a bit
> far ago and never read since then, are likely not.
> If the target data isn't in page cache, kernel first needs to load the
> target page into cache. This takes more than few milliseconds to tens of
> milliseconds depending on disk hardware and current load being applied to it.
> Linux kernel doesn't considers the moment loading data from disk into page
> cache as "blocked", hence it awaits completion
[jira] [Commented] (KAFKA-7504) Broker performance degradation caused by call of sendfile reading disk in network thread
[ https://issues.apache.org/jira/browse/KAFKA-7504?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16650798#comment-16650798 ] Jun Rao commented on KAFKA-7504: The case that Allen described is when a follower is fetching both in-sync and out-of-sync partitions. If the fetching of the out-of-sync partition's data is taking long, it will delay the propagation of the in-sync partition's data, which can increase the producer latency. This can happen even when the replication quota is enabled. To improve this case, we can potentially extend the idea in Yuto's patch for handling replication fetch request. Basically, we initiate the prefetching of a partition in a background thread. The fetch response can be sent when the prefetching of all partitions' data are completed or the timeout is reached. In the case of timeout, we only include those partitions whose prefetching has completed. > Broker performance degradation caused by call of sendfile reading disk in > network thread > > > Key: KAFKA-7504 > URL: https://issues.apache.org/jira/browse/KAFKA-7504 > Project: Kafka > Issue Type: Improvement > Components: core >Affects Versions: 0.10.2.1 >Reporter: Yuto Kawamura >Assignee: Yuto Kawamura >Priority: Major > Labels: latency, performance > Attachments: image-2018-10-14-14-18-38-149.png, > image-2018-10-14-14-18-57-429.png, image-2018-10-14-14-19-17-395.png, > image-2018-10-14-14-19-27-059.png, image-2018-10-14-14-19-41-397.png, > image-2018-10-14-14-19-51-823.png, image-2018-10-14-14-20-09-822.png, > image-2018-10-14-14-20-19-217.png, image-2018-10-14-14-20-33-500.png, > image-2018-10-14-14-20-46-566.png, image-2018-10-14-14-20-57-233.png > > > h2. Environment > OS: CentOS6 > Kernel version: 2.6.32-XX > Kafka version: 0.10.2.1, 0.11.1.2 (but reproduces with latest build from > trunk (2.2.0-SNAPSHOT) > h2. Phenomenon > Response time of Produce request (99th ~ 99.9th %ile) degrading to 50x ~ 100x > more than usual. 
> Normally 99th %ile is lower than 20ms, but when this issue occurs it marks > 50ms to 200ms. > At the same time we could see two more things in metrics: > 1. Disk read coincidence from the volume assigned to log.dirs. > 2. Raise in network threads utilization (by > `kafka.network:type=SocketServer,name=NetworkProcessorAvgIdlePercent`) > As we didn't see increase of requests in metrics, we suspected blocking in > event loop ran by network thread as the cause of raising network thread > utilization. > Reading through Kafka broker source code, we understand that the only disk > IO performed in network thread is reading log data through calling > sendfile(2) (via FileChannel#transferTo). > To probe that the calls of sendfile(2) are blocking network thread for some > moments, I ran following SystemTap script to inspect duration of sendfile > syscalls. > {code:java} > # Systemtap script to measure syscall duration > global s > global records > probe syscall.$1 { > s[tid()] = gettimeofday_us() > } > probe syscall.$1.return { > elapsed = gettimeofday_us() - s[tid()] > delete s[tid()] > records <<< elapsed > } > probe end { > print(@hist_log(records)) > }{code} > {code:java} > $ stap -v syscall-duration.stp sendfile > # value (us) > value | count > 0 | 0 > 1 |71 > 2 |@@@ 6171 >16 |@@@ 29472 >32 |@@@ 3418 > 2048 | 0 > ... > 8192 | 3{code} > As you can see there were some cases taking more than few milliseconds, > implies that it blocks network thread for that long and applying the same > latency for all other request/response processing. > h2. Hypothesis > Gathering the above observations, I made the following hypothesis. > Let's say network-thread-1 multiplexing 3 connections. > - producer-A > - follower-B (broker replica fetch) > - consumer-C > Broker receives requests from each of those clients, [Produce, FetchFollower, > FetchConsumer]. 
> They are processed well by request handler threads, and now the response > queue of the network-thread contains 3 responses in following order: > [FetchConsumer, Produce, FetchFollower]. > network-thread-1 takes 3 responses and processes them sequentially > ([https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/network/SocketServer.scala#L632]). > Ideally processing of these 3 responses completes in microseconds as in it > just copies ready responses into client socket's buffer
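The prefetch-with-timeout idea in Jun Rao's comment above can be sketched roughly as follows. This is only an illustration under stated assumptions, not the patch discussed in the ticket: the class name, the `prefetch` callback (standing in for "read the partition's data so it is in page cache") and the partition names are all hypothetical. Prefetching starts in background threads, the caller waits until everything has finished or the timeout is reached, and on timeout only the partitions whose prefetch completed are included.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.Consumer;

class PrefetchSketch {

    /**
     * Starts one background prefetch per partition, waits up to timeoutMs for all of them,
     * and returns (in request order) only the partitions whose prefetch finished cleanly.
     * The prefetch callback is a hypothetical stand-in for warming the page cache.
     */
    static List<String> partitionsReadyForResponse(List<String> partitions,
                                                   Consumer<String> prefetch,
                                                   ExecutorService pool,
                                                   long timeoutMs) {
        Map<String, CompletableFuture<Void>> inFlight = new LinkedHashMap<>();
        for (String partition : partitions) {
            inFlight.put(partition, CompletableFuture.runAsync(() -> prefetch.accept(partition), pool));
        }
        CompletableFuture<Void> all =
            CompletableFuture.allOf(inFlight.values().toArray(new CompletableFuture<?>[0]));
        try {
            all.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (TimeoutException | ExecutionException e) {
            // Timed out, or a prefetch failed: fall through and report whatever finished cleanly.
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        List<String> ready = new ArrayList<>();
        for (Map.Entry<String, CompletableFuture<Void>> entry : inFlight.entrySet()) {
            CompletableFuture<Void> f = entry.getValue();
            if (f.isDone() && !f.isCompletedExceptionally()) {
                ready.add(entry.getKey());
            }
        }
        return ready;
    }

    public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(2);
        Consumer<String> prefetch = partition -> {
            // Simulated disk read: the out-of-sync partition is much slower than the timeout.
            long millis = partition.startsWith("out-of-sync") ? 2_000 : 10;
            try {
                Thread.sleep(millis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        List<String> ready = partitionsReadyForResponse(
            Arrays.asList("in-sync-0", "out-of-sync-1"), prefetch, pool, 500);
        System.out.println("Partitions included in response: " + ready);
        pool.shutdownNow();
    }
}
```

In this sketch a slow partition no longer holds back the partitions that are ready. How the slow partition is retried on the next fetch is left out.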
[jira] [Created] (KAFKA-7508) Kafka broker anonymous disconnected from Zookeeper
Sathish Yanamala created KAFKA-7508: --- Summary: Kafka broker anonymous disconnected from Zookeeper Key: KAFKA-7508 URL: https://issues.apache.org/jira/browse/KAFKA-7508 Project: Kafka Issue Type: Task Components: config Reporter: Sathish Yanamala Hello Team, We are facing below Error , Kafka broker unable to connect Zookeeper , Please check and suggest is there any configuration changes required on Kafka Broker. ERROR: 2018-10-15 12:24:07,502 WARN kafka.network.Processor: Attempting to send response via channel for which there is no open connection, connection id - -:9093-- -:47542-25929 2018-10-15 12:24:09,428 INFO kafka.coordinator.group.GroupCoordinator: [GroupCoordinator 3]: Group KMOffsetCache-xxx with generation 9 is now empty (__consumer_offsets-22) 2018-10-15 12:24:09,428 INFO kafka.server.epoch.LeaderEpochFileCache: Updated PartitionLeaderEpoch. New: \{epoch:1262, offset:151}, Current: \{epoch:1261, offset144} for Partition: __consumer_offsets-22. Cache now contains 15 entries. 
{color:#d04437}*2018-10-15 12:24:10,905 ERROR kafka.utils.KafkaScheduler: Uncaught exception in scheduled task 'highwatermark-checkpoint'*{color} {color:#d04437}*java.lang.OutOfMemoryError: Java heap space*{color} {color:#d04437} at{color} scala.collection.convert.DecorateAsScala$$Lambda$214/x.get$Lambda(Unknown Source) at java.lang.invoke.LambdaForm$DMH/xxx.invokeStatic_LL_L(LambdaForm$DMH) at java.lang.invoke.LambdaForm$MH/xx.linkToTargetMethod(LambdaForm$MH) at scala.collection.convert.DecorateAsScala.collectionAsScalaIterableConverter(DecorateAsScala.scala:45) at scala.collection.convert.DecorateAsScala.collectionAsScalaIterableConverter$(DecorateAsScala.scala:44) at scala.collection.JavaConverters$.collectionAsScalaIterableConverter(JavaConverters.scala:73) at kafka.utils.Pool.values(Pool.scala:85) at kafka.server.ReplicaManager.nonOfflinePartitionsIterator(ReplicaManager.scala:397) at kafka.server.ReplicaManager.checkpointHighWatermarks(ReplicaManager.scala:1340) at kafka.server.ReplicaManager.$anonfun$startHighWaterMarksCheckPointThread$1(ReplicaManager.scala:253) at kafka.server.ReplicaManager$$Lambda$608/xx.apply$mcV$sp(Unknown Source) at kafka.utils.KafkaScheduler.$anonfun$schedule$2(KafkaScheduler.scala:110) at kafka.utils.KafkaScheduler$$Lambda$269/.apply$mcV$sp(Unknown Source) at kafka.utils.CoreUtils$$anon$1.run(CoreUtils.scala:61) at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180) at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294) at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149) Thank you, Sathish Yanamala M:832-382-4487 -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7506) KafkaStreams repartition topic settings not suitable for processing old records
[ https://issues.apache.org/jira/browse/KAFKA-7506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16650532#comment-16650532 ] Matthias J. Sax commented on KAFKA-7506: [~niklas.lonn] If you want a back-port, please comment on the other ticket. However, it's not a bug fix (the other ticket is marked as "improvement"), but a default config change and thus should not be part of a bug fix release IMHO. Note that you don't really need the fix though. You can reconfigure the repartition topics manually after Streams has created them, using the `bin/kafka-topics.sh` command. As an alternative, you can also pass in topic-level configs that override the default config via StreamsConfig (note that those configs apply to all internal topics; thus, you need to double-check whether you can use this or not): https://cwiki.apache.org/confluence/display/KAFKA/KIP-173%3A+Add+prefix+to+StreamsConfig+to+enable+setting+default+internal+topic+configs > KafkaStreams repartition topic settings not suitable for processing old > records > --- > > Key: KAFKA-7506 > URL: https://issues.apache.org/jira/browse/KAFKA-7506 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.1.0, 1.1.1, 2.0.0 >Reporter: Niklas Lönn >Priority: Major > Attachments: kafka-7506.patch > > > Hi, We are using Kafka Streams to process a compacted store, when resetting > the application/processing from scratch the default topic configuration for > repartition topics is 50MB and 10min segment sizes. > > As the retention.ms is undefined, this leads to default retention.ms and log > cleaner starts competing with the application, effectively causing the > streams app to skip records. 
> {{Application logs the following:}} > {{Fetch offset 213792 is out of range for partition > app-id-KTABLE-AGGREGATE-STATE-STORE-15-repartition-7, resetting > offset}} > {{Fetch offset 110227 is out of range for partition > app-id-KTABLE-AGGREGATE-STATE-STORE-15-repartition-2, resetting > offset}} > {{Resetting offset for partition > app-id-KTABLE-AGGREGATE-STATE-STORE-15-repartition-7 to offset > 233302.}} > {{Resetting offset for partition > app-id-KTABLE-AGGREGATE-STATE-STORE-15-repartition-2 to offset > 119914.}} > By adding the following configuration to RepartitionTopicConfig.java the > issue is solved > {{tempTopicDefaultOverrides.put(TopicConfig.RETENTION_MS_CONFIG, "-1"); // > Infinite}} > > My understanding is that this should be safe as KafkaStreams uses the admin > API to delete segments. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
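The StreamsConfig alternative mentioned in the comment above could look roughly like the sketch below. It uses only `java.util.Properties` so that it runs without Kafka on the classpath. The `"topic."` prefix is an assumption that mirrors the prefix introduced by KIP-173; with kafka-streams available one would normally build the key with `StreamsConfig.topicPrefix(TopicConfig.RETENTION_MS_CONFIG)` instead of concatenating strings. The class and method names are hypothetical, and the `bootstrap.servers` value is a placeholder.

```java
import java.util.Properties;

class RepartitionRetentionSketch {

    // Assumption: mirrors the "topic." prefix that KIP-173 added to StreamsConfig.
    static final String TOPIC_PREFIX = "topic.";

    /**
     * Returns a copy of the given Streams properties with retention.ms=-1 (infinite)
     * set as the default for internal topics. The input is not modified.
     */
    static Properties withInfiniteInternalTopicRetention(Properties base) {
        Properties props = new Properties();
        props.putAll(base);
        props.setProperty(TOPIC_PREFIX + "retention.ms", "-1");
        return props;
    }

    public static void main(String[] args) {
        Properties base = new Properties();
        base.setProperty("application.id", "app-id");
        base.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address
        Properties props = withInfiniteInternalTopicRetention(base);
        System.out.println("topic.retention.ms = " + props.getProperty("topic.retention.ms"));
    }
}
```

As the comment warns, a prefixed setting like this applies to all internal topics (changelog topics as well as repartition topics), so it is worth checking that infinite retention is acceptable for every one of them before using it.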
[jira] [Commented] (KAFKA-7504) Broker performance degradation caused by call of sendfile reading disk in network thread
[ https://issues.apache.org/jira/browse/KAFKA-7504?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16650486#comment-16650486 ] Allen Wang commented on KAFKA-7504: --- Great work. We have observed significant produce response time increase when a follower is catching up with the leader after the follower is down for a while. The leader has a lot of disk read at the time. It seems to be related to this issue. I am looking forward to the patch in 1.1 and 2.x branches. > Broker performance degradation caused by call of sendfile reading disk in > network thread > > > Key: KAFKA-7504 > URL: https://issues.apache.org/jira/browse/KAFKA-7504 > Project: Kafka > Issue Type: Improvement > Components: core >Affects Versions: 0.10.2.1 >Reporter: Yuto Kawamura >Assignee: Yuto Kawamura >Priority: Major > Labels: latency, performance > Attachments: image-2018-10-14-14-18-38-149.png, > image-2018-10-14-14-18-57-429.png, image-2018-10-14-14-19-17-395.png, > image-2018-10-14-14-19-27-059.png, image-2018-10-14-14-19-41-397.png, > image-2018-10-14-14-19-51-823.png, image-2018-10-14-14-20-09-822.png, > image-2018-10-14-14-20-19-217.png, image-2018-10-14-14-20-33-500.png, > image-2018-10-14-14-20-46-566.png, image-2018-10-14-14-20-57-233.png > > > h2. Environment > OS: CentOS6 > Kernel version: 2.6.32-XX > Kafka version: 0.10.2.1, 0.11.1.2 (but reproduces with latest build from > trunk (2.2.0-SNAPSHOT) > h2. Phenomenon > Response time of Produce request (99th ~ 99.9th %ile) degrading to 50x ~ 100x > more than usual. > Normally 99th %ile is lower than 20ms, but when this issue occurs it marks > 50ms to 200ms. > At the same time we could see two more things in metrics: > 1. Disk read coincidence from the volume assigned to log.dirs. > 2. 
Raise in network threads utilization (by > `kafka.network:type=SocketServer,name=NetworkProcessorAvgIdlePercent`) > As we didn't see increase of requests in metrics, we suspected blocking in > event loop ran by network thread as the cause of raising network thread > utilization. > Reading through Kafka broker source code, we understand that the only disk > IO performed in network thread is reading log data through calling > sendfile(2) (via FileChannel#transferTo). > To probe that the calls of sendfile(2) are blocking network thread for some > moments, I ran following SystemTap script to inspect duration of sendfile > syscalls. > {code:java} > # Systemtap script to measure syscall duration > global s > global records > probe syscall.$1 { > s[tid()] = gettimeofday_us() > } > probe syscall.$1.return { > elapsed = gettimeofday_us() - s[tid()] > delete s[tid()] > records <<< elapsed > } > probe end { > print(@hist_log(records)) > }{code} > {code:java} > $ stap -v syscall-duration.stp sendfile > # value (us) > value | count > 0 | 0 > 1 |71 > 2 |@@@ 6171 >16 |@@@ 29472 >32 |@@@ 3418 > 2048 | 0 > ... > 8192 | 3{code} > As you can see there were some cases taking more than few milliseconds, > implies that it blocks network thread for that long and applying the same > latency for all other request/response processing. > h2. Hypothesis > Gathering the above observations, I made the following hypothesis. > Let's say network-thread-1 multiplexing 3 connections. > - producer-A > - follower-B (broker replica fetch) > - consumer-C > Broker receives requests from each of those clients, [Produce, FetchFollower, > FetchConsumer]. > They are processed well by request handler threads, and now the response > queue of the network-thread contains 3 responses in following order: > [FetchConsumer, Produce, FetchFollower]. 
> network-thread-1 takes 3 responses and processes them sequentially > ([https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/network/SocketServer.scala#L632]). > Ideally processing of these 3 responses completes in microseconds as in it > just copies ready responses into client socket's buffer with non-blocking > manner. > However, Kafka uses sendfile(2) for transferring log data to client sockets. > The target data might be in page cache, but old data which has written a bit > far ago and never read since then, are likely not. > If the target data isn't in page cache, kernel first needs to load the > target page into cache. This takes more than few milliseconds to tens of > milliseconds depending on disk
[jira] [Commented] (KAFKA-7505) Flaky test: SslTransportLayerTest.testListenerConfigOverride
[ https://issues.apache.org/jira/browse/KAFKA-7505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16650322#comment-16650322 ] ASF GitHub Bot commented on KAFKA-7505: --- rajinisivaram opened a new pull request #5800: KAFKA-7505: Process incoming bytes on write error to report SSL failures URL: https://github.com/apache/kafka/pull/5800 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Flaky test: SslTransportLayerTest.testListenerConfigOverride > > > Key: KAFKA-7505 > URL: https://issues.apache.org/jira/browse/KAFKA-7505 > Project: Kafka > Issue Type: Bug >Reporter: Ismael Juma >Assignee: Rajini Sivaram >Priority: Major > > This test seems to fail quite a bit recently. I've seen it happen with Java > 11 quite a bit so it could be more likely to fail there. > {code:java} > java.lang.AssertionError: expected: but > was: at org.junit.Assert.fail(Assert.java:88) at > org.junit.Assert.failNotEquals(Assert.java:834) at > org.junit.Assert.assertEquals(Assert.java:118) at > org.junit.Assert.assertEquals(Assert.java:144) at > org.apache.kafka.common.network.NetworkTestUtils.waitForChannelClose(NetworkTestUtils.java:104) > at > org.apache.kafka.common.network.NetworkTestUtils.waitForChannelClose(NetworkTestUtils.java:109) > at > org.apache.kafka.common.network.SslTransportLayerTest.testListenerConfigOverride(SslTransportLayerTest.java:319){code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7507) Kafka Topics Error : "NotLeaderForPartitionException: This server is not the leader for that topic-partition"
Sathish Yanamala created KAFKA-7507: --- Summary: Kafka Topics Error : "NotLeaderForPartitionException: This server is not the leader for that topic-partition" Key: KAFKA-7507 URL: https://issues.apache.org/jira/browse/KAFKA-7507 Project: Kafka Issue Type: Bug Reporter: Sathish Yanamala Hello Team, We are seeing the error below in an existing application; this is the first time we have encountered it. Please suggest whether any configuration changes are required for this issue. I have reviewed some of the JIRA tickets related to it. We run our environment with 5 brokers; every topic uses replication factor 3 and 6 partitions, and we have a ZooKeeper environment. +*Error Log :*+ 2018-10-15 03:42:16,596 ERROR kafka.server.ReplicaFetcherThread: [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Error for partition __consumer_offsets-41 to broker 1:{color:#d04437}org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition{color}. 2018-10-15 03:42:16,606 ERROR kafka.server.ReplicaFetcherThread: [ReplicaFetcher replicaId=2, leaderId=4, fetcherId=0] Error for partition CSL_SUR_DL_LXNX-3 to broker 4:org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition. 2018-10-15 03:42:16,608 ERROR kafka.server.ReplicaFetcherThread: [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Error for partition CSL_SUR_DL_SOC_EAS-4 to broker 1:org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition. 2018-10-15 03:42:16,609 ERROR kafka.server.ReplicaFetcherThread: [ReplicaFetcher replicaId=2, leaderId=3, fetcherId=0] Error for partition __consumer_offsets-26 to broker 3:org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition. 
2018-10-15 03:42:16,613 ERROR kafka.server.ReplicaFetcherThread: [ReplicaFetcher replicaId=2, leaderId=5, fetcherId=0] Error for partition EDL_Datashare_Genesys_Request-3 to broker 5:org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition. 2018-10-15 03:42:16,613 ERROR kafka.server.ReplicaFetcherThread: [ReplicaFetcher replicaId=2, leaderId=5, fetcherId=0] Error for partition CSL_TRANSMIT_Data-3 to broker 5:org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition. 2018-10-15 03:42:16,613 ERROR kafka.server.ReplicaFetcherThread: [ReplicaFetcher replicaId=2, leaderId=4, fetcherId=0] Error for partition CHR_ResOrch_03-0 to broker 4:org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition. 2018-10-15 03:42:16,616 ERROR kafka.server.ReplicaFetcherThread: [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Error for partition CSL_SUR_LXNX_REQUEST-4 to broker 1:org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition. 2018-10-15 03:42:16,616 ERROR kafka.server.ReplicaFetcherThread: [ReplicaFetcher replicaId=2, leaderId=4, fetcherId=0] Error for partition EDL_Datashare_JCP_GPAs-4 to broker 4:org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition. 2018-10-15 03:42:16,617 ERROR kafka.server.ReplicaFetcherThread: [ReplicaFetcher replicaId=2, leaderId=5, fetcherId=0] Error for partition CHR_ResOrch_03-5 to broker 5:org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition. 
2018-10-15 03:42:16,619 ERROR kafka.server.ReplicaFetcherThread: [ReplicaFetcher replicaId=2, leaderId=3, fetcherId=0] Error for partition CHR_ResOrch-0 to broker 3:org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition. 2018-10-15 03:42:16,621 ERROR kafka.server.ReplicaFetcherThread: [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Error for partition __consumer_offsets-21 to broker 1:org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition. 2018-10-15 03:42:16,623 ERROR kafka.server.ReplicaFetcherThread: [ReplicaFetcher replicaId=2, leaderId=3, fetcherId=0] Error for partition CHR_ResOrch_05-3 to broker 3:org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that topic-partition. 2018-10-15 03:42:16,624 ERROR kafka.server.ReplicaFetcherThread: [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Error for partition CHR_DataCompOrch-5 to broker 1:org.apache.kafka.common.errors.NotLeaderForPartitionException: This server is not the leader for that
[jira] [Commented] (KAFKA-7498) common.requests.CreatePartitionsRequest uses clients.admin.NewPartitions
[ https://issues.apache.org/jira/browse/KAFKA-7498?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16650131#comment-16650131 ] ASF GitHub Bot commented on KAFKA-7498: --- rajinisivaram closed pull request #5784: KAFKA-7498: Remove references from `common.requests` to `clients` URL: https://github.com/apache/kafka/pull/5784 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 7810a3e8673..91d23f6424b 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -131,7 +131,6 @@ - diff --git a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java index 86e14476625..b4132840e4d 100644 --- a/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java +++ b/clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java @@ -81,6 +81,7 @@ import org.apache.kafka.common.requests.CreateDelegationTokenRequest; import org.apache.kafka.common.requests.CreateDelegationTokenResponse; import org.apache.kafka.common.requests.CreatePartitionsRequest; +import org.apache.kafka.common.requests.CreatePartitionsRequest.PartitionDetails; import org.apache.kafka.common.requests.CreatePartitionsResponse; import org.apache.kafka.common.requests.CreateTopicsRequest; import org.apache.kafka.common.requests.CreateTopicsResponse; @@ -142,6 +143,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.function.Predicate; +import java.util.stream.Collectors; import static org.apache.kafka.common.utils.Utils.closeQuietly; @@ -2083,7 +2085,8 @@ public CreatePartitionsResult 
createPartitions(Map newPar for (String topic : newPartitions.keySet()) { futures.put(topic, new KafkaFutureImpl<>()); } -final Map requestMap = new HashMap<>(newPartitions); +final Map requestMap = newPartitions.entrySet().stream() +.collect(Collectors.toMap(Map.Entry::getKey, e -> partitionDetails(e.getValue()))); final long now = time.milliseconds(); runnable.call(new Call("createPartitions", calcDeadlineMs(now, options.timeoutMs()), @@ -2482,6 +2485,10 @@ private boolean handleFindCoordinatorError(FindCoordinatorResponse response, Kaf return false; } +private PartitionDetails partitionDetails(NewPartitions newPartitions) { +return new PartitionDetails(newPartitions.totalCount(), newPartitions.assignments()); +} + private final static class ListConsumerGroupsResults { private final List errors; private final HashMap listings; diff --git a/clients/src/main/java/org/apache/kafka/common/requests/CreatePartitionsRequest.java b/clients/src/main/java/org/apache/kafka/common/requests/CreatePartitionsRequest.java index 795a66a9ea6..7872cf9f8d0 100644 --- a/clients/src/main/java/org/apache/kafka/common/requests/CreatePartitionsRequest.java +++ b/clients/src/main/java/org/apache/kafka/common/requests/CreatePartitionsRequest.java @@ -17,7 +17,6 @@ package org.apache.kafka.common.requests; -import org.apache.kafka.clients.admin.NewPartitions; import org.apache.kafka.common.protocol.ApiKeys; import org.apache.kafka.common.protocol.types.ArrayOf; import org.apache.kafka.common.protocol.types.Field; @@ -72,17 +71,47 @@ // It is an error for duplicate topics to be present in the request, // so track duplicates here to allow KafkaApis to report per-topic errors. 
private final Set duplicates; -private final Map newPartitions; +private final Map newPartitions; private final int timeout; private final boolean validateOnly; +public static class PartitionDetails { + +private final int totalCount; + +private final List> newAssignments; + +public PartitionDetails(int totalCount) { +this(totalCount, null); +} + +public PartitionDetails(int totalCount, List> newAssignments) { +this.totalCount = totalCount; +this.newAssignments = newAssignments; +} + +public int totalCount() { +return totalCount; +} + +public List> newAssignments() { +return newAssignments; +} + +@Override +public String toString() { +return "(totalCount=" + totalCount() + ", newAssignments=" + newAssignments() + ")"; +} + +} + public static class Builder extends AbstractRequest.Builder { -private final Map
[jira] [Assigned] (KAFKA-7505) Flaky test: SslTransportLayerTest.testListenerConfigOverride
[ https://issues.apache.org/jira/browse/KAFKA-7505?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram reassigned KAFKA-7505: - Assignee: Rajini Sivaram > Flaky test: SslTransportLayerTest.testListenerConfigOverride > > > Key: KAFKA-7505 > URL: https://issues.apache.org/jira/browse/KAFKA-7505 > Project: Kafka > Issue Type: Bug >Reporter: Ismael Juma >Assignee: Rajini Sivaram >Priority: Major > > This test seems to fail quite a bit recently. I've seen it happen with Java > 11 quite a bit so it could be more likely to fail there. > {code:java} > java.lang.AssertionError: expected: but > was: at org.junit.Assert.fail(Assert.java:88) at > org.junit.Assert.failNotEquals(Assert.java:834) at > org.junit.Assert.assertEquals(Assert.java:118) at > org.junit.Assert.assertEquals(Assert.java:144) at > org.apache.kafka.common.network.NetworkTestUtils.waitForChannelClose(NetworkTestUtils.java:104) > at > org.apache.kafka.common.network.NetworkTestUtils.waitForChannelClose(NetworkTestUtils.java:109) > at > org.apache.kafka.common.network.SslTransportLayerTest.testListenerConfigOverride(SslTransportLayerTest.java:319){code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7505) Flaky test: SslTransportLayerTest.testListenerConfigOverride
[ https://issues.apache.org/jira/browse/KAFKA-7505?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16650102#comment-16650102 ] Rajini Sivaram commented on KAFKA-7505: --- The stack trace shows that the client didn't see the SSL handshake failure notification from the broker when the broker failed the handshake and then closed the connection. The sequence with older versions where this succeeds: # Client writes SSL handshake data # Broker processes handshake data from client, fails handshake # Broker flushes handshake failure notification, waiting if necessary for flush to complete # Client reads data from broker, processes SSL handshake notification # Client processes failure as an authentication exception We see the same sequence with Java 11 most of the time, but sometimes it fails because the client attempts to write more data to the broker (which is possible with the TLS protocol). The sequence is: # Client writes some SSL handshake data # Broker processes handshake data from client, fails handshake # Broker flushes failure notification, waiting if necessary for flush to complete # Client attempts to write more data, fails with an I/O exception since broker has closed the connection # Client processes failure as an I/O exception We guarantee that we never process an ordinary I/O exception as an authentication exception, but we don't actually guarantee the reverse (an authentication failure may surface as an I/O exception). The tests however are strict because we want to try and handle all known authentication failure scenarios. I will see if we can fix this scenario in our implementation. > Flaky test: SslTransportLayerTest.testListenerConfigOverride > > > Key: KAFKA-7505 > URL: https://issues.apache.org/jira/browse/KAFKA-7505 > Project: Kafka > Issue Type: Bug >Reporter: Ismael Juma >Priority: Major > > This test seems to fail quite a bit recently. 
I've seen it happen with Java > 11 quite a bit so it could be more likely to fail there. 
> {code:java} > java.lang.AssertionError: expected: but > was: at org.junit.Assert.fail(Assert.java:88) at > org.junit.Assert.failNotEquals(Assert.java:834) at > org.junit.Assert.assertEquals(Assert.java:118) at > org.junit.Assert.assertEquals(Assert.java:144) at > org.apache.kafka.common.network.NetworkTestUtils.waitForChannelClose(NetworkTestUtils.java:104) > at > org.apache.kafka.common.network.NetworkTestUtils.waitForChannelClose(NetworkTestUtils.java:109) > at > org.apache.kafka.common.network.SslTransportLayerTest.testListenerConfigOverride(SslTransportLayerTest.java:319){code} -- This message was sent by Atlassian JIRA (v7.6.3#76005)
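The difference between the two sequences in the comment above can be modelled with a small toy example. It is not Kafka's SslTransportLayer code. It only illustrates one possible remedy, in the spirit of the pull request title "Process incoming bytes on write error to report SSL failures": when a write fails, look at the inbound data that was received but not yet processed before deciding how to classify the failure. The class, enum and `HANDSHAKE_ALERT` marker are hypothetical names introduced for illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class HandshakeFailureSketch {

    enum Outcome { AUTHENTICATION_FAILURE, IO_FAILURE }

    // Hypothetical marker for the handshake failure notification flushed by the broker.
    static final String HANDSHAKE_ALERT = "HANDSHAKE_FAILURE_ALERT";

    /**
     * Classifies a failed client write.
     * drainInboundFirst = false models the flaky sequence: the write error is reported as-is.
     * drainInboundFirst = true models processing pending inbound data first, so a handshake
     * failure notification that already arrived is reported as an authentication failure.
     */
    static Outcome classifyWriteFailure(Deque<String> unreadInbound, boolean drainInboundFirst) {
        if (drainInboundFirst) {
            while (!unreadInbound.isEmpty()) {
                if (HANDSHAKE_ALERT.equals(unreadInbound.poll())) {
                    return Outcome.AUTHENTICATION_FAILURE;
                }
            }
        }
        // No pending handshake notification (or it was not inspected): plain I/O failure.
        return Outcome.IO_FAILURE;
    }

    public static void main(String[] args) {
        Deque<String> inbound = new ArrayDeque<>();
        inbound.add(HANDSHAKE_ALERT);
        System.out.println("without draining: " + classifyWriteFailure(new ArrayDeque<>(inbound), false));
        System.out.println("with draining:    " + classifyWriteFailure(new ArrayDeque<>(inbound), true));
    }
}
```

The sketch keeps the guarantee stated in the comment: a write failure with no pending handshake notification is still reported as an I/O failure, never as an authentication failure.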
[jira] [Commented] (KAFKA-7506) KafkaStreams repartition topic settings not suitable for processing old records
[ https://issues.apache.org/jira/browse/KAFKA-7506?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16650014#comment-16650014 ] Niklas Lönn commented on KAFKA-7506: Found the duplicate ticket myself now.. sorry for the noise, however, why has this not been backported to 1.1.x? Jumping to 2.0.0 might not always be feasible? > KafkaStreams repartition topic settings not suitable for processing old > records > --- > > Key: KAFKA-7506 > URL: https://issues.apache.org/jira/browse/KAFKA-7506 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.1.0, 1.1.1, 2.0.0 >Reporter: Niklas Lönn >Priority: Major > Attachments: kafka-7506.patch > > > Hi, We are using Kafka Streams to process a compacted store, when resetting > the application/processing from scratch the default topic configuration for > repartition topics is 50MB and 10min segment sizes. > > As the retention.ms is undefined, this leads to default retention.ms and log > cleaner starts competing with the application, effectively causing the > streams app to skip records. > {{Application logs the following:}} > {{Fetch offset 213792 is out of range for partition > app-id-KTABLE-AGGREGATE-STATE-STORE-15-repartition-7, resetting > offset}} > {{Fetch offset 110227 is out of range for partition > app-id-KTABLE-AGGREGATE-STATE-STORE-15-repartition-2, resetting > offset}} > {{Resetting offset for partition > app-id-KTABLE-AGGREGATE-STATE-STORE-15-repartition-7 to offset > 233302.}} > {{Resetting offset for partition > app-id-KTABLE-AGGREGATE-STATE-STORE-15-repartition-2 to offset > 119914.}} > By adding the following configuration to RepartitionTopicConfig.java the > issue is solved > {{tempTopicDefaultOverrides.put(TopicConfig.RETENTION_MS_CONFIG, "-1"); // > Infinite}} > > My understanding is that this should be safe as KafkaStreams uses the admin > API to delete segments. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-7506) KafkaStreams repartition topic settings not suitable for processing old records
[ https://issues.apache.org/jira/browse/KAFKA-7506?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Niklas Lönn updated KAFKA-7506: --- Description: Hi, We are using Kafka Streams to process a compacted store, when resetting the application/processing from scratch the default topic configuration for repartition topics is 50MB and 10min segment sizes. As the retention.ms is undefined, this leads to default retention.ms and log cleaner starts competing with the application, effectively causing the streams app to skip records. {{Application logs the following:}} {{Fetch offset 213792 is out of range for partition app-id-KTABLE-AGGREGATE-STATE-STORE-15-repartition-7, resetting offset}} {{Fetch offset 110227 is out of range for partition app-id-KTABLE-AGGREGATE-STATE-STORE-15-repartition-2, resetting offset}} {{Resetting offset for partition app-id-KTABLE-AGGREGATE-STATE-STORE-15-repartition-7 to offset 233302.}} {{Resetting offset for partition app-id-KTABLE-AGGREGATE-STATE-STORE-15-repartition-2 to offset 119914.}} By adding the following configuration to RepartitionTopicConfig.java the issue is solved {{tempTopicDefaultOverrides.put(TopicConfig.RETENTION_MS_CONFIG, "-1"); // Infinite}} My understanding is that this should be safe as KafkaStreams uses the admin API to delete segments. was: Hi, We are using Kafka Streams to process a compacted store, when resetting the application/processing from scratch the default topic configuration for repartition topics is 50MB and 10min segment sizes. As the retention.ms is undefined, this leads to default retention.ms and log cleaner starts competing with the application, effectively causing the streams app to skip records. 
{{Application logs the following:}} {\{ Fetch offset 213792 is out of range for partition app-id-KTABLE-AGGREGATE-STATE-STORE-15-repartition-7, resetting offset}} \{{ Fetch offset 110227 is out of range for partition app-id-KTABLE-AGGREGATE-STATE-STORE-15-repartition-2, resetting offset}} \{{ Resetting offset for partition app-id-KTABLE-AGGREGATE-STATE-STORE-15-repartition-7 to offset 233302.}} \{{ Resetting offset for partition app-id-KTABLE-AGGREGATE-STATE-STORE-15-repartition-2 to offset 119914.}} By adding the following configuration to RepartitionTopicConfig.java the issue is solved {{tempTopicDefaultOverrides.put(TopicConfig.RETENTION_MS_CONFIG, "-1"); // Infinite}} My understanding is that this should be safe as KafkaStreams uses the admin API to delete segments. > KafkaStreams repartition topic settings not suitable for processing old > records > --- > > Key: KAFKA-7506 > URL: https://issues.apache.org/jira/browse/KAFKA-7506 > Project: Kafka > Issue Type: Bug > Components: streams >Affects Versions: 1.1.0 >Reporter: Niklas Lönn >Priority: Major > > Hi, We are using Kafka Streams to process a compacted store, when resetting > the application/processing from scratch the default topic configuration for > repartition topics is 50MB and 10min segment sizes. > > As the retention.ms is undefined, this leads to default retention.ms and log > cleaner starts competing with the application, effectively causing the > streams app to skip records. 
> {{Application logs the following:}} > {{Fetch offset 213792 is out of range for partition > app-id-KTABLE-AGGREGATE-STATE-STORE-15-repartition-7, resetting > offset}} > {{Fetch offset 110227 is out of range for partition > app-id-KTABLE-AGGREGATE-STATE-STORE-15-repartition-2, resetting > offset}} > {{Resetting offset for partition > app-id-KTABLE-AGGREGATE-STATE-STORE-15-repartition-7 to offset > 233302.}} > {{Resetting offset for partition > app-id-KTABLE-AGGREGATE-STATE-STORE-15-repartition-2 to offset > 119914.}} > By adding the following configuration to RepartitionTopicConfig.java the > issue is solved > {{tempTopicDefaultOverrides.put(TopicConfig.RETENTION_MS_CONFIG, "-1"); // > Infinite}} > > My understanding is that this should be safe as KafkaStreams uses the admin > API to delete segments. > -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7479) Call to "producer.initTransaction" hangs when using IP for "bootstrap.servers"
[ https://issues.apache.org/jira/browse/KAFKA-7479?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16649996#comment-16649996 ] huxihx commented on KAFKA-7479: --- Seems it was already fixed by KAFKA-6446 > Call to "producer.initTransaction" hangs when using IP for "bootstrap.servers" > -- > > Key: KAFKA-7479 > URL: https://issues.apache.org/jira/browse/KAFKA-7479 > Project: Kafka > Issue Type: Bug > Components: clients >Affects Versions: 1.0.0 >Reporter: Gene B. >Priority: Major > Attachments: KAFKA-7479.log, KafkaProducerSample.java > > > When using IP address for "bootstrap.servers", > And Kafka server is installed in a VM (Virtual Box) > Then transactional Kafka client is hanging on call > "producer.initTransaction", and the call never completes. > Current workaround is to add Kafka host's name to the "hosts" file, but this > approach will not scale. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7506) KafkaStreams repartition topic settings not suitable for processing old records
Niklas Lönn created KAFKA-7506: -- Summary: KafkaStreams repartition topic settings not suitable for processing old records Key: KAFKA-7506 URL: https://issues.apache.org/jira/browse/KAFKA-7506 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 1.1.0 Reporter: Niklas Lönn Hi, We are using Kafka Streams to process a compacted store, when resetting the application/processing from scratch the default topic configuration for repartition topics is 50MB and 10min segment sizes. As the retention.ms is undefined, this leads to default retention.ms and log cleaner starts competing with the application, effectively causing the streams app to skip records. {{Application logs the following:}} {{Fetch offset 213792 is out of range for partition app-id-KTABLE-AGGREGATE-STATE-STORE-15-repartition-7, resetting offset}} {{Fetch offset 110227 is out of range for partition app-id-KTABLE-AGGREGATE-STATE-STORE-15-repartition-2, resetting offset}} {{Resetting offset for partition app-id-KTABLE-AGGREGATE-STATE-STORE-15-repartition-7 to offset 233302.}} {{Resetting offset for partition app-id-KTABLE-AGGREGATE-STATE-STORE-15-repartition-2 to offset 119914.}} By adding the following configuration to RepartitionTopicConfig.java the issue is solved {{tempTopicDefaultOverrides.put(TopicConfig.RETENTION_MS_CONFIG, "-1"); // Infinite}} My understanding is that this should be safe as KafkaStreams uses the admin API to delete segments. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
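The override proposed in the report can be pictured with a small stand-alone sketch. It is not the actual RepartitionTopicConfig source: the class `RepartitionDefaultsSketch` and the method `repartitionDefaults` are hypothetical, and the plain string keys (`segment.bytes`, `segment.ms`, `retention.ms`) are used in place of the `TopicConfig` constants so that the snippet compiles without the Kafka jars. The 50 MB and 10 minute values are the ones quoted in the report.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical stand-in for the defaults kept in RepartitionTopicConfig.java.
final class RepartitionDefaultsSketch {

    // Builds the repartition-topic defaults described in the report,
    // plus the retention override the reporter proposes.
    static Map<String, String> repartitionDefaults() {
        Map<String, String> overrides = new LinkedHashMap<>();
        overrides.put("segment.bytes", String.valueOf(50L * 1024 * 1024)); // 50 MB
        overrides.put("segment.ms", String.valueOf(10L * 60 * 1000));      // 10 minutes
        // Proposed addition: -1 means no time-based retention limit.
        overrides.put("retention.ms", "-1");
        return Collections.unmodifiableMap(overrides);
    }

    public static void main(String[] args) {
        repartitionDefaults().forEach((key, value) -> System.out.println(key + " = " + value));
    }
}
```

With `retention.ms` set to -1, segments of the repartition topic would no longer be removed by time-based retention, which is why the report relies on Kafka Streams deleting consumed records through the admin API instead.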
[jira] [Commented] (KAFKA-7412) Bug prone response from producer.send(ProducerRecord, Callback) if Kafka broker is not running
[ https://issues.apache.org/jira/browse/KAFKA-7412?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16649959#comment-16649959 ] ASF GitHub Bot commented on KAFKA-7412: --- huxihx opened a new pull request #5798: KAFKA-7412: onComplete should not reassign `metadata` variable URL: https://github.com/apache/kafka/pull/5798 The Javadoc for `InterceptorCallback#onComplete` says that exactly one of the arguments (metadata and exception) will be non-null. However, this contract is broken when a TimeoutException is encountered, because the code assigns a newly created RecordMetadata object to the `metadata` variable. The solution is to leave `metadata` unchanged and pass a new RecordMetadata instance to `onAcknowledgement`. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Bug prone response from producer.send(ProducerRecord, Callback) if Kafka broker is not running
> ---
>
> Key: KAFKA-7412
> URL: https://issues.apache.org/jira/browse/KAFKA-7412
> Project: Kafka
> Issue Type: Bug
> Components: producer
> Affects Versions: 2.0.0
> Reporter: Michal Turek
> Priority: Major
> Attachments: both_metadata_and_exception.png, metadata_when_kafka_is_stopped.png
>
> Hi there, I have probably found a bug in the Java Kafka producer client.
> Scenario & current behavior:
> - Start a Kafka broker, single instance.
> - Start an application that produces messages to Kafka.
> - Let the application load partitions for a topic to warm up the producer, e.g. send a message to Kafka. I'm not sure if this is a necessary step, but our code does it.
> - Gracefully stop the Kafka broker.
> - The application logs now contain "org.apache.kafka.clients.NetworkClient: [Producer clientId=...] Connection to node 0 could not be established. Broker may not be available.", so the client is aware that Kafka is unavailable.
> - Trigger the producer to send a message using the KafkaProducer.send(ProducerRecord, Callback) method.
> - The callback that notifies business code receives a non-null RecordMetadata and a null Exception after request.timeout.ms. The metadata contains offset -1, which is the value of ProduceResponse.INVALID_OFFSET.
> Expected behavior:
> - If Kafka is not running and the message is not appended to the log, the callback should receive a null RecordMetadata and a non-null Exception. At least that is how I subjectively understand the Javadoc: "exception on production error", in simple words.
> - A developer who is not aware of this behavior and doesn't test for offset -1 may consider the message successfully sent and properly acked by the broker.
> Known workaround
> - Together with checking for non-null exception in the callback, add another condition for ProduceResponse.INVALID_OFFSET.
> {noformat}
> try {
>     producer.send(record, (metadata, exception) -> {
>         if (metadata != null) {
>             if (metadata.offset() != ProduceResponse.INVALID_OFFSET) {
>                 // Success
>             } else {
>                 // Failure
>             }
>         } else {
>             // Failure
>         }
>     });
> } catch (Exception e) {
>     // Failure
> }
> {noformat}
> Used setup
> - Latest Kafka 2.0.0 for both broker and Java client.
> - Originally found with broker 0.11.0.1 and client 2.0.0.
> - Code is analogy of the one in Javadoc of KafkaProducer.send().
> - Used producer configuration (others use defaults).
> {noformat}
> bootstrap.servers = "localhost:9092"
> client.id = "..."
> acks = "all"
> retries = 1
> linger.ms = "20"
> compression.type = "lz4"
>
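The change described in pull request #5798 can be illustrated with a simplified, stand-alone model. This is only a sketch under assumptions, not the code from the PR: `Meta`, `UserCallback` and `Interceptor` are hypothetical stand-ins for `RecordMetadata`, the producer `Callback` and the interceptors' `onAcknowledgement` hook, and the placeholder offset of -1 is assumed from the value mentioned in the issue.

```java
// Simplified model of the fix; every type here is a hypothetical stand-in,
// not a real Kafka client class.
final class InterceptorCallbackSketch {

    // Stand-in for RecordMetadata: only the offset matters for this sketch.
    static final class Meta {
        final long offset;
        Meta(long offset) { this.offset = offset; }
    }

    // Stand-in for the user-facing producer callback.
    interface UserCallback {
        void onCompletion(Meta metadata, Exception exception);
    }

    // Stand-in for the interceptors' acknowledgement hook.
    interface Interceptor {
        void onAcknowledgement(Meta metadata, Exception exception);
    }

    // Assumed placeholder offset, matching the -1 reported in the issue.
    static final long INVALID_OFFSET = -1L;

    // Interceptors receive a placeholder when metadata is null, while the user
    // callback sees the original arguments, so exactly one of them is non-null.
    static void onCompletion(Meta metadata, Exception exception,
                             Interceptor interceptor, UserCallback userCallback) {
        Meta forInterceptor = (metadata != null) ? metadata : new Meta(INVALID_OFFSET);
        interceptor.onAcknowledgement(forInterceptor, exception);
        if (userCallback != null) {
            userCallback.onCompletion(metadata, exception); // metadata is not reassigned
        }
    }

    public static void main(String[] args) {
        onCompletion(null, new RuntimeException("timeout"),
            (m, e) -> System.out.println("interceptor offset=" + m.offset),
            (m, e) -> System.out.println("user metadata=" + m + ", exception=" + e.getMessage()));
    }
}
```

The point of keeping a separate local variable is that the placeholder never leaks into the user callback, so a caller that only checks `exception != null` sees the failure.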
[jira] [Assigned] (KAFKA-7412) Bug prone response from producer.send(ProducerRecord, Callback) if Kafka broker is not running
[ https://issues.apache.org/jira/browse/KAFKA-7412?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

huxihx reassigned KAFKA-7412:
---
Assignee: huxihx

> Bug prone response from producer.send(ProducerRecord, Callback) if Kafka broker is not running
> ---
>
> Key: KAFKA-7412
> URL: https://issues.apache.org/jira/browse/KAFKA-7412
> Project: Kafka
> Issue Type: Bug
> Components: producer
> Affects Versions: 2.0.0
> Reporter: Michal Turek
> Assignee: huxihx
> Priority: Major
> Attachments: both_metadata_and_exception.png, metadata_when_kafka_is_stopped.png
>
> Hi there, I have probably found a bug in the Java Kafka producer client.
> Scenario & current behavior:
> - Start a Kafka broker, single instance.
> - Start an application that produces messages to Kafka.
> - Let the application load partitions for a topic to warm up the producer, e.g. send a message to Kafka. I'm not sure if this is a necessary step, but our code does it.
> - Gracefully stop the Kafka broker.
> - The application logs now contain "org.apache.kafka.clients.NetworkClient: [Producer clientId=...] Connection to node 0 could not be established. Broker may not be available.", so the client is aware that Kafka is unavailable.
> - Trigger the producer to send a message using the KafkaProducer.send(ProducerRecord, Callback) method.
> - The callback that notifies business code receives a non-null RecordMetadata and a null Exception after request.timeout.ms. The metadata contains offset -1, which is the value of ProduceResponse.INVALID_OFFSET.
> Expected behavior:
> - If Kafka is not running and the message is not appended to the log, the callback should receive a null RecordMetadata and a non-null Exception. At least that is how I subjectively understand the Javadoc: "exception on production error", in simple words.
> - A developer who is not aware of this behavior and doesn't test for offset -1 may consider the message successfully sent and properly acked by the broker.
> Known workaround
> - Together with checking for non-null exception in the callback, add another condition for ProduceResponse.INVALID_OFFSET.
> {noformat}
> try {
>     producer.send(record, (metadata, exception) -> {
>         if (metadata != null) {
>             if (metadata.offset() != ProduceResponse.INVALID_OFFSET) {
>                 // Success
>             } else {
>                 // Failure
>             }
>         } else {
>             // Failure
>         }
>     });
> } catch (Exception e) {
>     // Failure
> }
> {noformat}
> Used setup
> - Latest Kafka 2.0.0 for both broker and Java client.
> - Originally found with broker 0.11.0.1 and client 2.0.0.
> - Code is analogy of the one in Javadoc of KafkaProducer.send().
> - Used producer configuration (others use defaults).
> {noformat}
> bootstrap.servers = "localhost:9092"
> client.id = "..."
> acks = "all"
> retries = 1
> linger.ms = "20"
> compression.type = "lz4"
> request.timeout.ms = 5000 # The same behavior is with default, this is to speed up the tests
> {noformat}

-- This message was sent by Atlassian JIRA (v7.6.3#76005)
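The decision logic behind the workaround can be isolated into a small helper that is easy to unit test. The following is a minimal sketch, assuming the value -1 for `ProduceResponse.INVALID_OFFSET` as stated in the report; the class `SendOutcomeSketch` and its `classify` method are hypothetical and take a plain offset instead of a real `RecordMetadata`, so the snippet compiles without the Kafka client.

```java
// Stand-alone sketch: classify a send result the way the workaround suggests.
final class SendOutcomeSketch {

    // Value the report attributes to ProduceResponse.INVALID_OFFSET.
    static final long INVALID_OFFSET = -1L;

    enum Outcome { SUCCESS, FAILURE }

    // 'offset' is null when the callback received no metadata at all.
    static Outcome classify(Long offset, Exception exception) {
        if (exception != null) {
            return Outcome.FAILURE;   // explicit error reported by the producer
        }
        if (offset == null) {
            return Outcome.FAILURE;   // neither metadata nor exception
        }
        if (offset == INVALID_OFFSET) {
            return Outcome.FAILURE;   // placeholder metadata, record was not appended
        }
        return Outcome.SUCCESS;
    }

    public static void main(String[] args) {
        System.out.println(classify(42L, null));
        System.out.println(classify(INVALID_OFFSET, null));
        System.out.println(classify(null, new RuntimeException("broker down")));
    }
}
```

Inside a real callback this would be called as something like `classify(metadata == null ? null : metadata.offset(), exception)`, which also covers the exception check that the snippet in the report leaves out.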
[jira] [Commented] (KAFKA-4084) automated leader rebalance causes replication downtime for clusters with too many partitions
[ https://issues.apache.org/jira/browse/KAFKA-4084?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16649839#comment-16649839 ] Jiaxin YE commented on KAFKA-4084: -- [~wushujames] : Wow, thanks! Let me try that out. > automated leader rebalance causes replication downtime for clusters with too > many partitions > > > Key: KAFKA-4084 > URL: https://issues.apache.org/jira/browse/KAFKA-4084 > Project: Kafka > Issue Type: Bug > Components: controller >Affects Versions: 0.8.2.2, 0.9.0.0, 0.9.0.1, 0.10.0.0, 0.10.0.1 >Reporter: Tom Crayford >Priority: Major > Labels: reliability > Fix For: 1.1.0 > > > If you enable {{auto.leader.rebalance.enable}} (which is on by default), and > you have a cluster with many partitions, there is a severe amount of > replication downtime following a restart. This causes > `UnderReplicatedPartitions` to fire, and replication is paused. > This is because the current automated leader rebalance mechanism changes > leaders for *all* imbalanced partitions at once, instead of doing it > gradually. This effectively stops all replica fetchers in the cluster > (assuming there are enough imbalanced partitions), and restarts them. This > can take minutes on busy clusters, during which no replication is happening > and user data is at risk. Clients with {{acks=-1}} also see issues at this > time, because replication is effectively stalled. > To quote Todd Palino from the mailing list: > bq. There is an admin CLI command to trigger the preferred replica election > manually. There is also a broker configuration “auto.leader.rebalance.enable” > which you can set to have the broker automatically perform the PLE when > needed. DO NOT USE THIS OPTION. There are serious performance issues when > doing so, especially on larger clusters. It needs some development work that > has not been fully identified yet. > This setting is extremely useful for smaller clusters, but with high > partition counts causes the huge issues stated above. 
> One potential fix could be adding a new configuration for the number of > partitions to do automated leader rebalancing for at once, and *stop* once > that number of leader rebalances are in flight, until they're done. There may > be better mechanisms, and I'd love to hear if anybody has any ideas. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
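The "limit the number of in-flight leader rebalances" idea from the issue can be sketched as a small throttle. This is only an illustration of the proposal, not Kafka controller code: the class `LeaderRebalanceThrottle`, its methods, and the `maxInFlight` parameter (standing in for the suggested new configuration) are all hypothetical, and partition names are assumed to be distinct.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Queue;
import java.util.Set;

// Hypothetical throttle: never more than maxInFlight leader changes at once.
final class LeaderRebalanceThrottle {
    private final int maxInFlight;
    private final Queue<String> pending = new ArrayDeque<>();
    private final Set<String> inFlight = new HashSet<>();

    LeaderRebalanceThrottle(int maxInFlight, Collection<String> imbalancedPartitions) {
        if (maxInFlight < 1) {
            throw new IllegalArgumentException("maxInFlight must be >= 1");
        }
        this.maxInFlight = maxInFlight;
        this.pending.addAll(imbalancedPartitions);
    }

    // Starts as many leader changes as the limit allows; returns the ones started.
    List<String> startNextBatch() {
        List<String> started = new ArrayList<>();
        while (inFlight.size() < maxInFlight && !pending.isEmpty()) {
            String partition = pending.poll();
            inFlight.add(partition);
            started.add(partition);
        }
        return started;
    }

    // Marks one leader change as finished, freeing a slot for the next batch.
    void markDone(String partition) {
        inFlight.remove(partition);
    }

    boolean isFinished() {
        return pending.isEmpty() && inFlight.isEmpty();
    }

    public static void main(String[] args) {
        LeaderRebalanceThrottle throttle =
            new LeaderRebalanceThrottle(2, Arrays.asList("topic-0", "topic-1", "topic-2"));
        System.out.println(throttle.startNextBatch()); // first two partitions
        throttle.markDone("topic-0");
        System.out.println(throttle.startNextBatch()); // the remaining partition
    }
}
```

Because only a bounded number of partitions change leaders at any time, most replica fetchers would keep running during the rebalance, which is the effect the reporter is asking for.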