[jira] [Resolved] (KAFKA-7140) Remove deprecated poll usages
[ https://issues.apache.org/jira/browse/KAFKA-7140?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Jason Gustafson resolved KAFKA-7140.
    Resolution: Fixed

> Remove deprecated poll usages
>
> Key: KAFKA-7140
> URL: https://issues.apache.org/jira/browse/KAFKA-7140
> Project: Kafka
> Issue Type: Improvement
> Reporter: Viktor Somogyi
> Assignee: Viktor Somogyi
> Priority: Minor
>
> There are a couple of poll(long) usages of the consumer in test and non-test
> code. This jira would aim to remove the non-test usages of the method.

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
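The change described in this issue comes down to wrapping the millisecond timeout in a Duration at each call site. A minimal sketch of that call-site pattern follows. The `Poller` interface is a hypothetical stand-in written for this example, not the real Kafka consumer interface; only `java.time.Duration` is a real API here.

```java
import java.time.Duration;

public class PollMigrationSketch {

    /** Hypothetical stand-in for a consumer; NOT the real Kafka consumer interface. */
    interface Poller {
        /** Old style: timeout passed as a bare long in milliseconds. */
        @Deprecated
        default long poll(long timeoutMs) {
            return poll(Duration.ofMillis(timeoutMs));
        }

        /** New style: timeout passed as a Duration. Returns the wait it was asked for, in ms. */
        long poll(Duration timeout);
    }

    /** Shape of a migrated call site: the long is wrapped once, at the call. */
    static long pollConsumer(Poller consumer, long timeoutMs) {
        return consumer.poll(Duration.ofMillis(timeoutMs));
    }

    /** Runs the migrated call against a stub that simply echoes the requested timeout. */
    static long demo() {
        Poller echo = Duration::toMillis;
        return pollConsumer(echo, 250L);
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Keeping the `long timeoutMs` parameter on the caller's own method and converting only at the boundary keeps the change small, which appears to be the approach taken in the diff quoted in the follow-up message.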
[jira] [Commented] (KAFKA-7140) Remove deprecated poll usages
[ https://issues.apache.org/jira/browse/KAFKA-7140?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16577062#comment-16577062 ]

ASF GitHub Bot commented on KAFKA-7140:

hachikuji closed pull request #5319: KAFKA-7140: Remove deprecated poll usages
URL: https://github.com/apache/kafka/pull/5319

This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic):

diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
index 47f8529e2d1..692331ed13f 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
@@ -53,6 +53,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
@@ -441,7 +442,7 @@ public String toString() {
     }
 
     private ConsumerRecords pollConsumer(long timeoutMs) {
-        ConsumerRecords msgs = consumer.poll(timeoutMs);
+        ConsumerRecords msgs = consumer.poll(Duration.ofMillis(timeoutMs));
 
         // Exceptions raised from the task during a rebalance should be rethrown to stop the worker
         if (rebalanceException != null) {
diff --git a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
index de1ceb3be10..ea9b4c621f9 100644
--- a/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
+++ b/connect/runtime/src/main/java/org/apache/kafka/connect/util/KafkaBasedLog.java
@@ -35,6 +35,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.time.Duration;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Iterator;
@@ -253,7 +254,7 @@ public void send(K key, V value, org.apache.kafka.clients.producer.Callback call
 
     private void poll(long timeoutMs) {
         try {
-            ConsumerRecords records = consumer.poll(timeoutMs);
+            ConsumerRecords records = consumer.poll(Duration.ofMillis(timeoutMs));
             for (ConsumerRecord record : records)
                 consumedCallback.onCompletion(null, record);
         } catch (WakeupException e) {
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
index 1bf9c717068..6d92c34adef 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/ErrorHandlingTaskTest.java
@@ -65,6 +65,7 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import java.time.Duration;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
@@ -180,8 +181,8 @@ public void testErrorHandlingInSinkTasks() throws Exception {
         // bad json
         ConsumerRecord record2 = new ConsumerRecord<>(TOPIC, PARTITION2, FIRST_OFFSET, null, "{\"a\" 10}".getBytes());
 
-        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andReturn(records(record1));
-        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andReturn(records(record2));
+        EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andReturn(records(record1));
+        EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andReturn(records(record2));
         sinkTask.put(EasyMock.anyObject());
         EasyMock.expectLastCall().times(2);
 
diff --git a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
index 4a7c760fc74..33ab2ef06e0 100644
--- a/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
+++ b/connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
@@ -58,6 +58,7 @@
 import org.powermock.modules.junit4.PowerMockRunner;
 import org.powermock.reflect.Whitebox;
 
+import java.time.Duration;
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.Collections;
@@ -458,7 +459,7 @@ public void testWakeupInCommitSyncCausesRetry() throws Exception {
         sinkTask.open(partitions);
         EasyMock.expectLastCall();
 
-        EasyMock.expect(consumer.poll(EasyMock.anyLong())).andAnswer(
+        EasyMock.expect(consumer.poll(Duration.ofMillis(EasyMock.anyLong()))).andAnswer(
[jira] [Commented] (KAFKA-7225) Kafka Connect ConfigProvider not invoked before validation
[ https://issues.apache.org/jira/browse/KAFKA-7225?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576961#comment-16576961 ]

ASF GitHub Bot commented on KAFKA-7225:

rhauch opened a new pull request #5489: KAFKA-7225: Corrected system tests by generating external properties file
URL: https://github.com/apache/kafka/pull/5489

Fix system tests from earlier #5445

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org

> Kafka Connect ConfigProvider not invoked before validation
>
> Key: KAFKA-7225
> URL: https://issues.apache.org/jira/browse/KAFKA-7225
> Project: Kafka
> Issue Type: Bug
> Components: KafkaConnect
> Affects Versions: 2.0.0
> Reporter: Nacho Munoz
> Assignee: Robert Yokota
> Priority: Minor
> Fix For: 2.0.1, 2.1.0
>
> When trying to register a JDBC connector with externalised secrets (e.g.
> connection.password), the validation fails and the endpoint returns a 500. I
> think the problem is that the config transformer is not being invoked
> before the validation, so trying to exercise the credentials against the
> database fails. I have checked that publishing the connector configuration
> directly to the connect-config topic (to skip the validation) and restarting
> the server is enough to get the connector working, which confirms that we
> are just missing a call to the config transformer before validating the connector.
> Please let me know if you need further information.
> I'm happy to open a PR to address this issue, given that I think this is
> easy enough to fix for a new contributor to the project. So please feel free
> to assign the resolution of the bug to me.
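The ordering problem reported above (validation running on the raw config before secrets are resolved) can be illustrated with a small, self-contained sketch. Everything here is an assumption made for illustration: the `${secret:KEY}` placeholder syntax, the `transform` and `validate` helpers, and the rule that validation fails on an unresolved placeholder. None of it is Kafka Connect's actual API or placeholder format.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class TransformBeforeValidateSketch {

    // Hypothetical placeholder syntax used only in this sketch: ${secret:KEY}
    private static final Pattern PLACEHOLDER = Pattern.compile("\\$\\{secret:([^}]+)\\}");

    /** Returns a copy of the config with every placeholder replaced by its secret value. */
    static Map<String, String> transform(Map<String, String> config, Map<String, String> secrets) {
        Map<String, String> resolved = new HashMap<>();
        for (Map.Entry<String, String> entry : config.entrySet()) {
            Matcher m = PLACEHOLDER.matcher(entry.getValue());
            StringBuffer sb = new StringBuffer();
            while (m.find()) {
                String secret = secrets.get(m.group(1));
                if (secret == null) {
                    throw new IllegalArgumentException("unknown secret: " + m.group(1));
                }
                m.appendReplacement(sb, Matcher.quoteReplacement(secret));
            }
            m.appendTail(sb);
            resolved.put(entry.getKey(), sb.toString());
        }
        return resolved;
    }

    /** Stand-in for a validator that exercises the credentials: a raw placeholder cannot work. */
    static boolean validate(Map<String, String> config) {
        String password = config.get("connection.password");
        return password != null && !password.contains("${");
    }

    public static void main(String[] args) {
        Map<String, String> raw = new HashMap<>();
        raw.put("connection.password", "${secret:db}");
        Map<String, String> secrets = new HashMap<>();
        secrets.put("db", "s3cr3t");

        System.out.println(validate(raw));                     // validation on the raw config
        System.out.println(validate(transform(raw, secrets))); // validation after transformation
    }
}
```

The point of the sketch is only the call order: `validate(transform(...))` rather than `validate(raw)`.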
[jira] [Resolved] (KAFKA-6966) Extend `TopologyDescription.Sink` to return `TopicNameExtractor`
[ https://issues.apache.org/jira/browse/KAFKA-6966?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax resolved KAFKA-6966.
    Resolution: Fixed

> Extend `TopologyDescription.Sink` to return `TopicNameExtractor`
>
> Key: KAFKA-6966
> URL: https://issues.apache.org/jira/browse/KAFKA-6966
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.0.0
> Reporter: Matthias J. Sax
> Assignee: Nishanth Pradeep
> Priority: Major
> Labels: beginner, kip, newbie
> Fix For: 2.1.0
>
> With KIP-303, a dynamic routing feature was added and
> `TopologyDescription.Sink#topic()` returns `null` if this feature is used.
> It would be useful to get the actually used `TopicNameExtractor` class from
> the `TopologyDescription`.
> We suggest to add `Class TopologyDescription.Sink#topicNameExtractor()`
> and let it return `null` if dynamic routing feature is not used.
> KIP-321:
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-321%3A+Update+TopologyDescription+to+better+represent+Source+and+Sink+Nodes]
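The contract described in this issue is that a sink exposes two accessors, and exactly one of them is expected to be non-null depending on whether dynamic routing is used. A minimal sketch of how a caller might handle that follows. `SinkInfo`, `TopicNameExtractor` and `ByKeyExtractor` are hypothetical stand-ins defined for this example; they mirror only the two accessor names from the issue and are not the Kafka Streams types.

```java
public class SinkDescriptionSketch {

    /** Hypothetical stand-in for the extractor type; NOT the Kafka Streams interface. */
    interface TopicNameExtractor {
        String extract(String key);
    }

    /** Example extractor that routes each record by its key. */
    static final class ByKeyExtractor implements TopicNameExtractor {
        @Override
        public String extract(String key) {
            return "events-" + key;
        }
    }

    /** Hypothetical stand-in for a sink description with the two accessors from the issue. */
    static final class SinkInfo {
        private final String topic;                  // null when routing is dynamic
        private final TopicNameExtractor extractor;  // null when the topic is static

        SinkInfo(String topic, TopicNameExtractor extractor) {
            this.topic = topic;
            this.extractor = extractor;
        }

        String topic() {
            return topic;
        }

        TopicNameExtractor topicNameExtractor() {
            return extractor;
        }
    }

    /** Checks the static topic first and falls back to the extractor when it is null. */
    static String describe(SinkInfo sink) {
        if (sink.topic() != null) {
            return "static topic: " + sink.topic();
        }
        return "dynamic routing via " + sink.topicNameExtractor().getClass().getSimpleName();
    }

    public static void main(String[] args) {
        System.out.println(describe(new SinkInfo("orders", null)));
        System.out.println(describe(new SinkInfo(null, new ByKeyExtractor())));
    }
}
```

Code that previously printed `topic()` unconditionally would show `null` for dynamically routed sinks, which is the gap the new accessor is meant to close.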
[jira] [Updated] (KAFKA-6966) Extend `TopologyDescription.Sink` to return `TopicNameExtractor`
[ https://issues.apache.org/jira/browse/KAFKA-6966?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax updated KAFKA-6966:
    Fix Version/s: 2.1.0

> Extend `TopologyDescription.Sink` to return `TopicNameExtractor`
>
> Key: KAFKA-6966
> URL: https://issues.apache.org/jira/browse/KAFKA-6966
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.0.0
> Reporter: Matthias J. Sax
> Assignee: Nishanth Pradeep
> Priority: Major
> Labels: beginner, kip, newbie
> Fix For: 2.1.0
>
> With KIP-303, a dynamic routing feature was added and
> `TopologyDescription.Sink#topic()` returns `null` if this feature is used.
> It would be useful to get the actually used `TopicNameExtractor` class from
> the `TopologyDescription`.
> We suggest to add `Class TopologyDescription.Sink#topicNameExtractor()`
> and let it return `null` if dynamic routing feature is not used.
> KIP-321:
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-321%3A+Update+TopologyDescription+to+better+represent+Source+and+Sink+Nodes]
[jira] [Updated] (KAFKA-6966) Extend `TopologyDescription.Sink` to return `TopicNameExtractor`
[ https://issues.apache.org/jira/browse/KAFKA-6966?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax updated KAFKA-6966:
    Description:
With KIP-303, a dynamic routing feature was added and `TopologyDescription.Sink#topic()` returns `null` if this feature is used.
It would be useful to get the actually used `TopicNameExtractor` class from the `TopologyDescription`.
We suggest to add `Class TopologyDescription.Sink#topicNameExtractor()` and let it return `null` if dynamic routing feature is not used.
KIP-321: [https://cwiki.apache.org/confluence/display/KAFKA/KIP-321%3A+Update+TopologyDescription+to+better+represent+Source+and+Sink+Nodes]

  was:
With KIP-303, a dynamic routing feature was added and `TopologyDescription.Sink#topic()` returns `null` if this feature is used.
It would be useful to get the actually used `TopicNameExtractor` class from the `TopologyDescription`.
We suggest to add `Class TopologyDescription.Sink#topicNameExtractor()` and let it return `null` if dynamic routing feature is not used.
This is a public API change and requires a KIP: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals

> Extend `TopologyDescription.Sink` to return `TopicNameExtractor`
>
> Key: KAFKA-6966
> URL: https://issues.apache.org/jira/browse/KAFKA-6966
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.0.0
> Reporter: Matthias J. Sax
> Assignee: Nishanth Pradeep
> Priority: Major
> Labels: beginner, kip, newbie
>
> With KIP-303, a dynamic routing feature was added and
> `TopologyDescription.Sink#topic()` returns `null` if this feature is used.
> It would be useful to get the actually used `TopicNameExtractor` class from
> the `TopologyDescription`.
> We suggest to add `Class TopologyDescription.Sink#topicNameExtractor()`
> and let it return `null` if dynamic routing feature is not used.
> KIP-321:
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-321%3A+Update+TopologyDescription+to+better+represent+Source+and+Sink+Nodes]
[jira] [Updated] (KAFKA-6966) Extend `TopologyDescription.Sink` to return `TopicNameExtractor`
[ https://issues.apache.org/jira/browse/KAFKA-6966?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax updated KAFKA-6966:
    Labels: beginner kip newbie (was: beginner needs-kip newbie)

> Extend `TopologyDescription.Sink` to return `TopicNameExtractor`
>
> Key: KAFKA-6966
> URL: https://issues.apache.org/jira/browse/KAFKA-6966
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 2.0.0
> Reporter: Matthias J. Sax
> Assignee: Nishanth Pradeep
> Priority: Major
> Labels: beginner, kip, newbie
>
> With KIP-303, a dynamic routing feature was added and
> `TopologyDescription.Sink#topic()` returns `null` if this feature is used.
> It would be useful to get the actually used `TopicNameExtractor` class from
> the `TopologyDescription`.
> We suggest to add `Class TopologyDescription.Sink#topicNameExtractor()`
> and let it return `null` if dynamic routing feature is not used.
> This is a public API change and requires a KIP:
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals
[jira] [Commented] (KAFKA-6966) Extend `TopologyDescription.Sink` to return `TopicNameExtractor`
[ https://issues.apache.org/jira/browse/KAFKA-6966?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576949#comment-16576949 ] ASF GitHub Bot commented on KAFKA-6966: --- mjsax closed pull request #5284: KAFKA-6966: Extend TopologyDescription.Sink to return TopicNameExtractor URL: https://github.com/apache/kafka/pull/5284 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/docs/streams/upgrade-guide.html b/docs/streams/upgrade-guide.html index 34f66ce53fe..35e1f77fd4c 100644 --- a/docs/streams/upgrade-guide.html +++ b/docs/streams/upgrade-guide.html @@ -90,6 +90,14 @@ Upgrade Guide and API Changes We have also removed some public APIs that are deprecated prior to 1.0.x in 2.0.0. See below for a detailed list of removed APIs. +Streams API changes in 2.1.0 + +We updated TopologyDescription API to allow for better runtime checking. +Users are encouraged to use #topicSet() and #topicPattern() accordingly on TopologyDescription.Source nodes, +instead of using #topics(), which has since been deprecated. Similarly, use #topic() and #topicNameExtractor() +to get descriptions of TopologyDescription.Sink nodes. For more details, see +https://cwiki.apache.org/confluence/display/KAFKA/KIP-321%3A+Update+TopologyDescription+to+better+represent+Source+and+Sink+Nodes;>KIP-321. 
+ Streams API changes in 2.0.0 diff --git a/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java b/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java index 04a292f9a97..870052d7399 100644 --- a/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java +++ b/streams/src/main/java/org/apache/kafka/streams/TopologyDescription.java @@ -16,9 +16,11 @@ */ package org.apache.kafka.streams; +import org.apache.kafka.streams.processor.TopicNameExtractor; import org.apache.kafka.streams.processor.internals.StreamTask; import java.util.Set; +import java.util.regex.Pattern; /** * A meta representation of a {@link Topology topology}. @@ -113,8 +115,22 @@ /** * The topic names this source node is reading from. * @return comma separated list of topic names or pattern (as String) + * @deprecated use {@link #topicSet()} or {@link #topicPattern()} instead */ +@Deprecated String topics(); + +/** + * The topic names this source node is reading from. + * @return a set of topic names + */ +Set topicSet(); + +/** + * The pattern used to match topic names that is reading from. + * @return the pattern used to match topic names + */ +Pattern topicPattern(); } /** @@ -134,10 +150,17 @@ interface Sink extends Node { /** * The topic name this sink node is writing to. - * Could be null if the topic name can only be dynamically determined based on {@code TopicNameExtractor} + * Could be {@code null} if the topic name can only be dynamically determined based on {@link TopicNameExtractor} * @return a topic name */ String topic(); + +/** + * The {@link TopicNameExtractor} class that this sink node uses to dynamically extract the topic name to write to. + * Could be {@code null} if the topic name is not dynamically determined. 
+ * @return the {@link TopicNameExtractor} class used get the topic name + */ +TopicNameExtractor topicNameExtractor(); } /** diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java index 250105ad2a3..2944f6ba29b 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/InternalTopologyBuilder.java @@ -282,15 +282,7 @@ private boolean isMatch(final String topic) { @Override Source describe() { -final String sourceTopics; - -if (pattern == null) { -sourceTopics = topics.toString(); -} else { -sourceTopics = pattern.toString(); -} - -return new Source(name, sourceTopics); +return new Source(name, new HashSet<>(topics), pattern); } } @@ -1337,7 +1329,7 @@ public GlobalStore(final String sourceName, final String storeName, final String topicName,
[jira] [Comment Edited] (KAFKA-6571) KafkaProducer.close(0) should be non-blocking
[ https://issues.apache.org/jira/browse/KAFKA-6571?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576931#comment-16576931 ]

Ahmed Al-Mehdi edited comment on KAFKA-6571 at 8/10/18 10:35 PM:

After discussion with [~lindong], assigning bug to self.

was (Author: ahmeda):
After discussion with Dong, assigning bug to self.

> KafkaProducer.close(0) should be non-blocking
>
> Key: KAFKA-6571
> URL: https://issues.apache.org/jira/browse/KAFKA-6571
> Project: Kafka
> Issue Type: Bug
> Reporter: Dong Lin
> Assignee: Ahmed Al-Mehdi
> Priority: Major
>
> According to the Java doc of producer.close(long timeout, TimeUnit timeUnit),
> it is said that "Specifying a timeout of zero means do not wait for pending
> send requests to complete". However, producer.close(0) can currently block on
> waiting for the sender thread to exit, which in turn can block on user's
> callback.
> We probably should not let producer.close(0) join the sender thread if user
> has specified zero timeout.
[jira] [Commented] (KAFKA-6571) KafkaProducer.close(0) should be non-blocking
[ https://issues.apache.org/jira/browse/KAFKA-6571?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576931#comment-16576931 ]

Ahmed Al-Mehdi commented on KAFKA-6571:

After discussion with Dong, assigning bug to self.

> KafkaProducer.close(0) should be non-blocking
>
> Key: KAFKA-6571
> URL: https://issues.apache.org/jira/browse/KAFKA-6571
> Project: Kafka
> Issue Type: Bug
> Reporter: Dong Lin
> Assignee: Dong Lin
> Priority: Major
>
> According to the Java doc of producer.close(long timeout, TimeUnit timeUnit),
> it is said that "Specifying a timeout of zero means do not wait for pending
> send requests to complete". However, producer.close(0) can currently block on
> waiting for the sender thread to exit, which in turn can block on user's
> callback.
> We probably should not let producer.close(0) join the sender thread if user
> has specified zero timeout.
[jira] [Assigned] (KAFKA-6571) KafkaProducer.close(0) should be non-blocking
[ https://issues.apache.org/jira/browse/KAFKA-6571?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ahmed Al-Mehdi reassigned KAFKA-6571:
    Assignee: Ahmed Al-Mehdi (was: Dong Lin)

> KafkaProducer.close(0) should be non-blocking
>
> Key: KAFKA-6571
> URL: https://issues.apache.org/jira/browse/KAFKA-6571
> Project: Kafka
> Issue Type: Bug
> Reporter: Dong Lin
> Assignee: Ahmed Al-Mehdi
> Priority: Major
>
> According to the Java doc of producer.close(long timeout, TimeUnit timeUnit),
> it is said that "Specifying a timeout of zero means do not wait for pending
> send requests to complete". However, producer.close(0) can currently block on
> waiting for the sender thread to exit, which in turn can block on user's
> callback.
> We probably should not let producer.close(0) join the sender thread if user
> has specified zero timeout.
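The behaviour asked for in this issue (a zero timeout must never join the sender thread) can be sketched with plain Java threads. `NonBlockingCloseSketch` is a hypothetical class written for this example and is not how KafkaProducer is implemented; the background thread that waits on a latch merely stands in for a sender thread stuck in a user callback.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

public class NonBlockingCloseSketch {
    private final CountDownLatch neverReleased = new CountDownLatch(1);
    private final Thread sender;

    public NonBlockingCloseSketch() {
        // The "sender" blocks indefinitely, standing in for a user callback that never returns.
        sender = new Thread(() -> {
            try {
                neverReleased.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "sketch-sender");
        sender.setDaemon(true); // do not keep the JVM alive in this demo
        sender.start();
    }

    /**
     * Returns true if the sender thread has terminated by the time this call returns.
     * With a zero timeout the thread is not joined at all, so the call cannot block on it.
     */
    public boolean close(long timeout, TimeUnit unit) {
        if (timeout < 0) {
            throw new IllegalArgumentException("timeout must not be negative");
        }
        long waitMs = unit.toMillis(timeout);
        if (waitMs > 0) { // Thread.join(0) would wait forever, so skip the join entirely
            try {
                sender.join(waitMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }
        return !sender.isAlive();
    }

    public static void main(String[] args) {
        NonBlockingCloseSketch producer = new NonBlockingCloseSketch();
        System.out.println(producer.close(0, TimeUnit.MILLISECONDS));  // returns without joining
        System.out.println(producer.close(50, TimeUnit.MILLISECONDS)); // bounded wait, then returns
    }
}
```

One detail worth noting in any such implementation: in Java, `Thread.join(0)` means "wait indefinitely", so passing a zero timeout straight through to `join` would produce exactly the blocking behaviour the issue complains about.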
[jira] [Commented] (KAFKA-6998) Remove caching wrapper stores if cache-size is configured to zero bytes
[ https://issues.apache.org/jira/browse/KAFKA-6998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576926#comment-16576926 ]

ASF GitHub Bot commented on KAFKA-6998:

guozhangwang opened a new pull request #5488: KAFKA-6998: Disable Caching when max.cache.bytes are zero.
URL: https://github.com/apache/kafka/pull/5488

1. As titled, add a `rewriteTopology` that 1) sets the application id, 2) maybe disables caching, 3) adjusts for source KTable. This optimization can hence be applied to both DSL and PAPI generated topologies.
2. Defer the building of globalStateStores to `rewriteTopology` so that we can also disable caching. But we still need to build the state stores before `InternalTopologyBuilder.build()` since we should only build global stores once for all threads.
3. Added withCachingDisabled to StoreBuilder; this is a public API change.
4. [Optional] Fixed unit test config setting functionalities, and set the necessary config to shorten the unit test latency (it now drops from 5min to 3.5min on my laptop).

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)

> Remove caching wrapper stores if cache-size is configured to zero bytes
>
> Key: KAFKA-6998
> URL: https://issues.apache.org/jira/browse/KAFKA-6998
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: Matthias J. Sax
> Priority: Major
>
> Users can disable caching globally by setting the cache size to zero in their
> config. However, this only effectively disables the caching layer; the code
> is still in place.
> We should consider removing the caching wrappers completely for this case.
> The tricky part is that we insert the caching layer at compile time, i.e.,
> when calling `StreamsBuilder#build()`; at this point, we don't know the
> configuration yet. Thus, we need to find a way to rewrite the topology after
> it is passed to `KafkaStreams` in case the cache size is set to zero.
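The rewrite step described above (decide about caching only once the configuration is known) can be sketched as a pass over store definitions. `StoreSpec` and `rewrite` are hypothetical names invented for this example; only the method name `withCachingDisabled` is taken from the PR description, and its real signature on StoreBuilder is not shown in this thread.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class CachingRewriteSketch {

    /** Hypothetical store definition; NOT the Kafka Streams StoreBuilder. */
    static final class StoreSpec {
        final String name;
        final boolean cachingEnabled;

        StoreSpec(String name, boolean cachingEnabled) {
            this.name = name;
            this.cachingEnabled = cachingEnabled;
        }

        /** Returns a copy of this definition without the caching wrapper. */
        StoreSpec withCachingDisabled() {
            return new StoreSpec(name, false);
        }
    }

    /**
     * Rewrites the store definitions once the configured cache size is known.
     * A cache size of zero removes caching from every store; otherwise nothing changes.
     */
    static List<StoreSpec> rewrite(List<StoreSpec> stores, long cacheSizeBytes) {
        if (cacheSizeBytes > 0) {
            return stores;
        }
        List<StoreSpec> rewritten = new ArrayList<>();
        for (StoreSpec store : stores) {
            rewritten.add(store.withCachingDisabled());
        }
        return rewritten;
    }

    public static void main(String[] args) {
        List<StoreSpec> stores = Arrays.asList(new StoreSpec("counts", true), new StoreSpec("totals", true));
        for (StoreSpec store : rewrite(stores, 0L)) {
            System.out.println(store.name + " caching=" + store.cachingEnabled);
        }
    }
}
```

The design point is that the topology is built without knowing the config, so the decision has to be applied as a later transformation rather than at build time.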
[jira] [Commented] (KAFKA-7147) Allow kafka-reassign-partitions.sh and kafka-log-dirs.sh to take admin client property file
[ https://issues.apache.org/jira/browse/KAFKA-7147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576871#comment-16576871 ] ASF GitHub Bot commented on KAFKA-7147: --- lindong28 closed pull request #5355: KAFKA-7147; ReassignPartitionsCommand should be able to connect to broker over SSL URL: https://github.com/apache/kafka/pull/5355 This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/core/src/main/scala/kafka/admin/LogDirsCommand.scala b/core/src/main/scala/kafka/admin/LogDirsCommand.scala index 9257942d1c0..b51f25d6481 100644 --- a/core/src/main/scala/kafka/admin/LogDirsCommand.scala +++ b/core/src/main/scala/kafka/admin/LogDirsCommand.scala @@ -27,6 +27,7 @@ import scala.collection.JavaConverters._ import scala.collection.Map import kafka.utils.{CommandLineUtils, Json} import joptsimple._ +import org.apache.kafka.common.utils.Utils /** * A command for querying log directory usage on the specified brokers @@ -83,9 +84,12 @@ object LogDirsCommand { } private def createAdminClient(opts: LogDirsCommandOptions): JAdminClient = { -val props = new Properties() +val props = if (opts.options.has(opts.commandConfigOpt)) +Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt)) +else +new Properties() props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt)) -props.put(AdminClientConfig.CLIENT_ID_CONFIG, "log-dirs-tool") +props.putIfAbsent(AdminClientConfig.CLIENT_ID_CONFIG, "log-dirs-tool") JAdminClient.create(props) } @@ -95,6 +99,10 @@ object LogDirsCommand { .withRequiredArg .describedAs("The server(s) to use for bootstrapping") .ofType(classOf[String]) +val commandConfigOpt = parser.accepts("command-config", "Property file containing configs to be passed to Admin Client.") + 
.withRequiredArg + .describedAs("Admin client property file") + .ofType(classOf[String]) val describeOpt = parser.accepts("describe", "Describe the specified log directories on the specified brokers.") val topicListOpt = parser.accepts("topic-list", "The list of topics to be queried in the form \"topic1,topic2,topic3\". " + "All topics will be queried if no topic list is specified") diff --git a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala index 4d9da90bc69..dab34a69267 100755 --- a/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala +++ b/core/src/main/scala/kafka/admin/ReassignPartitionsCommand.scala @@ -72,9 +72,12 @@ object ReassignPartitionsCommand extends Logging { private def createAdminClient(opts: ReassignPartitionsCommandOptions): Option[JAdminClient] = { if (opts.options.has(opts.bootstrapServerOpt)) { - val props = new Properties() + val props = if (opts.options.has(opts.commandConfigOpt)) +Utils.loadProps(opts.options.valueOf(opts.commandConfigOpt)) + else +new Properties() props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, opts.options.valueOf(opts.bootstrapServerOpt)) - props.put(AdminClientConfig.CLIENT_ID_CONFIG, "reassign-partitions-tool") + props.putIfAbsent(AdminClientConfig.CLIENT_ID_CONFIG, "reassign-partitions-tool") Some(JAdminClient.create(props)) } else { None @@ -456,6 +459,10 @@ object ReassignPartitionsCommand extends Logging { .withRequiredArg .describedAs("Server(s) to use for bootstrapping") .ofType(classOf[String]) +val commandConfigOpt = parser.accepts("command-config", "Property file containing configs to be passed to Admin Client.") + .withRequiredArg + .describedAs("Admin client property file") + .ofType(classOf[String]) val zkConnectOpt = parser.accepts("zookeeper", "REQUIRED: The connection string for the zookeeper connection in the " + "form host:port. 
Multiple URLS can be given to allow fail-over.") .withRequiredArg This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Allow kafka-reassign-partitions.sh and kafka-log-dirs.sh to take admin client > property file >
[jira] [Resolved] (KAFKA-7147) Allow kafka-reassign-partitions.sh and kafka-log-dirs.sh to take admin client property file
[ https://issues.apache.org/jira/browse/KAFKA-7147?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Dong Lin resolved KAFKA-7147.
    Resolution: Fixed

> Allow kafka-reassign-partitions.sh and kafka-log-dirs.sh to take admin client
> property file
>
> Key: KAFKA-7147
> URL: https://issues.apache.org/jira/browse/KAFKA-7147
> Project: Kafka
> Issue Type: Improvement
> Reporter: Dong Lin
> Assignee: Dong Lin
> Priority: Major
> Fix For: 2.1.0
>
> Currently both ReassignPartitionsCommand and LogDirsCommand instantiate
> AdminClient using bootstrap.servers and client.id provided by the user. Since
> they do not provide other SSL-related properties, these tools will not be
> able to talk to the broker over SSL.
> In order to solve this problem, these tools should allow users to provide a
> property file containing configs to be passed to AdminClient.
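The precedence rules in the merged diff (load the user's property file if given, always set bootstrap.servers from the command line, and only default client.id when the file did not set it) can be reproduced with `java.util.Properties` alone. This is a sketch, not the tool's code: `AdminPropsSketch` and `buildProps` are hypothetical names, and the file contents are passed as a string so the example is self-contained.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

public class AdminPropsSketch {

    /**
     * Builds client properties with the same precedence as the diff above.
     *
     * @param commandConfig contents of the user's property file, or null if none was given
     */
    static Properties buildProps(String commandConfig, String bootstrapServers, String defaultClientId) {
        Properties props = new Properties();
        if (commandConfig != null) {
            try {
                props.load(new StringReader(commandConfig));
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        }
        // The command-line bootstrap server always wins over the file.
        props.put("bootstrap.servers", bootstrapServers);
        // The tool's client id is only a default; a value from the file is kept.
        props.putIfAbsent("client.id", defaultClientId);
        return props;
    }

    public static void main(String[] args) {
        String file = "security.protocol=SSL\nclient.id=my-tool\n";
        Properties props = buildProps(file, "broker:9093", "log-dirs-tool");
        System.out.println(props.getProperty("security.protocol"));
        System.out.println(props.getProperty("client.id"));
        System.out.println(props.getProperty("bootstrap.servers"));
    }
}
```

Switching from `put` to `putIfAbsent` for client.id is what lets a user-supplied value survive, while `put` for bootstrap.servers keeps the command-line option authoritative.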
[jira] [Updated] (KAFKA-7277) Migrate Streams API to Duration instead of longMs times
[ https://issues.apache.org/jira/browse/KAFKA-7277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax updated KAFKA-7277:
    Labels: beginner needs-kip newbie (was: needs-kip)

> Migrate Streams API to Duration instead of longMs times
>
> Key: KAFKA-7277
> URL: https://issues.apache.org/jira/browse/KAFKA-7277
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: John Roesler
> Priority: Major
> Labels: beginner, needs-kip, newbie
>
> Right now the Streams API universally represents time as ms-since-unix-epoch.
> There's nothing wrong, per se, with this, but Duration is more ergonomic for
> an API.
> What we don't want is to present a heterogeneous API, so we need to make sure
> the whole Streams API is in terms of Duration.
>
> Implementation note: Durations potentially worsen memory pressure and gc
> performance, so internally, we will still use longMs as the representation.
[jira] [Updated] (KAFKA-7277) Migrate Streams API to Duration instead of longMs times
[ https://issues.apache.org/jira/browse/KAFKA-7277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Matthias J. Sax updated KAFKA-7277:
    Description:
Right now the Streams API universally represents time as ms-since-unix-epoch.
There's nothing wrong, per se, with this, but Duration is more ergonomic for an API.
What we don't want is to present a heterogeneous API, so we need to make sure the whole Streams API is in terms of Duration.

Implementation note: Durations potentially worsen memory pressure and gc performance, so internally, we will still use longMs as the representation.

KIP instructions: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals

  was:
Right now the Streams API universally represents time as ms-since-unix-epoch.
There's nothing wrong, per se, with this, but Duration is more ergonomic for an API.
What we don't want is to present a heterogeneous API, so we need to make sure the whole Streams API is in terms of Duration.

Implementation note: Durations potentially worsen memory pressure and gc performance, so internally, we will still use longMs as the representation.

> Migrate Streams API to Duration instead of longMs times
>
> Key: KAFKA-7277
> URL: https://issues.apache.org/jira/browse/KAFKA-7277
> Project: Kafka
> Issue Type: Improvement
> Components: streams
> Reporter: John Roesler
> Priority: Major
> Labels: beginner, needs-kip, newbie
>
> Right now the Streams API universally represents time as ms-since-unix-epoch.
> There's nothing wrong, per se, with this, but Duration is more ergonomic for
> an API.
> What we don't want is to present a heterogeneous API, so we need to make sure
> the whole Streams API is in terms of Duration.
>
> Implementation note: Durations potentially worsen memory pressure and gc
> performance, so internally, we will still use longMs as the representation.
> KIP instructions:
> https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals
[jira] [Updated] (KAFKA-7277) Migrate Streams API to Duration instead of longMs times
[ https://issues.apache.org/jira/browse/KAFKA-7277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-7277: --- Labels: needs-kip (was: ) > Migrate Streams API to Duration instead of longMs times > --- > > Key: KAFKA-7277 > URL: https://issues.apache.org/jira/browse/KAFKA-7277 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: John Roesler >Priority: Major > Labels: needs-kip > > Right now Streams API universally represents time as ms-since-unix-epoch. > There's nothing wrong, per se, with this, but Duration is more ergonomic for > an API. > What we don't want is to present a heterogeneous API, so we need to make sure > the whole Streams API is in terms of Duration. > > Implementation note: Durations potentially worsen memory pressure and gc > performance, so internally, we will still use longMs as the representation. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Updated] (KAFKA-7277) Migrate Streams API to Duration instead of longMs times
[ https://issues.apache.org/jira/browse/KAFKA-7277?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthias J. Sax updated KAFKA-7277: --- Component/s: streams > Migrate Streams API to Duration instead of longMs times > --- > > Key: KAFKA-7277 > URL: https://issues.apache.org/jira/browse/KAFKA-7277 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: John Roesler >Priority: Major > > Right now Streams API universally represents time as ms-since-unix-epoch. > There's nothing wrong, per se, with this, but Duration is more ergonomic for > an API. > What we don't want is to present a heterogeneous API, so we need to make sure > the whole Streams API is in terms of Duration. > > Implementation note: Durations potentially worsen memory pressure and gc > performance, so internally, we will still use longMs as the representation. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7277) Migrate Streams API to Duration instead of longMs times
[ https://issues.apache.org/jira/browse/KAFKA-7277?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576854#comment-16576854 ] John Roesler commented on KAFKA-7277: - I recommend that we validate each duration argument using a class like this:
{noformat}
package org.apache.kafka.streams.kstream;

import java.time.Duration;

final class ApiUtils {
    private ApiUtils() {}

    static Duration validateMillisecondDuration(final Duration duration, final String valueName) {
        try {
            // toMillis() throws ArithmeticException if the value overflows a long of milliseconds
            //noinspection ResultOfMethodCallIgnored
            duration.toMillis();
            return duration;
        } catch (final ArithmeticException e) {
            throw new IllegalArgumentException(
                valueName + " must be expressible in milliseconds (" + duration + " is too big)",
                e
            );
        }
    }
}
{noformat}
Otherwise, we will wind up throwing ArithmeticException randomly with little explanation. > Migrate Streams API to Duration instead of longMs times > --- > > Key: KAFKA-7277 > URL: https://issues.apache.org/jira/browse/KAFKA-7277 > Project: Kafka > Issue Type: Improvement >Reporter: John Roesler >Priority: Major > > Right now Streams API universally represents time as ms-since-unix-epoch. > There's nothing wrong, per se, with this, but Duration is more ergonomic for > an API. > What we don't want is to present a heterogeneous API, so we need to make sure > the whole Streams API is in terms of Duration. > > Implementation note: Durations potentially worsen memory pressure and gc > performance, so internally, we will still use longMs as the representation. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
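Editor's note: the following is a minimal sketch, not code from the ticket, of how the validation proposed in the comment above might combine with the implementation note (Duration at the API boundary, long milliseconds internally). `DurationApiSketch` and `WindowSpec` are hypothetical names invented for illustration; only `java.time.Duration` and the shape of `validateMillisecondDuration` come from the thread.

```java
import java.time.Duration;

public class DurationApiSketch {

    // Same check as the ApiUtils proposal, inlined so the sketch is self-contained.
    public static Duration validateMillisecondDuration(final Duration duration, final String valueName) {
        try {
            duration.toMillis(); // throws ArithmeticException on overflow
            return duration;
        } catch (final ArithmeticException e) {
            throw new IllegalArgumentException(
                valueName + " must be expressible in milliseconds (" + duration + " is too big)", e);
        }
    }

    // Hypothetical API type: Duration at the boundary, long milliseconds inside.
    public static final class WindowSpec {
        private final long sizeMs;

        private WindowSpec(final long sizeMs) {
            this.sizeMs = sizeMs;
        }

        // Validate once at the API edge, then keep only the primitive value.
        public static WindowSpec of(final Duration size) {
            return new WindowSpec(validateMillisecondDuration(size, "size").toMillis());
        }

        public long sizeMs() {
            return sizeMs;
        }
    }

    public static void main(final String[] args) {
        System.out.println(WindowSpec.of(Duration.ofMinutes(5)).sizeMs()); // prints 300000
        try {
            // Long.MAX_VALUE seconds cannot be expressed as a long of milliseconds.
            WindowSpec.of(Duration.ofSeconds(Long.MAX_VALUE));
        } catch (final IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

Validating in the factory method means callers get an IllegalArgumentException naming the offending argument at the call site, while the stored state stays a plain long.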
[jira] [Created] (KAFKA-7277) Migrate Streams API to Duration instead of longMs times
John Roesler created KAFKA-7277: --- Summary: Migrate Streams API to Duration instead of longMs times Key: KAFKA-7277 URL: https://issues.apache.org/jira/browse/KAFKA-7277 Project: Kafka Issue Type: Improvement Reporter: John Roesler Right now Streams API universally represents time as ms-since-unix-epoch. There's nothing wrong, per se, with this, but Duration is more ergonomic for an API. What we don't want is to present a heterogeneous API, so we need to make sure the whole Streams API is in terms of Duration. Implementation note: Durations potentially worsen memory pressure and gc performance, so internally, we will still use longMs as the representation. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6998) Remove caching wrapper stores if cache-size is configured to zero bytes
[ https://issues.apache.org/jira/browse/KAFKA-6998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576701#comment-16576701 ] Guozhang Wang commented on KAFKA-6998: -- Actually, when I was looking into the code, I realized it is not really blocked on `internalTopologyBuilder.build()`. I'll provide a quick draft PR to illustrate my idea. > Remove caching wrapper stores if cache-size is configured to zero bytes > --- > > Key: KAFKA-6998 > URL: https://issues.apache.org/jira/browse/KAFKA-6998 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Priority: Major > > Users can disable caching globally by setting the cache size to zero in their > config. However, this only effectively disables the caching layer; > the code is still in place. > We should consider removing the caching wrappers completely for this case. > The tricky part is that we insert the caching layer at compile time, i.e., > when calling `StreamsBuilder#build()` – at this point, we don't know the > configuration yet. Thus, we need to find a way to rewrite the topology after > it is passed to `KafkaStreams` in case the cache size is set to zero. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7276) Consider using re2j to speed up regex operations
Ted Yu created KAFKA-7276: - Summary: Consider using re2j to speed up regex operations Key: KAFKA-7276 URL: https://issues.apache.org/jira/browse/KAFKA-7276 Project: Kafka Issue Type: Task Reporter: Ted Yu https://github.com/google/re2j re2j claims to do linear time regular expression matching in Java. Its benefit is most obvious for deeply nested regex (such as a | b | c | d). We should consider using re2j to speed up regex operations. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7275) Prototype lock-free metrics
John Roesler created KAFKA-7275: --- Summary: Prototype lock-free metrics Key: KAFKA-7275 URL: https://issues.apache.org/jira/browse/KAFKA-7275 Project: Kafka Issue Type: Improvement Components: metrics, streams Reporter: John Roesler Assignee: John Roesler Currently, we have to be a little conservative in how granularly we measure things to avoid heavy synchronization costs in the metrics. It should be possible to refactor the thread-safe implementation to use volatile and java.util.concurrent.atomic instead and realize a pretty large performance improvement. However, before investing too much time in it, we should run some benchmarks to gauge how much improvement we can expect. I'd propose to run the benchmarks on trunk with debug turned on, and then to just remove all synchronization and run again to get an upper-bound performance improvement. If the results are promising, we can start prototyping a lock-free implementation. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
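Editor's note: as a rough illustration of the comparison proposed above, here is a minimal sketch (not Kafka's metrics code) that contrasts a synchronized counter with one built on `java.util.concurrent.atomic.LongAdder`. The class and method names are hypothetical, and the timing in `main` is only a crude indication; a real benchmark would use a harness such as JMH.

```java
import java.util.concurrent.atomic.LongAdder;

public class MetricCounterSketch {

    public interface Counter {
        void record(long value);
        long total();
    }

    // Baseline: every update takes the object's monitor.
    public static final class SynchronizedCounter implements Counter {
        private long total;

        @Override
        public synchronized void record(final long value) {
            total += value;
        }

        @Override
        public synchronized long total() {
            return total;
        }
    }

    // Lock-free variant: LongAdder spreads contended updates across internal cells.
    public static final class LockFreeCounter implements Counter {
        private final LongAdder total = new LongAdder();

        @Override
        public void record(final long value) {
            total.add(value);
        }

        @Override
        public long total() {
            return total.sum();
        }
    }

    // Runs `threads` writers that each record 1 `perThread` times, then returns the total.
    public static long hammer(final Counter counter, final int threads, final int perThread) {
        final Thread[] workers = new Thread[threads];
        for (int i = 0; i < threads; i++) {
            workers[i] = new Thread(() -> {
                for (int j = 0; j < perThread; j++) {
                    counter.record(1);
                }
            });
            workers[i].start();
        }
        try {
            for (final Thread worker : workers) {
                worker.join();
            }
        } catch (final InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for writers", e);
        }
        return counter.total();
    }

    public static void main(final String[] args) {
        final Counter[] candidates = {new SynchronizedCounter(), new LockFreeCounter()};
        for (final Counter candidate : candidates) {
            final long start = System.nanoTime();
            final long total = hammer(candidate, 4, 1_000_000);
            final long elapsedMs = (System.nanoTime() - start) / 1_000_000;
            System.out.println(candidate.getClass().getSimpleName() + ": total=" + total + " in " + elapsedMs + " ms");
        }
    }
}
```

Both variants must report the same total; only the cost of getting there differs, which is what the proposed benchmark would measure.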
[jira] [Commented] (KAFKA-6998) Remove caching wrapper stores if cache-size is configured to zero bytes
[ https://issues.apache.org/jira/browse/KAFKA-6998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576462#comment-16576462 ] Guozhang Wang commented on KAFKA-6998: -- Yes, that's exactly my plan :) > Remove caching wrapper stores if cache-size is configured to zero bytes > --- > > Key: KAFKA-6998 > URL: https://issues.apache.org/jira/browse/KAFKA-6998 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Priority: Major > > Users can disable caching globally by setting the cache size to zero in their > config. However, this only effectively disables the caching layer; > the code is still in place. > We should consider removing the caching wrappers completely for this case. > The tricky part is that we insert the caching layer at compile time, i.e., > when calling `StreamsBuilder#build()` – at this point, we don't know the > configuration yet. Thus, we need to find a way to rewrite the topology after > it is passed to `KafkaStreams` in case the cache size is set to zero. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6998) Remove caching wrapper stores if cache-size is configured to zero bytes
[ https://issues.apache.org/jira/browse/KAFKA-6998?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576459#comment-16576459 ] Matthias J. Sax commented on KAFKA-6998: Just realized that with the new optimization layer, we have access to the config and can check if the value is set to zero: for this case, we can call `disableCaching()` on all stores before we build the topology. > Remove caching wrapper stores if cache-size is configured to zero bytes > --- > > Key: KAFKA-6998 > URL: https://issues.apache.org/jira/browse/KAFKA-6998 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Matthias J. Sax >Priority: Major > > Users can disable caching globally by setting the cache size to zero in their > config. However, this only effectively disables the caching layer; > the code is still in place. > We should consider removing the caching wrappers completely for this case. > The tricky part is that we insert the caching layer at compile time, i.e., > when calling `StreamsBuilder#build()` – at this point, we don't know the > configuration yet. Thus, we need to find a way to rewrite the topology after > it is passed to `KafkaStreams` in case the cache size is set to zero. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7269) KStream.merge is not documented
[ https://issues.apache.org/jira/browse/KAFKA-7269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576449#comment-16576449 ] Matthias J. Sax commented on KAFKA-7269: I am not sure about "union": union is a set operation, so it does not actually make sense mathematically, because "union" removes duplicates. (Even if most systems don't enforce this mathematical property by default and instead require a "distinct" keyword for it.) > KStream.merge is not documented > --- > > Key: KAFKA-7269 > URL: https://issues.apache.org/jira/browse/KAFKA-7269 > Project: Kafka > Issue Type: Improvement > Components: documentation, streams >Affects Versions: 1.0.0, 1.0.1, 1.0.2, 1.1.0, 1.1.1, 2.0.0 >Reporter: John Roesler >Priority: Major > Labels: beginner, newbie > > If I understand the operator correctly, it should be documented as a > stateless transformation at > https://kafka.apache.org/20/documentation/streams/developer-guide/dsl-api.html#stateless-transformations -- This message was sent by Atlassian JIRA (v7.6.3#76005)
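Editor's note: the distinction drawn in the comment above can be sketched with plain Java collections. This is an illustration under simplifying assumptions, not the Kafka Streams API: records are modeled as strings, and "merge" is modeled as concatenation, although the real `KStream#merge` interleaves records without a guaranteed relative order between the two inputs. The class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class MergeVsUnionSketch {

    // Merge-like behavior: every record from both inputs is kept, duplicates included.
    public static List<String> merge(final List<String> left, final List<String> right) {
        final List<String> out = new ArrayList<>(left);
        out.addAll(right);
        return out;
    }

    // Mathematical set union: a record present in both inputs appears only once.
    public static Set<String> union(final List<String> left, final List<String> right) {
        final Set<String> out = new LinkedHashSet<>(left);
        out.addAll(right);
        return out;
    }

    public static void main(final String[] args) {
        final List<String> left = Arrays.asList("a", "b");
        final List<String> right = Arrays.asList("b", "c");
        System.out.println(merge(left, right)); // prints [a, b, b, c]
        System.out.println(union(left, right)); // prints [a, b, c]
    }
}
```

The duplicate "b" survives the merge but not the union, which is the mathematical objection to the name.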
[jira] [Commented] (KAFKA-7269) KStream.merge is not documented
[ https://issues.apache.org/jira/browse/KAFKA-7269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576361#comment-16576361 ] John Roesler commented on KAFKA-7269: - Hi [~virgilp], Thanks for the tip! It's not in scope to rename the operator right now, but whoever picks this up can bear that in mind when they write the docs. -John > KStream.merge is not documented > --- > > Key: KAFKA-7269 > URL: https://issues.apache.org/jira/browse/KAFKA-7269 > Project: Kafka > Issue Type: Improvement > Components: documentation, streams >Affects Versions: 1.0.0, 1.0.1, 1.0.2, 1.1.0, 1.1.1, 2.0.0 >Reporter: John Roesler >Priority: Major > Labels: beginner, newbie > > If I understand the operator correctly, it should be documented as a > stateless transformation at > https://kafka.apache.org/20/documentation/streams/developer-guide/dsl-api.html#stateless-transformations -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-7119) Intermittent test failure with GSSAPI authentication failure
[ https://issues.apache.org/jira/browse/KAFKA-7119?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16576023#comment-16576023 ] ASF GitHub Bot commented on KAFKA-7119: --- rajinisivaram opened a new pull request #5487: KAFKA-7119: Handle transient Kerberos errors as non-fatal exceptions URL: https://github.com/apache/kafka/pull/5487 ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org > Intermittent test failure with GSSAPI authentication failure > > > Key: KAFKA-7119 > URL: https://issues.apache.org/jira/browse/KAFKA-7119 > Project: Kafka > Issue Type: Bug > Components: security >Affects Versions: 2.0.0 >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Major > > I have seen this failure a couple of times in builds (e.g. > [https://builds.apache.org/job/kafka-pr-jdk10-scala2.12/2412/testReport/junit/kafka.api/SaslSslAdminClientIntegrationTest/testLogStartOffsetCheckpoint/)] > {quote} > org.apache.kafka.common.errors.SaslAuthenticationException: An error: > (java.security.PrivilegedActionException: javax.security.sasl.SaslException: > GSS initiate failed [Caused by GSSException: No valid credentials provided > (Mechanism level: Request is a replay (34) - Request is a replay)]) occurred > when evaluating SASL token received from the Kafka Broker. Kafka Client will > go to AUTHENTICATION_FAILED state. 
Caused by: > javax.security.sasl.SaslException: GSS initiate failed [Caused by > GSSException: No valid credentials provided (Mechanism level: Request is a > replay (34) - Request is a replay)] at > jdk.security.jgss/com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:211) > at > org.apache.kafka.common.security.authenticator.SaslClientAuthenticator$2.run(SaslClientAuthenticator.java:358) > at > org.apache.kafka.common.security.authenticator.SaslClientAuthenticator$2.run(SaslClientAuthenticator.java:356) > at java.base/java.security.AccessController.doPrivileged(Native Method) at > java.base/javax.security.auth.Subject.doAs(Subject.java:423) at > org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslToken(SaslClientAuthenticator.java:356) > at > org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.sendSaslClientToken(SaslClientAuthenticator.java:268) > at > org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:205) > at > org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:127) > at > org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:487) > at org.apache.kafka.common.network.Selector.poll(Selector.java:425) at > org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:510) at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:271) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:242) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:218) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:231) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:314) > at > 
org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1218) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1181) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115) > at > kafka.api.AdminClientIntegrationTest.$anonfun$subscribeAndWaitForAssignment$2(AdminClientIntegrationTest.scala:980) > at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:781) at > kafka.api.AdminClientIntegrationTest.subscribeAndWaitForAssignment(AdminClientIntegrationTest.scala:979) > at > kafka.api.AdminClientIntegrationTest.testLogStartOffsetCheckpoint(AdminClientIntegrationTest.scala:755) > at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at >
[jira] [Assigned] (KAFKA-7119) Intermittent test failure with GSSAPI authentication failure
[ https://issues.apache.org/jira/browse/KAFKA-7119?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Rajini Sivaram reassigned KAFKA-7119: - Assignee: Rajini Sivaram > Intermittent test failure with GSSAPI authentication failure > > > Key: KAFKA-7119 > URL: https://issues.apache.org/jira/browse/KAFKA-7119 > Project: Kafka > Issue Type: Bug > Components: security >Affects Versions: 2.0.0 >Reporter: Rajini Sivaram >Assignee: Rajini Sivaram >Priority: Major > > I have seen this failure a couple of times in builds (e.g. > [https://builds.apache.org/job/kafka-pr-jdk10-scala2.12/2412/testReport/junit/kafka.api/SaslSslAdminClientIntegrationTest/testLogStartOffsetCheckpoint/)] > {quote} > org.apache.kafka.common.errors.SaslAuthenticationException: An error: > (java.security.PrivilegedActionException: javax.security.sasl.SaslException: > GSS initiate failed [Caused by GSSException: No valid credentials provided > (Mechanism level: Request is a replay (34) - Request is a replay)]) occurred > when evaluating SASL token received from the Kafka Broker. Kafka Client will > go to AUTHENTICATION_FAILED state. 
Caused by: > javax.security.sasl.SaslException: GSS initiate failed [Caused by > GSSException: No valid credentials provided (Mechanism level: Request is a > replay (34) - Request is a replay)] at > jdk.security.jgss/com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:211) > at > org.apache.kafka.common.security.authenticator.SaslClientAuthenticator$2.run(SaslClientAuthenticator.java:358) > at > org.apache.kafka.common.security.authenticator.SaslClientAuthenticator$2.run(SaslClientAuthenticator.java:356) > at java.base/java.security.AccessController.doPrivileged(Native Method) at > java.base/javax.security.auth.Subject.doAs(Subject.java:423) at > org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.createSaslToken(SaslClientAuthenticator.java:356) > at > org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.sendSaslClientToken(SaslClientAuthenticator.java:268) > at > org.apache.kafka.common.security.authenticator.SaslClientAuthenticator.authenticate(SaslClientAuthenticator.java:205) > at > org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:127) > at > org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:487) > at org.apache.kafka.common.network.Selector.poll(Selector.java:425) at > org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:510) at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:271) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:242) > at > org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:218) > at > org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:231) > at > org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:314) > at > 
org.apache.kafka.clients.consumer.KafkaConsumer.updateAssignmentMetadataIfNeeded(KafkaConsumer.java:1218) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1181) > at > org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1115) > at > kafka.api.AdminClientIntegrationTest.$anonfun$subscribeAndWaitForAssignment$2(AdminClientIntegrationTest.scala:980) > at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:781) at > kafka.api.AdminClientIntegrationTest.subscribeAndWaitForAssignment(AdminClientIntegrationTest.scala:979) > at > kafka.api.AdminClientIntegrationTest.testLogStartOffsetCheckpoint(AdminClientIntegrationTest.scala:755) > at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native > Method) at > java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) > at > java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) > at java.base/java.lang.reflect.Method.invoke(Method.java:564) at > org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50) > at > org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12) > at > org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47) > at > org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17) > at > org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26) > at > org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27) > at >
[jira] [Comment Edited] (KAFKA-6701) synchronize Log modification between delete cleanup and async delete
[ https://issues.apache.org/jira/browse/KAFKA-6701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16575873#comment-16575873 ] M. Manna edited comment on KAFKA-6701 at 8/10/18 7:30 AM: -- [~lindong] I believe this might also be a fix for https://issues.apache.org/jira/browse/KAFKA-6188 . But the issue still occurs on Windows as of the 2.11-1.1.0 release. was (Author: manme...@gmail.com): [~lindong] I believe this might also be a fix for https://issues.apache.org/jira/browse/KAFKA-6188 . > synchronize Log modification between delete cleanup and async delete > > > Key: KAFKA-6701 > URL: https://issues.apache.org/jira/browse/KAFKA-6701 > Project: Kafka > Issue Type: Bug >Reporter: Sumant Tambe >Assignee: Sumant Tambe >Priority: Major > > Kafka broker crashes without any evident disk failures > From [~becket_qin]: This looks like a bug in Kafka: when topic deletion and log > retention cleanup happen at the same time, the log retention cleanup may see > ClosedChannelException after the log has been renamed for async deletion. > The root cause is that the topic deletion should have set the isClosed flag > of the partition log to true, and the retention should not bother to delete > old log segments when the log is closed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Commented] (KAFKA-6701) synchronize Log modification between delete cleanup and async delete
[ https://issues.apache.org/jira/browse/KAFKA-6701?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16575873#comment-16575873 ] M. Manna commented on KAFKA-6701: - [~lindong] I believe this might also be a fix for https://issues.apache.org/jira/browse/KAFKA-6188 . > synchronize Log modification between delete cleanup and async delete > > > Key: KAFKA-6701 > URL: https://issues.apache.org/jira/browse/KAFKA-6701 > Project: Kafka > Issue Type: Bug >Reporter: Sumant Tambe >Assignee: Sumant Tambe >Priority: Major > > Kafka broker crashes without any evident disk failures > From [~becket_qin]: This looks like a bug in Kafka: when topic deletion and log > retention cleanup happen at the same time, the log retention cleanup may see > ClosedChannelException after the log has been renamed for async deletion. > The root cause is that the topic deletion should have set the isClosed flag > of the partition log to true, and the retention should not bother to delete > old log segments when the log is closed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Resolved] (KAFKA-6701) synchronize Log modification between delete cleanup and async delete
[ https://issues.apache.org/jira/browse/KAFKA-6701?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Dong Lin resolved KAFKA-6701. - Resolution: Fixed The issue appears to have been fixed in https://issues.apache.org/jira/browse/KAFKA-5163. More specifically, https://issues.apache.org/jira/browse/KAFKA-5163 added the method `Log.renameDir()`, which grabs the per-log lock before modifying the log's directory, etc. > synchronize Log modification between delete cleanup and async delete > > > Key: KAFKA-6701 > URL: https://issues.apache.org/jira/browse/KAFKA-6701 > Project: Kafka > Issue Type: Bug >Reporter: Sumant Tambe >Assignee: Sumant Tambe >Priority: Major > > Kafka broker crashes without any evident disk failures > From [~becket_qin]: This looks like a bug in Kafka: when topic deletion and log > retention cleanup happen at the same time, the log retention cleanup may see > ClosedChannelException after the log has been renamed for async deletion. > The root cause is that the topic deletion should have set the isClosed flag > of the partition log to true, and the retention should not bother to delete > old log segments when the log is closed. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
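Editor's note: the interaction described in this thread can be sketched roughly as follows. This is not Kafka's `Log` class; `SketchLog` and all of its members are hypothetical, and the sketch combines the two ideas mentioned above (a per-log lock taken by the rename, and a closed flag that retention checks) purely for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class LogLockSketch {

    // Hypothetical stand-in for a partition log; names are illustrative only.
    public static final class SketchLog {
        private final Object lock = new Object();
        private final List<String> segments = new ArrayList<>();
        private String dir;
        private boolean closed = false;

        public SketchLog(final String dir, final List<String> initialSegments) {
            this.dir = dir;
            this.segments.addAll(initialSegments);
        }

        // Topic deletion path: rename under the per-log lock and mark the log closed.
        public void renameDirForDeletion(final String newDir) {
            synchronized (lock) {
                dir = newDir;
                closed = true;
            }
        }

        // Retention path: takes the same lock and skips a log that is already closed.
        // Returns the number of segments removed.
        public int deleteOldSegments() {
            synchronized (lock) {
                if (closed) {
                    return 0;
                }
                final int removed = segments.size();
                segments.clear();
                return removed;
            }
        }

        public String dir() {
            synchronized (lock) {
                return dir;
            }
        }
    }

    public static void main(final String[] args) {
        final SketchLog log = new SketchLog("topic-0", Arrays.asList("0.log", "100.log"));
        log.renameDirForDeletion("topic-0.delete");
        // Retention running after the rename sees the closed flag and does nothing.
        System.out.println(log.deleteOldSegments()); // prints 0
    }
}
```

Because both paths hold the same lock, retention either finishes before the rename starts or observes the closed flag afterwards; it never touches segments mid-rename.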
[jira] [Commented] (KAFKA-7269) KStream.merge is not documented
[ https://issues.apache.org/jira/browse/KAFKA-7269?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16575845#comment-16575845 ] Virgil Palanciuc commented on KAFKA-7269: - Might I suggest adding "union" as an alternative to "merge"? "union" is the Spark operator, and it's also the mathematical operator that makes the most sense, IMO. > KStream.merge is not documented > --- > > Key: KAFKA-7269 > URL: https://issues.apache.org/jira/browse/KAFKA-7269 > Project: Kafka > Issue Type: Improvement > Components: documentation, streams >Affects Versions: 1.0.0, 1.0.1, 1.0.2, 1.1.0, 1.1.1, 2.0.0 >Reporter: John Roesler >Priority: Major > Labels: beginner, newbie > > If I understand the operator correctly, it should be documented as a > stateless transformation at > https://kafka.apache.org/20/documentation/streams/developer-guide/dsl-api.html#stateless-transformations -- This message was sent by Atlassian JIRA (v7.6.3#76005)