[GitHub] [kafka] chia7712 opened a new pull request #10764: MINOR: make sure all fields of o.a.k.s.a.Action are NOT null
chia7712 opened a new pull request #10764: URL: https://github.com/apache/kafka/pull/10764 I'm migrating Ranger's Kafka plugin from the deprecated Authorizer (already removed by 976e78e405d57943b989ac487b7f49119b0f4af4) to the new API (see https://issues.apache.org/jira/browse/RANGER-3231). The Kafka plugin needs to read the `resourcePattern` field but cannot tell whether the field is nullable (or whether users need to add a null check). I checked all usages and did not observe any null case. ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] showuon commented on pull request #10552: KAFKA-12675: improve the sticky general assignor scalability and performance
showuon commented on pull request #10552: URL: https://github.com/apache/kafka/pull/10552#issuecomment-848453397 @vahidhashemian , thank you for your review! :)
[GitHub] [kafka] vahidhashemian commented on pull request #10552: KAFKA-12675: improve the sticky general assignor scalability and performance
vahidhashemian commented on pull request #10552: URL: https://github.com/apache/kafka/pull/10552#issuecomment-848436761 Thanks for addressing my comments @showuon. I ran a couple of unit tests and saw the difference this change makes. I have no further comments at this time. Given this is a big change I'd wait for @ableegoldman's review before approval. In the meantime, I may test some scenarios for additional validation.
[jira] [Commented] (KAFKA-9295) KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable
[ https://issues.apache.org/jira/browse/KAFKA-9295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17351457#comment-17351457 ] Luke Chen commented on KAFKA-9295: -- On it! > KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable > -- > > Key: KAFKA-9295 > URL: https://issues.apache.org/jira/browse/KAFKA-9295 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Affects Versions: 2.4.0, 2.6.0 >Reporter: Matthias J. Sax >Assignee: Luke Chen >Priority: Critical > Labels: flaky-test > Fix For: 3.0.0 > > > [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/27106/testReport/junit/org.apache.kafka.streams.integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest/shouldInnerJoinMultiPartitionQueryable/] > {quote}java.lang.AssertionError: Did not receive all 1 records from topic > output- within 6 ms Expected: is a value equal to or greater than <1> > but: <0> was less than <1> at > org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:18) at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:515) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:385) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:511) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:489) > at > org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.verifyKTableKTableJoin(KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java:200) > at > 
org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.shouldInnerJoinMultiPartitionQueryable(KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java:183){quote} > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] ableegoldman commented on a change in pull request #10755: MINOR: deprecate TaskMetadata constructor and add KIP-740 notes to upgrade guide
ableegoldman commented on a change in pull request #10755: URL: https://github.com/apache/kafka/pull/10755#discussion_r639358536 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/TaskMetadata.java ## @@ -40,6 +40,18 @@ private final Optional<Long> timeCurrentIdlingStarted; +/** + * @deprecated since 3.0, please use {@link #TaskMetadata(TaskId, Set, Map, Map, Optional) instead} + */ +@Deprecated +public TaskMetadata(final String taskId, +final Set<TopicPartition> topicPartitions, +final Map<TopicPartition, Long> committedOffsets, +final Map<TopicPartition, Long> endOffsets, +final Optional<Long> timeCurrentIdlingStarted) { +this(TaskId.parse(taskId), topicPartitions, committedOffsets, endOffsets, timeCurrentIdlingStarted); +} + public TaskMetadata(final TaskId taskId, Review comment: https://issues.apache.org/jira/browse/KAFKA-12849
[GitHub] [kafka] ableegoldman commented on a change in pull request #10745: MINOR: add window verification to sliding-window co-group test
ableegoldman commented on a change in pull request #10745: URL: https://github.com/apache/kafka/pull/10745#discussion_r639358048 ## File path: streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImplTest.java ## @@ -169,34 +167,37 @@ public void slidingWindowAggregateStreamsTest() { testInputTopic.pipeInput("k2", "B", 504); testInputTopic.pipeInput("k1", "B", 504); -final Set<TestRecord<String, String>> results = new HashSet<>(); -while (!testOutputTopic.isEmpty()) { -final TestRecord<Windowed<String>, String> realRecord = testOutputTopic.readRecord(); -final TestRecord<String, String> nonWindowedRecord = new TestRecord<>( -realRecord.getKey().key(), realRecord.getValue(), null, realRecord.timestamp()); -results.add(nonWindowedRecord); -} -final Set<TestRecord<String, String>> expected = new HashSet<>(); -expected.add(new TestRecord<>("k1", "0+A", null, 500L)); -expected.add(new TestRecord<>("k2", "0+A", null, 500L)); -expected.add(new TestRecord<>("k2", "0+A", null, 501L)); -expected.add(new TestRecord<>("k2", "0+A+A", null, 501L)); -expected.add(new TestRecord<>("k1", "0+A", null, 502L)); -expected.add(new TestRecord<>("k1", "0+A+A", null, 502L)); -expected.add(new TestRecord<>("k1", "0+A+B", null, 503L)); -expected.add(new TestRecord<>("k1", "0+B", null, 503L)); -expected.add(new TestRecord<>("k1", "0+A+A+B", null, 503L)); -expected.add(new TestRecord<>("k2", "0+A+B", null, 503L)); -expected.add(new TestRecord<>("k2", "0+B", null, 503L)); -expected.add(new TestRecord<>("k2", "0+A+A+B", null, 503L)); -expected.add(new TestRecord<>("k2", "0+A+B+B", null, 504L)); -expected.add(new TestRecord<>("k2", "0+B+B", null, 504L)); -expected.add(new TestRecord<>("k2", "0+B", null, 504L)); -expected.add(new TestRecord<>("k2", "0+A+A+B+B", null, 504L)); -expected.add(new TestRecord<>("k1", "0+A+B+B", null, 504L)); -expected.add(new TestRecord<>("k1", "0+B+B", null, 504L)); -expected.add(new TestRecord<>("k1", "0+B", null, 504L)); -expected.add(new TestRecord<>("k1", "0+A+A+B+B", null, 504L)); +final List<TestRecord<Windowed<String>, String>> 
results = testOutputTopic.readRecordsToList(); + +final List<TestRecord<Windowed<String>, String>> expected = new LinkedList<>(); +// k1-A-500 +expected.add(new TestRecord<>(new Windowed<>("k1", new TimeWindow(0L, 500L)), "0+A", null, 500L)); Review comment: >A {@link TimeWindow} covers a half-open time interval I was about to say we should just make `TimeWindow` un-opinionated, but this is literally the first thing in the javadocs for the class. So I'd say it's pretty clear about what it's representing -- I totally missed this before, I thought it was just a basic container class
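For readers following the review comment above: the quoted javadoc says a `TimeWindow` covers a half-open time interval, i.e. the start is inclusive and the end exclusive. The semantics can be illustrated with a minimal toy class (this is not the actual Kafka `TimeWindow`; the name is illustrative):

```java
// Toy illustration of half-open window semantics [start, end):
// a timestamp equal to the start is inside the window, a timestamp
// equal to the end is not. Not the actual Kafka Streams class.
public class HalfOpenWindow {
    final long startMs; // inclusive
    final long endMs;   // exclusive

    HalfOpenWindow(final long startMs, final long endMs) {
        this.startMs = startMs;
        this.endMs = endMs;
    }

    boolean contains(final long timestampMs) {
        return timestampMs >= startMs && timestampMs < endMs;
    }

    public static void main(String[] args) {
        HalfOpenWindow w = new HalfOpenWindow(0L, 500L);
        System.out.println(w.contains(0L));   // true: start is inclusive
        System.out.println(w.contains(499L)); // true
        System.out.println(w.contains(500L)); // false: end is exclusive
    }
}
```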
[jira] [Assigned] (KAFKA-12848) Add some basic benchmarks for Kafka Streams
[ https://issues.apache.org/jira/browse/KAFKA-12848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Sagar Rao reassigned KAFKA-12848: - Assignee: Sagar Rao > Add some basic benchmarks for Kafka Streams > --- > > Key: KAFKA-12848 > URL: https://issues.apache.org/jira/browse/KAFKA-12848 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: A. Sophie Blee-Goldman >Assignee: Sagar Rao >Priority: Major > Labels: newbie, newbie++ > > As the title suggests, we often want to test out improvements or verify that > a bugfix does not introduce a serious regression. While there are existing > benchmarks that are run for quality assurance by various contributors, there > are no publicly available benchmarks for Kafka Streams in AK itself. > It would be great if we had a simple jmh suite (or something) with various > Streams features which could be run on a one-off basis by developers.
[jira] [Commented] (KAFKA-9168) Integrate JNI direct buffer support to RocksDBStore
[ https://issues.apache.org/jira/browse/KAFKA-9168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17351444#comment-17351444 ] Sagar Rao commented on KAFKA-9168: -- Sure thanks. Probably, I will send a patch for one of the APIs (let's say put()) and you can run benchmarks using the internal repo? Meanwhile, I have assigned the other ticket related to benchmarking framework to myself. I can start looking into it as well. BTW, do you think this particular ticket, if the numbers look fine, would warrant a KIP? > Integrate JNI direct buffer support to RocksDBStore > --- > > Key: KAFKA-9168 > URL: https://issues.apache.org/jira/browse/KAFKA-9168 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Sagar Rao >Assignee: Sagar Rao >Priority: Major > Labels: perfomance > > There has been a PR created on rocksdb Java client to support direct > ByteBuffers in Java. We can look at integrating it whenever it gets merged. > Link to PR: [https://github.com/facebook/rocksdb/pull/2283]
[GitHub] [kafka] ableegoldman commented on pull request #10755: MINOR: deprecate TaskMetadata constructor and add KIP-740 notes to upgrade guide
ableegoldman commented on pull request #10755: URL: https://github.com/apache/kafka/pull/10755#issuecomment-848407312 Addressed your comments @mjsax
[GitHub] [kafka] ableegoldman commented on a change in pull request #10755: MINOR: deprecate TaskMetadata constructor and add KIP-740 notes to upgrade guide
ableegoldman commented on a change in pull request #10755: URL: https://github.com/apache/kafka/pull/10755#discussion_r639356381 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java ## @@ -80,6 +83,35 @@ public String toString() { return namedTopology != null ? namedTopology + "_" + topicGroupId + "_" + partition : topicGroupId + "_" + partition; } +/** + * @throws TaskIdFormatException if the taskIdStr is not a valid {@link TaskId} + */ +public static TaskId parse(final String taskIdStr) { Review comment: Well that would be a breaking change by removing a non-deprecated API, no? And in this case I actually believe we should _not_ deprecate it -- if `toString` is part of the public TaskId API (and it should be) then this `parse` method which does the reverse should be as well. As I discussed with some folks during the KIP-740 ~debacle~ debate, part of the public contract of TaskId is in its string representation since that is what ends up in logs, metrics, etc. So imo it does make sense to provide a String-to-TaskId API
[GitHub] [kafka] ableegoldman commented on a change in pull request #10755: MINOR: deprecate TaskMetadata constructor and add KIP-740 notes to upgrade guide
ableegoldman commented on a change in pull request #10755: URL: https://github.com/apache/kafka/pull/10755#discussion_r639355330 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java ## @@ -80,6 +83,35 @@ public String toString() { return namedTopology != null ? namedTopology + "_" + topicGroupId + "_" + partition : topicGroupId + "_" + partition; } +/** + * @throws TaskIdFormatException if the taskIdStr is not a valid {@link TaskId} + */ +public static TaskId parse(final String taskIdStr) { +final int firstIndex = taskIdStr.indexOf('_'); Review comment: Yes, we plan to restrict the `_` character. If we want to loosen that up later we can just parse this from the back, but I think it's reasonable to just disallow `_` completely. > What is a topology name? Great question. Not necessarily a short answer but I can try -- basically an independent and isolated piece of a topology that can be added/removed/etc at will, even on a running app. > How to set it? The skeleton API was merged in [#10615](https://github.com/apache/kafka/pull/10615/files), it has evolved a bit since then but the basic idea holds -- each NamedTopology is built up with a special builder called the NamedTopologyStreamsBuilder. And a dedicated KafkaStreams wrapper is the entry point for starting up an app using NamedTopologies. All currently under the `internals` package while it's under the experimental phase so it should not be possible for a user to end up with anything NamedTopology through public APIs.
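The "parse this from the back" idea mentioned in the comment above can be sketched as follows. This is an illustrative toy, not the actual `org.apache.kafka.streams.processor.TaskId#parse` implementation; the class and field names here are assumptions for the sketch:

```java
// Toy sketch of parsing a task id string such as "2_7" or
// "myTopology_2_7" from the back: the last two '_'-separated tokens
// are always subtopology and partition, so a topology-name prefix
// that itself contains '_' would still parse correctly.
// Not Kafka's actual TaskId.parse.
public class TaskIdParseSketch {
    final String namedTopology; // null when the task id has no name prefix
    final int subtopology;
    final int partition;

    TaskIdParseSketch(final String namedTopology, final int subtopology, final int partition) {
        this.namedTopology = namedTopology;
        this.subtopology = subtopology;
        this.partition = partition;
    }

    static TaskIdParseSketch parse(final String taskIdStr) {
        final int last = taskIdStr.lastIndexOf('_');
        if (last < 1) {
            throw new IllegalArgumentException("Not a valid task id: " + taskIdStr);
        }
        // Search backwards for the delimiter before the subtopology token.
        final int secondLast = taskIdStr.lastIndexOf('_', last - 1);
        final int partition = Integer.parseInt(taskIdStr.substring(last + 1));
        final int subtopology = Integer.parseInt(taskIdStr.substring(secondLast + 1, last));
        final String name = secondLast > 0 ? taskIdStr.substring(0, secondLast) : null;
        return new TaskIdParseSketch(name, subtopology, partition);
    }

    public static void main(String[] args) {
        TaskIdParseSketch plain = parse("2_7");
        System.out.println(plain.subtopology + " " + plain.partition); // 2 7
        TaskIdParseSketch named = parse("my_topology_2_7");
        System.out.println(named.namedTopology); // my_topology
    }
}
```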
[jira] [Commented] (KAFKA-12849) Consider migrating TaskMetadata to interface with internal implementation
[ https://issues.apache.org/jira/browse/KAFKA-12849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17351440#comment-17351440 ] A. Sophie Blee-Goldman commented on KAFKA-12849: Though we went through a similar discussion with TaskId and ultimately decided it needed to be an actual class and not an interface, imo those arguments and conditions do not really apply to the TaskMetadata class. So I would propose to move this to an interface, though I didn't want to get into it during KIP-740 which had already dragged on for quite long enough :) > Consider migrating TaskMetadata to interface with internal implementation > - > > Key: KAFKA-12849 > URL: https://issues.apache.org/jira/browse/KAFKA-12849 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: A. Sophie Blee-Goldman >Priority: Major > > In KIP-740 we had to go through a deprecation cycle in order to change the > constructor from the original one which accepted the taskId parameter as a > string, to the new one which takes a TaskId object directly. We had > considered just changing the signature directly without deprecation as this > was never intended to be instantiated by users, rather it just acts as a > pass-through metadata class. Sort of by definition if there is no reason to > ever instantiate it, this seems to indicate it may be better suited as a > public interface with the implementation and constructor as internal APIs.
[jira] [Created] (KAFKA-12849) Consider migrating TaskMetadata to interface with internal implementation
A. Sophie Blee-Goldman created KAFKA-12849: -- Summary: Consider migrating TaskMetadata to interface with internal implementation Key: KAFKA-12849 URL: https://issues.apache.org/jira/browse/KAFKA-12849 Project: Kafka Issue Type: Improvement Components: streams Reporter: A. Sophie Blee-Goldman In KIP-740 we had to go through a deprecation cycle in order to change the constructor from the original one which accepted the taskId parameter as a string, to the new one which takes a TaskId object directly. We had considered just changing the signature directly without deprecation as this was never intended to be instantiated by users, rather it just acts as a pass-through metadata class. Sort of by definition if there is no reason to ever instantiate it, this seems to indicate it may be better suited as a public interface with the implementation and constructor as internal APIs.
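The pattern the ticket proposes, a public read-only interface backed by an internal implementation whose constructor is not part of the public API, could look roughly like this. All names here are illustrative, not the actual Kafka Streams types:

```java
// Public, user-facing contract: read-only accessors, no public
// constructor that would ever need a deprecation cycle.
interface TaskMetadataView {
    String taskIdString();
}

// Internal implementation: because users never instantiate it directly,
// its constructor signature can change freely between releases.
class TaskMetadataViewImpl implements TaskMetadataView {
    private final String taskId;

    TaskMetadataViewImpl(final String taskId) {
        this.taskId = taskId;
    }

    @Override
    public String taskIdString() {
        return taskId;
    }
}

public class InterfaceImplSketch {
    public static void main(String[] args) {
        // Library code constructs the impl; callers only see the interface.
        TaskMetadataView metadata = new TaskMetadataViewImpl("0_1");
        System.out.println(metadata.taskIdString()); // 0_1
    }
}
```

This is exactly why the deprecation cycle described in the ticket becomes unnecessary: the constructor lives behind the interface boundary.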
[GitHub] [kafka] ableegoldman commented on a change in pull request #10755: MINOR: deprecate TaskMetadata constructor and add KIP-740 notes to upgrade guide
ableegoldman commented on a change in pull request #10755: URL: https://github.com/apache/kafka/pull/10755#discussion_r639351170 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/TaskMetadata.java ## @@ -40,6 +40,18 @@ private final Optional<Long> timeCurrentIdlingStarted; +/** + * @deprecated since 3.0, please use {@link #TaskMetadata(TaskId, Set, Map, Map, Optional) instead} + */ +@Deprecated +public TaskMetadata(final String taskId, +final Set<TopicPartition> topicPartitions, +final Map<TopicPartition, Long> committedOffsets, +final Map<TopicPartition, Long> endOffsets, +final Optional<Long> timeCurrentIdlingStarted) { +this(TaskId.parse(taskId), topicPartitions, committedOffsets, endOffsets, timeCurrentIdlingStarted); +} + public TaskMetadata(final TaskId taskId, Review comment: Well, we would still need a public constructor no matter what even with an internal impl class. I'll add a comment to clarify that it's not intended for public use and maybe file a followup ticket to migrate this to an interface in case someone wants to get into that
[GitHub] [kafka] ableegoldman commented on a change in pull request #10755: MINOR: deprecate TaskMetadata constructor and add KIP-740 notes to upgrade guide
ableegoldman commented on a change in pull request #10755: URL: https://github.com/apache/kafka/pull/10755#discussion_r639350604 ## File path: docs/streams/upgrade-guide.html ## @@ -117,6 +117,14 @@ Streams API We removed the default implementation of RocksDBConfigSetter#close(). + + +The public topicGroupId and partition fields on TaskId have been deprecated and replaced with getters, please migrate to using the new TaskId.subtopology() +and TaskId.partition() APIs instead. Also, the TaskId#readFrom and TaskId#writeTo methods have been deprecated and will be removed, as they were never intended +for public use to begin with. Finally, we have deprecated the TaskMetadata.taskId() method as well as the TaskMetadataconstructor. These have been replaced with APIs that +better represent the task id as an actual TaskId object instead of a String. Please migrate to the new TaskMetadata#getTaskId method and the new constructor which accepts Review comment: True. I'll remove this note about it
[GitHub] [kafka] mjsax commented on a change in pull request #10755: MINOR: deprecate TaskMetadata constructor and add KIP-740 notes to upgrade guide
mjsax commented on a change in pull request #10755: URL: https://github.com/apache/kafka/pull/10755#discussion_r639338584 ## File path: streams/src/main/java/org/apache/kafka/streams/processor/TaskMetadata.java ## @@ -40,6 +40,18 @@ private final Optional<Long> timeCurrentIdlingStarted; +/** + * @deprecated since 3.0, please use {@link #TaskMetadata(TaskId, Set, Map, Map, Optional) instead} + */ +@Deprecated +public TaskMetadata(final String taskId, +final Set<TopicPartition> topicPartitions, +final Map<TopicPartition, Long> committedOffsets, +final Map<TopicPartition, Long> endOffsets, +final Optional<Long> timeCurrentIdlingStarted) { +this(TaskId.parse(taskId), topicPartitions, committedOffsets, endOffsets, timeCurrentIdlingStarted); +} + public TaskMetadata(final TaskId taskId, Review comment: Should we make this protected and add an internal "impl" class? (And also mark as non-public so we can eventually move to an interface?) ## File path: docs/streams/upgrade-guide.html ## @@ -117,6 +117,14 @@ Streams API We removed the default implementation of RocksDBConfigSetter#close(). + + +The public topicGroupId and partition fields on TaskId have been deprecated and replaced with getters, please migrate to using the new TaskId.subtopology() +and TaskId.partition() APIs instead. Also, the TaskId#readFrom and TaskId#writeTo methods have been deprecated and will be removed, as they were never intended +for public use to begin with. Finally, we have deprecated the TaskMetadata.taskId() method as well as the TaskMetadataconstructor. These have been replaced with APIs that Review comment: nit: remove `to begin with` ? ## File path: docs/streams/upgrade-guide.html ## @@ -117,6 +117,14 @@ Streams API We removed the default implementation of RocksDBConfigSetter#close(). + + +The public topicGroupId and partition fields on TaskId have been deprecated and replaced with getters, please migrate to using the new TaskId.subtopology() +and TaskId.partition() APIs instead. 
Also, the TaskId#readFrom and TaskId#writeTo methods have been deprecated and will be removed, as they were never intended +for public use to begin with. Finally, we have deprecated the TaskMetadata.taskId() method as well as the TaskMetadataconstructor. These have been replaced with APIs that +better represent the task id as an actual TaskId object instead of a String. Please migrate to the new TaskMetadata#getTaskId method and the new constructor which accepts Review comment: `and the new constructor` I thought that the constructor was never intended for public usage. Should we not hide the new constructor? ## File path: docs/streams/upgrade-guide.html ## @@ -117,6 +117,14 @@ Streams API We removed the default implementation of RocksDBConfigSetter#close(). + + +The public topicGroupId and partition fields on TaskId have been deprecated and replaced with getters, please migrate to using the new TaskId.subtopology() +and TaskId.partition() APIs instead. Also, the TaskId#readFrom and TaskId#writeTo methods have been deprecated and will be removed, as they were never intended +for public use to begin with. Finally, we have deprecated the TaskMetadata.taskId() method as well as the TaskMetadataconstructor. These have been replaced with APIs that Review comment: missing blank `TaskMetadataconstructor` ## File path: streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java ## @@ -80,6 +83,35 @@ public String toString() { return namedTopology != null ? namedTopology + "_" + topicGroupId + "_" + partition : topicGroupId + "_" + partition; } +/** + * @throws TaskIdFormatException if the taskIdStr is not a valid {@link TaskId} + */ +public static TaskId parse(final String taskIdStr) { Review comment: If it was removed (and release in 2.8), why add it back? Seems we don't need it? ## File path: streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java ## @@ -80,6 +83,35 @@ public String toString() { return namedTopology != null ? 
namedTopology + "_" + topicGroupId + "_" + partition : topicGroupId + "_" + partition; } +/** + * @throws TaskIdFormatException if the taskIdStr is not a valid {@link TaskId} + */ +public static TaskId parse(final String taskIdStr) { +final int firstIndex = taskIdStr.indexOf('_'); Review comment: I guess I missed the "name topology" change. What is a topology name? How to set it? And do we ensure that we don't allow `_` in its name? ## File path: docs/streams/upgrade-guide.html ## @@ -117,6 +117,14 @@ Streams API
[GitHub] [kafka] DuongPTIT commented on pull request #10677: KAFKA-12380 Executor in Connect's Worker is not shut down when the worker is
DuongPTIT commented on pull request #10677: URL: https://github.com/apache/kafka/pull/10677#issuecomment-848382648 Hi @showuon, can you please review this issue for me? Thank you.
[GitHub] [kafka] DuongPTIT removed a comment on pull request #10677: KAFKA-12380 Executor in Connect's Worker is not shut down when the worker is
DuongPTIT removed a comment on pull request #10677: URL: https://github.com/apache/kafka/pull/10677#issuecomment-845827053 @chia7712 PTAL, thanks!
[GitHub] [kafka] junrao commented on a change in pull request #10280: KAFKA-12554: Refactor Log layer
junrao commented on a change in pull request #10280: URL: https://github.com/apache/kafka/pull/10280#discussion_r638367249 ## File path: core/src/main/scala/kafka/log/Log.scala ## @@ -1500,50 +1325,67 @@ class Log(@volatile private var _dir: File, } } -deleteOldSegments(shouldDelete, RetentionSizeBreach) +deleteOldSegments(shouldDelete, RetentionSizeBreach(this)) } private def deleteLogStartOffsetBreachedSegments(): Int = { def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): Boolean = { nextSegmentOpt.exists(_.baseOffset <= logStartOffset) } -deleteOldSegments(shouldDelete, StartOffsetBreach) +deleteOldSegments(shouldDelete, StartOffsetBreach(this)) } def isFuture: Boolean = dir.getName.endsWith(Log.FutureDirSuffix) /** * The size of the log in bytes */ - def size: Long = Log.sizeInBytes(logSegments) + def size: Long = localLog.segments.sizeInBytes /** - * The offset metadata of the next message that will be appended to the log + * The offset of the next message that will be appended to the log */ - def logEndOffsetMetadata: LogOffsetMetadata = nextOffsetMetadata + def logEndOffset: Long = localLog.logEndOffset /** - * The offset of the next message that will be appended to the log + * The offset metadata of the next message that will be appended to the log */ - def logEndOffset: Long = nextOffsetMetadata.messageOffset + def logEndOffsetMetadata: LogOffsetMetadata = localLog.logEndOffsetMetadata + + private val rollAction = RollAction( Review comment: I feel RollAction actually makes the code harder to understand than before. So, it would be useful to think through if we could avoid it. In particular, it seems that anything in postRollAction could just be done in the caller if we return enough context. We are taking a producer snapshot in preRollAction. However, since we are not adding new data here, it seems that we could take the producer snapshot in Log.roll() after calling localLog.roll() while holding the Log.lock. 
## File path: core/src/main/scala/kafka/log/Log.scala ## @@ -1500,50 +1325,67 @@ class Log(@volatile private var _dir: File, } } -deleteOldSegments(shouldDelete, RetentionSizeBreach) +deleteOldSegments(shouldDelete, RetentionSizeBreach(this)) } private def deleteLogStartOffsetBreachedSegments(): Int = { def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): Boolean = { nextSegmentOpt.exists(_.baseOffset <= logStartOffset) } -deleteOldSegments(shouldDelete, StartOffsetBreach) +deleteOldSegments(shouldDelete, StartOffsetBreach(this)) } def isFuture: Boolean = dir.getName.endsWith(Log.FutureDirSuffix) /** * The size of the log in bytes */ - def size: Long = Log.sizeInBytes(logSegments) + def size: Long = localLog.segments.sizeInBytes /** - * The offset metadata of the next message that will be appended to the log + * The offset of the next message that will be appended to the log */ - def logEndOffsetMetadata: LogOffsetMetadata = nextOffsetMetadata + def logEndOffset: Long = localLog.logEndOffset /** - * The offset of the next message that will be appended to the log + * The offset metadata of the next message that will be appended to the log */ - def logEndOffset: Long = nextOffsetMetadata.messageOffset + def logEndOffsetMetadata: LogOffsetMetadata = localLog.logEndOffsetMetadata + + private val rollAction = RollAction( +preRollAction = (newSegment: LogSegment) => { + // Take a snapshot of the producer state to facilitate recovery. It is useful to have the snapshot + // offset align with the new segment offset since this ensures we can recover the segment by beginning + // with the corresponding snapshot file and scanning the segment data. Because the segment base offset + // may actually be ahead of the current producer state end offset (which corresponds to the log end offset), + // we manually override the state offset here prior to taking the snapshot. 
+ producerStateManager.updateMapEndOffset(newSegment.baseOffset) + producerStateManager.takeSnapshot() +}, +postRollAction = (newSegment: LogSegment, deletedSegment: Option[LogSegment]) => { + deletedSegment.foreach(segment => deleteProducerSnapshotAsync(Seq(segment))) Review comment: This seems to have exposed an existing bug. During roll, deletedSegment will be non-empty if there is an existing segment of 0 size with the newOffsetToRoll. However, since we take a producer snapshot on newOffsetToRoll before calling postRollAction, we will be deleting the same snapshot we just created. In this case, I think we don't need to delete producerSnapshot for deletedSegment. ## File path: core/src/main/scala/kafka/log/Log.scala ## @@ -1572,144
[jira] [Commented] (KAFKA-9168) Integrate JNI direct buffer support to RocksDBStore
[ https://issues.apache.org/jira/browse/KAFKA-9168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17351377#comment-17351377 ] A. Sophie Blee-Goldman commented on KAFKA-9168: --- There's nothing available at the moment in AK, I actually just filed https://issues.apache.org/jira/browse/KAFKA-12848 for this. If you'd be interested in putting together a starter kit of jmh benchmarks that would definitely be great – but we do have an internal benchmarking repo so I can just run those on your patch if you'd prefer. We'd like to make some of it public eventually but it requires some footwork and maybe an audit so no one has found the time yet > Integrate JNI direct buffer support to RocksDBStore > --- > > Key: KAFKA-9168 > URL: https://issues.apache.org/jira/browse/KAFKA-9168 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Sagar Rao >Assignee: Sagar Rao >Priority: Major > Labels: perfomance > > There has been a PR created on rocksdb Java client to support direct > ByteBuffers in Java. We can look at integrating it whenever it gets merged. > Link to PR: [https://github.com/facebook/rocksdb/pull/2283]
[jira] [Created] (KAFKA-12848) Add some basic benchmarks for Kafka Streams
A. Sophie Blee-Goldman created KAFKA-12848: -- Summary: Add some basic benchmarks for Kafka Streams Key: KAFKA-12848 URL: https://issues.apache.org/jira/browse/KAFKA-12848 Project: Kafka Issue Type: Improvement Components: streams Reporter: A. Sophie Blee-Goldman As the title suggests, we often want to test out improvements or verify that a bugfix does not introduce a serious regression. While there are existing benchmarks that are run for quality assurance by various contributors, there are no publicly available benchmarks for Kafka Streams in AK itself. It would be great if we had a simple jmh suite (or something) with various Streams features which could be run on a one-off basis by developers. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] ccding opened a new pull request #10763: [WIP] KAFKA-12520: Ensure log loading does not truncate producer state unless required
ccding opened a new pull request #10763: URL: https://github.com/apache/kafka/pull/10763 When we find a .swap file on startup, we typically want to rename and replace it as .log, .index, .timeindex, etc. as a way to complete any ongoing replace operations. These swap files are usually known to have been flushed to disk before the replace operation begins. One flaw in the current logic is that we recover these swap files on startup and, as part of that, end up truncating the producer state and rebuilding it from scratch. This is unneeded, as the replace operation does not mutate the producer state by itself. It is only meant to replace the .log file along with the corresponding indices. Because of this unneeded producer state rebuild operation, we have seen multi-hour startup times for clusters that have large compacted topics. This patch fixes the issue. With ext4 ordered mode, the metadata are ordered, and regardless of whether the shutdown was clean or unclean, if at least one `.swap` file exists for a segment, all other files for the segment must exist as `.cleaned` files or `.swap` files. Therefore, we rename the `.cleaned` files to `.swap` files, then make them normal segment files. If they don't pass the sanity check, we fall back to the original path and repair all the index files. TODO: run and validate the patch ### Committer Checklist (excluded from commit message) - [ ] Verify design and implementation - [ ] Verify test coverage and CI build status - [ ] Verify documentation (including upgrade notes) -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
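The two-step promotion the PR describes (rename `.cleaned` files to `.swap`, then strip the suffix to make them normal segment files) can be sketched with plain file renames. This is an illustrative sketch with hypothetical file names, not the actual Log.scala recovery code:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;

public class SwapPromotionSketch {
    // Promote a ".cleaned" file to ".swap", then to its final name,
    // mirroring the two-step rename described in the PR.
    static Path promote(Path cleaned) throws IOException {
        String name = cleaned.getFileName().toString();
        if (!name.endsWith(".cleaned"))
            throw new IllegalArgumentException("expected a .cleaned file: " + name);
        String base = name.substring(0, name.length() - ".cleaned".length());
        Path swap = cleaned.resolveSibling(base + ".swap");
        Files.move(cleaned, swap);            // step 1: .cleaned -> .swap
        Path fin = swap.resolveSibling(base); // step 2: .swap -> final segment file
        return Files.move(swap, fin);
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("segments");
        Path cleaned = Files.createFile(dir.resolve("00000000000000000000.log.cleaned"));
        Path fin = promote(cleaned);
        System.out.println(fin.getFileName());
    }
}
```

The point of the intermediate `.swap` rename is that a crash between the two steps leaves a file whose suffix tells recovery exactly how far the replace operation got.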
[jira] [Commented] (KAFKA-12845) Rollback change which requires join key to be non null on KStream->GlobalKTable
[ https://issues.apache.org/jira/browse/KAFKA-12845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17351308#comment-17351308 ] Matthias J. Sax commented on KAFKA-12845: - I think you are confusing "input record key" with "join-key". As you correctly mentioned, we apply a key-extractor to get the join-key for stream side records. This join-key must still be non-null. KAFKA-10277 only allows the input record key to be null now. Of course, if you use the input record key as the join-key, the key-extractor would return the key as-is, and thus, if it's null, the record would still be dropped. The issue KAFKA-10277 addresses is that if you have a null-key input record but your key-extractor returns a non-null join-key (from the value of the record), the join should still work. However, before KAFKA-10277, all records with a null key were dropped, even if the key was not used in the join. Does this make sense? > Rollback change which requires join key to be non null on > KStream->GlobalKTable > --- > > Key: KAFKA-12845 > URL: https://issues.apache.org/jira/browse/KAFKA-12845 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.7.0 >Reporter: Pedro Gontijo >Priority: Major > > As part of [KAFKA-10277|https://issues.apache.org/jira/browse/KAFKA-10277] > the behavior for KStream->GlobalKtable joins was changed to require non null > join keys. > But it seems reasonable that not every record will have an existing > relationship (and hence a key) with the join globalktable. Think about a > User>Car for instance, or PageView>Product. An empty/zero key could be > returned by the KeyMapper but that will make a totally unnecessary search > into the store. > I do not think that makes sense for any GlobalKtable join (inner or left) but > for left join it sounds even more strange. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
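The distinction drawn above between the input record key and the join-key can be sketched as follows. The extractor and record shape here are hypothetical stand-ins, not the Streams API itself; only the drop rule (null join-key, not null record key) reflects the described semantics:

```java
import java.util.function.BiFunction;

public class JoinKeySketch {
    // Hypothetical extractor: derives the join-key from the record VALUE,
    // so a null input record key does not matter.
    static final BiFunction<String, String, String> fromValue = (key, value) -> value;

    // Post-KAFKA-10277 semantics: the record is dropped only if the JOIN-KEY
    // (the extractor's result) is null.
    static boolean joins(String recordKey, String recordValue,
                         BiFunction<String, String, String> extractor) {
        String joinKey = extractor.apply(recordKey, recordValue);
        return joinKey != null;
    }

    public static void main(String[] args) {
        // null input key, join-key taken from the value: still joins
        System.out.println(joins(null, "product-42", fromValue)); // true
        // identity extractor with a null key: join-key is null, dropped
        System.out.println(joins(null, null, (k, v) -> k)); // false
    }
}
```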
[GitHub] [kafka] jlprat commented on pull request #10758: KAFKA-12782: Javadocs search sends you to a non-existent URL
jlprat commented on pull request #10758: URL: https://github.com/apache/kafka/pull/10758#issuecomment-848155893 Sure @ableegoldman , no worries. I just mentioned you as you were the one who created the ticket. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Reopened] (KAFKA-9295) KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable
[ https://issues.apache.org/jira/browse/KAFKA-9295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] A. Sophie Blee-Goldman reopened KAFKA-9295: --- Guess there is still something else going on here yet. At this point I think we can mostly rule out fiddling with the configs but I don't have any guesses on where to look next. It would be nice if we could get real logs from a run that reproduced this, but unfortunately all the actual Streams content is truncated. [~showuon] maybe you can look into turning the zookeeper and kafka logs down to WARN or even ERROR so that we have some hope of viewing the relevant parts of the logs? I tried to do that a while back but clearly it didn't work, and I didn't have time to revisit it > KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable > -- > > Key: KAFKA-9295 > URL: https://issues.apache.org/jira/browse/KAFKA-9295 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Affects Versions: 2.4.0, 2.6.0 >Reporter: Matthias J. 
Sax >Assignee: Luke Chen >Priority: Critical > Labels: flaky-test > Fix For: 3.0.0 > > > [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/27106/testReport/junit/org.apache.kafka.streams.integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest/shouldInnerJoinMultiPartitionQueryable/] > {quote}java.lang.AssertionError: Did not receive all 1 records from topic > output- within 6 ms Expected: is a value equal to or greater than <1> > but: <0> was less than <1> at > org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:18) at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:515) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:385) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:511) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:489) > at > org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.verifyKTableKTableJoin(KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java:200) > at > org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.shouldInnerJoinMultiPartitionQueryable(KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java:183){quote} > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] ableegoldman commented on pull request #10758: KAFKA-12782: Javadocs search sends you to a non-existent URL
ableegoldman commented on pull request #10758: URL: https://github.com/apache/kafka/pull/10758#issuecomment-848103904 To be honest I don't have much context on the javadocs but I will take a look. Maybe @ijuma can help review -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-9168) Integrate JNI direct buffer support to RocksDBStore
[ https://issues.apache.org/jira/browse/KAFKA-9168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17351243#comment-17351243 ] Sagar Rao commented on KAFKA-9168: -- Got it. How do you want me to benchmark? Using the rocksdb benchmarking utility, or is there any other way, like jmh within kafka streams? > Integrate JNI direct buffer support to RocksDBStore > --- > > Key: KAFKA-9168 > URL: https://issues.apache.org/jira/browse/KAFKA-9168 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Sagar Rao >Assignee: Sagar Rao >Priority: Major > Labels: perfomance > > There has been a PR created on rocksdb Java client to support direct > ByteBuffers in Java. We can look at integrating it whenever it gets merged. > Link to PR: [https://github.com/facebook/rocksdb/pull/2283] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9168) Integrate JNI direct buffer support to RocksDBStore
[ https://issues.apache.org/jira/browse/KAFKA-9168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17351238#comment-17351238 ] A. Sophie Blee-Goldman commented on KAFKA-9168: --- Yep, I think that's exactly what we want to do with this ticket. Looking forward to the results > Integrate JNI direct buffer support to RocksDBStore > --- > > Key: KAFKA-9168 > URL: https://issues.apache.org/jira/browse/KAFKA-9168 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Sagar Rao >Assignee: Sagar Rao >Priority: Major > Labels: perfomance > > There has been a PR created on rocksdb Java client to support direct > ByteBuffers in Java. We can look at integrating it whenever it gets merged. > Link to PR: [https://github.com/facebook/rocksdb/pull/2283] -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-8295) Optimize count() using RocksDB merge operator
[ https://issues.apache.org/jira/browse/KAFKA-8295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17351232#comment-17351232 ] Sagar Rao commented on KAFKA-8295: -- hey [~ableegoldman], wanted to know if you got a chance to look at these numbers I posted above? Plz let me know whenever you get the chance to do so. > Optimize count() using RocksDB merge operator > - > > Key: KAFKA-8295 > URL: https://issues.apache.org/jira/browse/KAFKA-8295 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: A. Sophie Blee-Goldman >Assignee: Sagar Rao >Priority: Major > > In addition to regular put/get/delete RocksDB provides a fourth operation, > merge. This essentially provides an optimized read/update/write path in a > single operation. One of the built-in (C++) merge operators exposed over the > Java API is a counter. We should be able to leverage this for a more > efficient implementation of count() > > (Note: Unfortunately it seems unlikely we can use this to optimize general > aggregations, even if RocksJava allowed for a custom merge operator, unless > we provide a way for the user to specify and connect a C++ implemented > aggregator – otherwise we incur too much cost crossing the jni for a net > performance benefit) -- This message was sent by Atlassian Jira (v8.3.4#803005)
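The read-update-write collapse that the ticket above wants from RocksDB's merge operator can be illustrated, by analogy, with `java.util.Map.merge`. This is not the RocksDB API (its built-in uint64 counter lives in C++ behind the JNI boundary); it only shows the shape of the optimization in stdlib Java:

```java
import java.util.HashMap;
import java.util.Map;

public class MergeCountSketch {
    // Without merge: read, update, write back (three logical steps,
    // which over JNI means two crossings per count).
    static void countGetPut(Map<String, Long> store, String key) {
        Long current = store.get(key);
        store.put(key, current == null ? 1L : current + 1L);
    }

    // With a merge-style operator: one call, and the store applies the
    // combine function itself, analogous to RocksDB's built-in counter.
    static void countMerge(Map<String, Long> store, String key) {
        store.merge(key, 1L, Long::sum);
    }

    public static void main(String[] args) {
        Map<String, Long> store = new HashMap<>();
        countMerge(store, "clicks");
        countMerge(store, "clicks");
        System.out.println(store.get("clicks")); // 2
    }
}
```

The parenthetical note in the ticket is the key constraint: the combine function must live on the native side, which is why a built-in counter works but arbitrary user aggregators do not.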
[jira] [Commented] (KAFKA-9168) Integrate JNI direct buffer support to RocksDBStore
[ https://issues.apache.org/jira/browse/KAFKA-9168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17351230#comment-17351230 ] Sagar Rao commented on KAFKA-9168: -- [~ableegoldman], so I went through the github PR link (which honestly I hadn't done so far :D) and it looks like it enables JNI direct byte buffers for all basic operations barring transactions. There is a code snippet explaining the put operation using this new option:
{code:java}
try (RocksDB db = RocksDB.open(opt, "PerformanceTest");
     WriteOptions writeOptions = new WriteOptions()) {
  writeOptions.setDisableWAL(true);
  ByteBuffer directKeyBuffer = ByteBuffer.allocateDirect(128);
  directKeyBuffer.order(ByteOrder.BIG_ENDIAN);
  ByteBuffer directValueBuffer = ByteBuffer.allocateDirect(128);
  directValueBuffer.order(ByteOrder.BIG_ENDIAN);
  for (int i = 0; i < 1_000_000; i++) {
    directKeyBuffer.clear();
    directValueBuffer.clear();
    for (int o = 0; o < 16; o++) {
      directKeyBuffer.putLong(i);
      directValueBuffer.putLong(i);
    }
    directKeyBuffer.flip();
    directValueBuffer.flip();
    db.put(writeOptions, directKeyBuffer, directValueBuffer);
  }
}
{code}
As per the benchmarks released on the link, the iterator performance increased by 37% with 0 GC cycles, compared to 293 for the byte-array based approach. This is because there is no referenced memory. So, looking at these numbers, and to answer your question about where this ticket fits in, maybe we can start by benchmarking the APIs, changing the way the rocksdb state store APIs are implemented to use this new option. What I mean is: if today the put() implementation uses the byte[] based APIs, we can benchmark the ByteBuffer based approach and compare the numbers. Do you think that makes sense? 
> Integrate JNI direct buffer support to RocksDBStore > --- > > Key: KAFKA-9168 > URL: https://issues.apache.org/jira/browse/KAFKA-9168 > Project: Kafka > Issue Type: Improvement > Components: streams >Reporter: Sagar Rao >Assignee: Sagar Rao >Priority: Major > Labels: perfomance > > There has been a PR created on rocksdb Java client to support direct > ByteBuffers in Java. We can look at integrating it whenever it gets merged. > Link to PR: [https://github.com/facebook/rocksdb/pull/2283] -- This message was sent by Atlassian Jira (v8.3.4#803005)
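The clear/put/flip reuse pattern in the snippet above is what eliminates per-call allocation (and hence the GC cycles the benchmark counts); it can be exercised with plain java.nio, no RocksDB required:

```java
import java.nio.ByteBuffer;
import java.nio.ByteOrder;

public class DirectBufferSketch {
    // Encode a long key into a reusable direct buffer, as the RocksDB
    // snippet does: clear() resets position/limit for reuse, putLong()
    // writes, flip() prepares the buffer for reading by the native side.
    static ByteBuffer encode(ByteBuffer buf, long key) {
        buf.clear();   // reuse: no new allocation per record
        buf.putLong(key);
        buf.flip();    // position = 0, limit = bytes written
        return buf;
    }

    public static void main(String[] args) {
        ByteBuffer key = ByteBuffer.allocateDirect(128).order(ByteOrder.BIG_ENDIAN);
        for (long i = 0; i < 3; i++) {
            encode(key, i);
            // getLong(0) is an absolute read and does not move the position
            System.out.println(key.remaining() + " bytes, value " + key.getLong(0));
        }
    }
}
```

Because the buffer is direct, the native side can read it without the JNI byte[] copy; that, plus the single long-lived allocation, is where the cited iterator gains come from.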
[jira] [Comment Edited] (KAFKA-12845) Rollback change which requires join key to be non null on KStream->GlobalKTable
[ https://issues.apache.org/jira/browse/KAFKA-12845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17351227#comment-17351227 ] Pedro Gontijo edited comment on KAFKA-12845 at 5/25/21, 5:26 PM: - [~mjsax] and [~JoelWee] , I would love to hear your thoughts on this. was (Author: pedrong): [~mjsax] and [~JoelWee] would love to hear your thoughts on this. > Rollback change which requires join key to be non null on > KStream->GlobalKTable > --- > > Key: KAFKA-12845 > URL: https://issues.apache.org/jira/browse/KAFKA-12845 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.7.0 >Reporter: Pedro Gontijo >Priority: Major > > As part of [KAFKA-10277|https://issues.apache.org/jira/browse/KAFKA-10277] > the behavior for KStream->GlobalKtable joins was changed to require non null > join keys. > But it seems reasonable that not every record will have an existing > relationship (and hence a key) with the join globalktable. Think about a > User>Car for instance, or PageView>Product. An empty/zero key could be > returned by the KeyMapper but that will make a totally unnecessary search > into the store. > I do not think that makes sense for any GlobalKtable join (inner or left) but > for left join it sounds even more strange. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (KAFKA-12845) Rollback change which requires join key to be non null on KStream->GlobalKTable
[ https://issues.apache.org/jira/browse/KAFKA-12845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17351227#comment-17351227 ] Pedro Gontijo edited comment on KAFKA-12845 at 5/25/21, 5:26 PM: - [~mjsax] and [~JoelWee] would love to hear your thoughts on this. was (Author: pedrong): [~mjsax] and [~JoelWee] it would be great o hear your thoughts on this. > Rollback change which requires join key to be non null on > KStream->GlobalKTable > --- > > Key: KAFKA-12845 > URL: https://issues.apache.org/jira/browse/KAFKA-12845 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.7.0 >Reporter: Pedro Gontijo >Priority: Major > > As part of [KAFKA-10277|https://issues.apache.org/jira/browse/KAFKA-10277] > the behavior for KStream->GlobalKtable joins was changed to require non null > join keys. > But it seems reasonable that not every record will have an existing > relationship (and hence a key) with the join globalktable. Think about a > User>Car for instance, or PageView>Product. An empty/zero key could be > returned by the KeyMapper but that will make a totally unnecessary search > into the store. > I do not think that makes sense for any GlobalKtable join (inner or left) but > for left join it sounds even more strange. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-12845) Rollback change which requires join key to be non null on KStream->GlobalKTable
[ https://issues.apache.org/jira/browse/KAFKA-12845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17351227#comment-17351227 ] Pedro Gontijo commented on KAFKA-12845: --- [~mjsax] and [~JoelWee] it would be great to hear your thoughts on this. > Rollback change which requires join key to be non null on > KStream->GlobalKTable > --- > > Key: KAFKA-12845 > URL: https://issues.apache.org/jira/browse/KAFKA-12845 > Project: Kafka > Issue Type: Improvement > Components: streams >Affects Versions: 2.7.0 >Reporter: Pedro Gontijo >Priority: Major > > As part of [KAFKA-10277|https://issues.apache.org/jira/browse/KAFKA-10277] > the behavior for KStream->GlobalKtable joins was changed to require non null > join keys. > But it seems reasonable that not every record will have an existing > relationship (and hence a key) with the join globalktable. Think about a > User>Car for instance, or PageView>Product. An empty/zero key could be > returned by the KeyMapper but that will make a totally unnecessary search > into the store. > I do not think that makes sense for any GlobalKtable join (inner or left) but > for left join it sounds even more strange. > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] jlprat commented on pull request #10758: KAFKA-12782: Javadocs search sends you to a non-existent URL
jlprat commented on pull request #10758: URL: https://github.com/apache/kafka/pull/10758#issuecomment-847997038 Test failures were known flaky tests. Added a comment on the corresponding Jiras. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-9295) KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable
[ https://issues.apache.org/jira/browse/KAFKA-9295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17351166#comment-17351166 ] Josep Prat commented on KAFKA-9295: --- Seen it in [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-10758/1/testReport/org.apache.kafka.streams.integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest/Build___JDK_15_and_Scala_2_13___shouldInnerJoinMultiPartitionQueryable/] > KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable > -- > > Key: KAFKA-9295 > URL: https://issues.apache.org/jira/browse/KAFKA-9295 > Project: Kafka > Issue Type: Bug > Components: streams, unit tests >Affects Versions: 2.4.0, 2.6.0 >Reporter: Matthias J. Sax >Assignee: Luke Chen >Priority: Critical > Labels: flaky-test > Fix For: 3.0.0 > > > [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/27106/testReport/junit/org.apache.kafka.streams.integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest/shouldInnerJoinMultiPartitionQueryable/] > {quote}java.lang.AssertionError: Did not receive all 1 records from topic > output- within 6 ms Expected: is a value equal to or greater than <1> > but: <0> was less than <1> at > org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:18) at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:515) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417) > at > org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:385) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:511) > at > org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:489) > at > 
org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.verifyKTableKTableJoin(KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java:200) > at > org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.shouldInnerJoinMultiPartitionQueryable(KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java:183){quote} > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Comment Edited] (KAFKA-12511) Flaky test DynamicConnectionQuotaTest.testDynamicListenerConnectionCreationRateQuota()
[ https://issues.apache.org/jira/browse/KAFKA-12511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17351159#comment-17351159 ] Josep Prat edited comment on KAFKA-12511 at 5/25/21, 4:00 PM: -- Seen in: [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-10758/1/testReport/kafka.network/DynamicConnectionQuotaTest/Build___JDK_8_and_Scala_2_12___testDynamicConnectionQuota__/] {code:java} java.net.SocketException: Broken pipe (Write failed) at java.net.SocketOutputStream.socketWrite0(Native Method) at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111) at java.net.SocketOutputStream.write(SocketOutputStream.java:134) at java.io.DataOutputStream.writeInt(DataOutputStream.java:198) at kafka.server.BaseRequestTest.sendRequest(BaseRequestTest.scala:85) at kafka.server.BaseRequestTest.sendWithHeader(BaseRequestTest.scala:139) at kafka.server.BaseRequestTest.send(BaseRequestTest.scala:134) at kafka.server.BaseRequestTest.sendAndReceive(BaseRequestTest.scala:113) at kafka.network.DynamicConnectionQuotaTest.verifyConnection(DynamicConnectionQuotaTest.scala:340) at kafka.network.DynamicConnectionQuotaTest.createAndVerifyConnection(DynamicConnectionQuotaTest.scala:333) at kafka.network.DynamicConnectionQuotaTest.testDynamicConnectionQuota(DynamicConnectionQuotaTest.scala:355){code} was (Author: josep.prat): Seen in: [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-10758/1/testReport/kafka.network/DynamicConnectionQuotaTest/Build___JDK_8_and_Scala_2_12___testDynamicConnectionQuota__/] {code:java} java.lang.AssertionError: Did not receive all 1 records from topic output- within 6 ms, currently accumulated data is [] Expected: is a value equal to or greater than <1> but: <0> was less than <1> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:610) at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:368) at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:336) at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:606) at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:579) at org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.verifyKTableKTableJoin(KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java:199) at org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.shouldInnerJoinMultiPartitionQueryable(KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java:185) {code} > Flaky test > DynamicConnectionQuotaTest.testDynamicListenerConnectionCreationRateQuota() > -- > > Key: KAFKA-12511 > URL: https://issues.apache.org/jira/browse/KAFKA-12511 > Project: Kafka > Issue Type: Bug >Reporter: dengziming >Priority: Minor > > First time: > Listener PLAINTEXT connection rate 14.419389476913636 must be below > 14.399 ==> expected: but was: > Second time: > Listener EXTERNAL connection rate 10.998243336133811 must be below > 10.799 ==> expected: but was: > details: > https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-10289/4/testReport/junit/kafka.network/DynamicConnectionQuotaTest/Build___JDK_11___testDynamicListenerConnectionCreationRateQuota__/ -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-12319) Flaky test ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit()
[ https://issues.apache.org/jira/browse/KAFKA-12319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17351161#comment-17351161 ] Josep Prat commented on KAFKA-12319: Failed again in [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-10758/1/testReport/kafka.network/ConnectionQuotasTest/Build___JDK_15_and_Scala_2_13___testListenerConnectionRateLimitWhenActualRateAboveLimit__/] > Flaky test > ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit() > - > > Key: KAFKA-12319 > URL: https://issues.apache.org/jira/browse/KAFKA-12319 > Project: Kafka > Issue Type: Test >Reporter: Justine Olshan >Priority: Major > Labels: flaky-test > > I've seen this test fail a few times locally. But recently I saw it fail on a > PR build on Jenkins. > > [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-10041/7/testReport/junit/kafka.network/ConnectionQuotasTest/Build___JDK_11___testListenerConnectionRateLimitWhenActualRateAboveLimit__/] > h3. Error Message > java.util.concurrent.ExecutionException: org.opentest4j.AssertionFailedError: > Expected rate (30 +- 7), but got 37.436825357209706 (600 connections / 16.027 > sec) ==> expected: <30.0> but was: <37.436825357209706> > -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-12511) Flaky test DynamicConnectionQuotaTest.testDynamicListenerConnectionCreationRateQuota()
[ https://issues.apache.org/jira/browse/KAFKA-12511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17351159#comment-17351159 ] Josep Prat commented on KAFKA-12511: Seen in: [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-10758/1/testReport/kafka.network/DynamicConnectionQuotaTest/Build___JDK_8_and_Scala_2_12___testDynamicConnectionQuota__/] {code:java} java.lang.AssertionError: Did not receive all 1 records from topic output- within 6 ms, currently accumulated data is [] Expected: is a value equal to or greater than <1> but: <0> was less than <1> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:610) at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:368) at org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:336) at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:606) at org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:579) at org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.verifyKTableKTableJoin(KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java:199) at org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.shouldInnerJoinMultiPartitionQueryable(KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java:185) {code} > Flaky test > DynamicConnectionQuotaTest.testDynamicListenerConnectionCreationRateQuota() > -- > > Key: KAFKA-12511 > URL: https://issues.apache.org/jira/browse/KAFKA-12511 > Project: Kafka > Issue Type: Bug >Reporter: dengziming >Priority: Minor > > First time: > Listener PLAINTEXT connection rate 14.419389476913636 must be below > 14.399 ==> expected: but was: > Second time: > Listener 
EXTERNAL connection rate 10.998243336133811 must be below > 10.799 ==> expected: but was: > details: > https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-10289/4/testReport/junit/kafka.network/DynamicConnectionQuotaTest/Build___JDK_11___testDynamicListenerConnectionCreationRateQuota__/ -- This message was sent by Atlassian Jira (v8.3.4#803005)
[jira] [Commented] (KAFKA-9009) Flaky Test kafka.integration.MetricsDuringTopicCreationDeletionTest.testMetricsDuringTopicCreateDelete
[ https://issues.apache.org/jira/browse/KAFKA-9009?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17351155#comment-17351155 ] Josep Prat commented on KAFKA-9009:
---
Seen in [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-10758/1/testReport/kafka.integration/MetricsDuringTopicCreationDeletionTest/Build___JDK_11_and_Scala_2_13___testMetricsDuringTopicCreateDelete__/]
{code:java}
Error Message
java.lang.AssertionError: assertion failed: UnderReplicatedPartitionCount not 0: 1

Stacktrace
java.lang.AssertionError: assertion failed: UnderReplicatedPartitionCount not 0: 1
	at scala.Predef$.assert(Predef.scala:279)
	at kafka.integration.MetricsDuringTopicCreationDeletionTest.testMetricsDuringTopicCreateDelete(MetricsDuringTopicCreationDeletionTest.scala:121)
{code}

> Flaky Test kafka.integration.MetricsDuringTopicCreationDeletionTest.testMetricsDuringTopicCreateDelete
> ------------------------------------------------------------------------------------------------------
>
> Key: KAFKA-9009
> URL: https://issues.apache.org/jira/browse/KAFKA-9009
> Project: Kafka
> Issue Type: Test
> Components: core
> Affects Versions: 2.5.0, 2.6.0
> Reporter: Bill Bejeck
> Assignee: Luke Chen
> Priority: Major
> Labels: flaky-test
>
> Failure seen in
> [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/25644/testReport/junit/kafka.integration/MetricsDuringTopicCreationDeletionTest/testMetricsDuringTopicCreateDelete/]
>
> {noformat}
> Error Message
> java.lang.AssertionError: assertion failed: UnderReplicatedPartitionCount not 0: 1
> Stacktrace
> java.lang.AssertionError: assertion failed: UnderReplicatedPartitionCount not 0: 1
> 	at scala.Predef$.assert(Predef.scala:170)
> 	at kafka.integration.MetricsDuringTopicCreationDeletionTest.testMetricsDuringTopicCreateDelete(MetricsDuringTopicCreationDeletionTest.scala:123)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:498)
> 	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
> 	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
> 	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
> 	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
> 	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> 	at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> 	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
> 	at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
> 	at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365)
> 	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
> 	at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
> 	at org.junit.runners.ParentRunner$4.run(ParentRunner.java:330)
> 	at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78)
> 	at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328)
> 	at org.junit.runners.ParentRunner.access$100(ParentRunner.java:65)
> 	at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:292)
> 	at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
> 	at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
> 	at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
> 	at org.junit.runners.ParentRunner.run(ParentRunner.java:412)
> 	at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
> 	at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
> 	at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
> 	at org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
> 	at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> {noformat}
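The flaky assertion above checks `UnderReplicatedPartitionCount` exactly once, so a momentarily lagging replica fails the test. A common deflaking pattern is to poll the condition until it holds or a timeout elapses. Below is a minimal self-contained sketch of that idea; `waitUntilTrue` is a hypothetical stand-in mirroring the shape of `kafka.utils.TestUtils.waitUntilTrue`, and the "metric" is simulated, not read from a broker.

```java
import java.util.function.BooleanSupplier;

class WaitForMetric {
    // Poll a condition until it holds or the timeout elapses, instead of
    // asserting its value at a single instant (the source of the flakiness).
    static boolean waitUntilTrue(BooleanSupplier condition, long timeoutMs) throws InterruptedException {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (System.currentTimeMillis() < deadline) {
            if (condition.getAsBoolean()) return true;
            Thread.sleep(10);
        }
        return condition.getAsBoolean();
    }

    public static void main(String[] args) throws InterruptedException {
        long start = System.currentTimeMillis();
        // Simulated metric that settles to 0 after ~50 ms, standing in for
        // UnderReplicatedPartitionCount recovering after topic creation.
        BooleanSupplier underReplicatedIsZero = () -> System.currentTimeMillis() - start > 50;
        System.out.println("settled=" + waitUntilTrue(underReplicatedIsZero, 5_000));
    }
}
```

With a 5-second budget the transiently non-zero metric no longer fails the check.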
[GitHub] [kafka] dajac commented on a change in pull request #10757: MINOR: Log more information when producer snapshot is written
dajac commented on a change in pull request #10757:
URL: https://github.com/apache/kafka/pull/10757#discussion_r638898378

## File path: core/src/main/scala/kafka/log/Log.scala
## @@ -2011,7 +2011,11 @@ object Log extends Logging {
       logDirFailureChannel,
       config.messageFormatVersion.recordVersion,
       s"[Log partition=$topicPartition, dir=${dir.getParent}] ")
-    val producerStateManager = new ProducerStateManager(topicPartition, dir, maxProducerIdExpirationMs)
+    val producerStateManager = new ProducerStateManager(
+      topicPartition,
+      dir,
+      maxProducerIdExpirationMs,
+      time)

Review comment: Reverted to using one line.

-- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Commented] (KAFKA-12430) emit.heartbeats.enabled = false should disable heartbeats topic creation
[ https://issues.apache.org/jira/browse/KAFKA-12430?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17351104#comment-17351104 ] Ryanne Dolan commented on KAFKA-12430:
---
Hmm. I guess the downside to not creating the topics is that upstreamClusters etc won't work. They don't depend on actual records, just the topics. I don't have any objections, but it's a consideration.

> emit.heartbeats.enabled = false should disable heartbeats topic creation
> ------------------------------------------------------------------------
>
> Key: KAFKA-12430
> URL: https://issues.apache.org/jira/browse/KAFKA-12430
> Project: Kafka
> Issue Type: Bug
> Components: mirrormaker
> Reporter: Ivan Yurchenko
> Assignee: Matthew de Detrich
> Priority: Minor
>
> Currently, whether MirrorMaker 2's {{MirrorHeartbeatConnector}} emits heartbeats or not is based on the {{emit.heartbeats.enabled}} setting. However, the {{heartbeats}} topic is created unconditionally. It seems that the same setting should really disable the topic creation as well.

-- This message was sent by Atlassian Jira (v8.3.4#803005)
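The proposed behaviour change can be sketched as a single predicate gating topic creation on the config. This is an illustrative stand-in, not MirrorMaker 2 code: `shouldCreateHeartbeatsTopic` is a hypothetical helper, while the `emit.heartbeats.enabled` key (default true) is MM2's documented setting.

```java
import java.util.Map;

class HeartbeatTopicGate {
    // Only create the heartbeats topic when heartbeat emission is enabled.
    // Missing key falls back to MM2's documented default of "true".
    static boolean shouldCreateHeartbeatsTopic(Map<String, String> config) {
        return Boolean.parseBoolean(config.getOrDefault("emit.heartbeats.enabled", "true"));
    }

    public static void main(String[] args) {
        System.out.println(shouldCreateHeartbeatsTopic(Map.of("emit.heartbeats.enabled", "false")));
        System.out.println(shouldCreateHeartbeatsTopic(Map.of()));
    }
}
```

Ryanne's caveat above applies: downstream features that only need the topic to exist (e.g. upstreamClusters discovery) would stop working when the gate returns false.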
[GitHub] [kafka] ijuma commented on a change in pull request #10757: MINOR: Log more information when producer snapshot is written
ijuma commented on a change in pull request #10757:
URL: https://github.com/apache/kafka/pull/10757#discussion_r638837477

## File path: core/src/main/scala/kafka/log/Log.scala
## @@ -2011,7 +2011,11 @@ object Log extends Logging {
       logDirFailureChannel,
       config.messageFormatVersion.recordVersion,
       s"[Log partition=$topicPartition, dir=${dir.getParent}] ")
-    val producerStateManager = new ProducerStateManager(topicPartition, dir, maxProducerIdExpirationMs)
+    val producerStateManager = new ProducerStateManager(
+      topicPartition,
+      dir,
+      maxProducerIdExpirationMs,
+      time)

Review comment: I prefer one line in this case, but I'm fine either way. :)
[jira] [Assigned] (KAFKA-12430) emit.heartbeats.enabled = false should disable heartbeats topic creation
[ https://issues.apache.org/jira/browse/KAFKA-12430?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Matthew de Detrich reassigned KAFKA-12430:
---
Assignee: Matthew de Detrich

> emit.heartbeats.enabled = false should disable heartbeats topic creation
[jira] [Commented] (KAFKA-12430) emit.heartbeats.enabled = false should disable heartbeats topic creation
[ https://issues.apache.org/jira/browse/KAFKA-12430?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17351093#comment-17351093 ] Matthew de Detrich commented on KAFKA-12430:
---
[~ryannedolan] I am going to look into this. Do you have any comments/context to add (i.e. is there some deliberate reason why the heartbeats topic is still created when emit.heartbeats.enabled is false)?

> emit.heartbeats.enabled = false should disable heartbeats topic creation
[GitHub] [kafka] ryannedolan commented on pull request #10762: KAFKA-12819: Add assert messages to MirrorMaker tests plus other qual…
ryannedolan commented on pull request #10762: URL: https://github.com/apache/kafka/pull/10762#issuecomment-847906780 thx lgtm!
[GitHub] [kafka] mdedetrich opened a new pull request #10762: KAFKA-12819: Add assert messages to MirrorMaker tests plus other qual…
mdedetrich opened a new pull request #10762:
URL: https://github.com/apache/kafka/pull/10762

This PR makes various quality-of-life improvements to the MirrorMaker tests: some basic refactoring to remove boilerplate, plus messages on all of the assert statements so that when they fail there is more context. Some of the assert failure messages may still need work to make it clearer what's actually going on; @ryannedolan, you may have more to add on this point.

- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
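The value of the change is easy to see in miniature. A bare assertion failure reports only the mismatched values; carrying context in the message makes a CI-only failure diagnosable. The class and message below are illustrative, not taken from the PR.

```java
class AssertMessages {
    public static void main(String[] args) {
        int expectedRecords = 5;
        int actualRecords = 5;
        // Without a message, a failure says only "expected 5 but was 4"; with
        // one, it also says what was being verified and where.
        if (actualRecords != expectedRecords) {
            throw new AssertionError("Records were not replicated to backup cluster: expected "
                + expectedRecords + ", got " + actualRecords);
        }
        System.out.println("ok");
    }
}
```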
[GitHub] [kafka] dajac commented on a change in pull request #10757: MINOR: Log more information when producer snapshot is written
dajac commented on a change in pull request #10757:
URL: https://github.com/apache/kafka/pull/10757#discussion_r638806738

## File path: core/src/main/scala/kafka/log/Log.scala
## @@ -2011,7 +2011,11 @@ object Log extends Logging {
       logDirFailureChannel,
       config.messageFormatVersion.recordVersion,
       s"[Log partition=$topicPartition, dir=${dir.getParent}] ")
-    val producerStateManager = new ProducerStateManager(topicPartition, dir, maxProducerIdExpirationMs)
+    val producerStateManager = new ProducerStateManager(
+      topicPartition,
+      dir,
+      maxProducerIdExpirationMs,
+      time)

Review comment: Both are equivalent for me. I found that the line was getting too long with the addition of `time` so I broke it down that way to stay inline with `LogLoader.load` below. Would you prefer to keep it on one line instead?
[GitHub] [kafka] dongjinleekr commented on pull request #9414: KAFKA-10585: Kafka Streams should clean up the state store directory from cleanup
dongjinleekr commented on pull request #9414: URL: https://github.com/apache/kafka/pull/9414#issuecomment-847886290 Hi @guozhangwang @vvcephei, Could you have a look now? :smiley:
[GitHub] [kafka] dongjinleekr commented on pull request #10428: KAFKA-12572: Add import ordering checkstyle rule and configure an automatic formatter
dongjinleekr commented on pull request #10428: URL: https://github.com/apache/kafka/pull/10428#issuecomment-847885818 Rebased onto the latest trunk. cc/ @cadonna
[GitHub] [kafka] ijuma commented on a change in pull request #10757: MINOR: Log more information when producer snapshot is written
ijuma commented on a change in pull request #10757:
URL: https://github.com/apache/kafka/pull/10757#discussion_r638803061

## File path: core/src/main/scala/kafka/log/Log.scala
## @@ -2011,7 +2011,11 @@ object Log extends Logging {
       logDirFailureChannel,
       config.messageFormatVersion.recordVersion,
       s"[Log partition=$topicPartition, dir=${dir.getParent}] ")
-    val producerStateManager = new ProducerStateManager(topicPartition, dir, maxProducerIdExpirationMs)
+    val producerStateManager = new ProducerStateManager(
+      topicPartition,
+      dir,
+      maxProducerIdExpirationMs,
+      time)

Review comment: Nit: is this really easier to read than just adding the `time` parameter in the same line?
[GitHub] [kafka] dajac commented on pull request #10760: KAFKA-12541 Extend ListOffset to fetch offset with max timestamp
dajac commented on pull request #10760: URL: https://github.com/apache/kafka/pull/10760#issuecomment-847883470 I will review it in the next few days.
[GitHub] [kafka] ijuma commented on pull request #10761: MINOR: Don't ignore deletion of partition metadata file and log topic id clean-ups
ijuma commented on pull request #10761: URL: https://github.com/apache/kafka/pull/10761#issuecomment-847881296 cc @jolshan
[GitHub] [kafka] ijuma opened a new pull request #10761: MINOR: Don't ignore deletion of partition metadata file and log topic id clean-ups
ijuma opened a new pull request #10761:
URL: https://github.com/apache/kafka/pull/10761

Log if deletion fails and don't expose log topic id for mutability outside of `assignTopicId()`.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] thomaskwscott opened a new pull request #10760: KAFKA-12541 Extend ListOffset to fetch offset with max timestamp
thomaskwscott opened a new pull request #10760:
URL: https://github.com/apache/kafka/pull/10760

See https://cwiki.apache.org/confluence/display/KAFKA/KIP-734%3A+Improve+AdminClient.listOffsets+to+return+timestamp+and+offset+for+the+record+with+the+largest+timestamp

Tested with new integration test.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
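What KIP-734 asks the broker to compute can be shown without a cluster: among a partition's records, the offset of the record with the largest timestamp, which need not be the last offset when timestamps arrive out of order. The sketch below is illustrative only (timestamps indexed by offset); the real API surface is `AdminClient.listOffsets` with the new `OffsetSpec` described in the KIP.

```java
class MaxTimestampOffset {
    // Given per-offset timestamps, return the offset whose record carries the
    // largest timestamp (ties resolve to the earliest such offset).
    static int offsetOfMaxTimestamp(long[] timestamps) {
        int best = 0;
        for (int i = 1; i < timestamps.length; i++) {
            if (timestamps[i] > timestamps[best]) best = i;
        }
        return best;
    }

    public static void main(String[] args) {
        // Out-of-order timestamps: the max-timestamp record is at offset 1,
        // not at the end offset.
        System.out.println(offsetOfMaxTimestamp(new long[]{100L, 300L, 200L}));
    }
}
```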
[GitHub] [kafka] ijuma opened a new pull request #10759: MINOR: Adjust parameter ordering of `waitForCondition` and `retryOnExceptionWithTimeout`
ijuma opened a new pull request #10759:
URL: https://github.com/apache/kafka/pull/10759

New parameters in overloaded methods should appear later, apart from lambdas, which should always be last.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[jira] [Resolved] (KAFKA-12800) Configure jackson to reject trailing input in the generator
[ https://issues.apache.org/jira/browse/KAFKA-12800?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot resolved KAFKA-12800.
---
Fix Version/s: 3.0.0
Resolution: Fixed
Author: Nathan Lincoln

> Configure jackson to reject trailing input in the generator
> -----------------------------------------------------------
>
> Key: KAFKA-12800
> URL: https://issues.apache.org/jira/browse/KAFKA-12800
> Project: Kafka
> Issue Type: Task
> Components: generator
> Reporter: Nathan Lincoln
> Priority: Minor
> Fix For: 3.0.0
>
> The ObjectMapper instance that parses the schema JSONs will parse successfully even if there is trailing input at the end of the file. This is the default behavior in Jackson, but JSON parsers in other languages may reject these files.
> The only instance of this should have been fixed with KAFKA-12794, and configuring jackson to reject this in the future is simple - just enable [FAIL_ON_TRAILING_TOKENS|https://fasterxml.github.io/jackson-databind/javadoc/2.9/com/fasterxml/jackson/databind/DeserializationFeature.html#FAIL_ON_TRAILING_TOKENS]
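The failure mode is easy to reproduce without Jackson: a lenient parser stops at the end of the first complete JSON value and silently ignores whatever follows, which is exactly what `FAIL_ON_TRAILING_TOKENS` turns into an error. The scanner below is a deliberately simplified illustration (it tracks only bracket depth and would be fooled by braces inside strings), not Jackson's implementation.

```java
class TrailingTokens {
    // Stop at the end of the first complete JSON object/array, then report
    // whether any non-whitespace input remains - the condition that
    // Jackson's FAIL_ON_TRAILING_TOKENS makes fatal.
    static boolean hasTrailingInput(String json) {
        int depth = 0;
        int i = 0;
        for (; i < json.length(); i++) {
            char c = json.charAt(i);
            if (c == '{' || c == '[') depth++;
            else if (c == '}' || c == ']') {
                depth--;
                if (depth == 0) { i++; break; }
            }
        }
        return !json.substring(i).trim().isEmpty();
    }

    public static void main(String[] args) {
        System.out.println(hasTrailingInput("{\"a\": 1}"));
        System.out.println(hasTrailingInput("{\"a\": 1} trailing"));
    }
}
```

With Jackson, the equivalent one-liner is enabling `DeserializationFeature.FAIL_ON_TRAILING_TOKENS` on the shared `ObjectMapper`.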
[jira] [Assigned] (KAFKA-12800) Configure jackson to reject trailing input in the generator
[ https://issues.apache.org/jira/browse/KAFKA-12800?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot reassigned KAFKA-12800:
---
Assignee: (was: David Jacot)

> Configure jackson to reject trailing input in the generator
[jira] [Assigned] (KAFKA-12800) Configure jackson to reject trailing input in the generator
[ https://issues.apache.org/jira/browse/KAFKA-12800?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] David Jacot reassigned KAFKA-12800:
---
Assignee: David Jacot

> Configure jackson to reject trailing input in the generator
[GitHub] [kafka] dajac merged pull request #10717: KAFKA-12800: Configure generator to fail on trailing JSON tokens
dajac merged pull request #10717: URL: https://github.com/apache/kafka/pull/10717
[GitHub] [kafka] dajac commented on a change in pull request #10616: KAFKA-12709; Add Admin API for `ListTransactions`
dajac commented on a change in pull request #10616:
URL: https://github.com/apache/kafka/pull/10616#discussion_r638764893

## File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/AllBrokersStrategy.java
## @@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.message.MetadataRequestData;
+import org.apache.kafka.common.message.MetadataResponseData;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * This class is used for use cases which require requests to be sent to all
+ * brokers in the cluster.
+ *
+ * This is a slightly degenerate case of a lookup strategy in the sense that
+ * the broker IDs are used as both the keys and values. Also, unlike
+ * {@link CoordinatorStrategy} and {@link PartitionLeaderStrategy}, we do not
+ * know the set of keys ahead of time: we require the initial lookup in order
+ * to discover what the broker IDs are. This is represented with a more complex
+ * type {@code Future<Map<Integer, Future<V>>>} in the admin API result type.
+ * For example, see {@link org.apache.kafka.clients.admin.ListTransactionsResult}.
+ */
+public class AllBrokersStrategy implements AdminApiLookupStrategy<AllBrokersStrategy.BrokerKey> {
+    public static final BrokerKey ANY_BROKER = new BrokerKey(OptionalInt.empty());
+    public static final Set<BrokerKey> LOOKUP_KEYS = Collections.singleton(ANY_BROKER);
+    private static final ApiRequestScope SINGLE_REQUEST_SCOPE = new ApiRequestScope() {
+    };
+
+    private final Logger log;
+
+    public AllBrokersStrategy(
+        LogContext logContext
+    ) {
+        this.log = logContext.logger(AllBrokersStrategy.class);
+    }
+
+    @Override
+    public ApiRequestScope lookupScope(BrokerKey key) {
+        return SINGLE_REQUEST_SCOPE;
+    }
+
+    @Override
+    public MetadataRequest.Builder buildRequest(Set<BrokerKey> keys) {
+        validateLookupKeys(keys);
+        // Send empty `Metadata` request. We are only interested in the brokers from the response
+        return new MetadataRequest.Builder(new MetadataRequestData());
+    }
+
+    @Override
+    public LookupResult<BrokerKey> handleResponse(Set<BrokerKey> keys, AbstractResponse abstractResponse) {
+        validateLookupKeys(keys);
+
+        MetadataResponse response = (MetadataResponse) abstractResponse;
+        MetadataResponseData.MetadataResponseBrokerCollection brokers = response.data().brokers();
+
+        if (brokers.isEmpty()) {
+            log.debug("Metadata response contained no brokers. Will backoff and retry");
+            return LookupResult.empty();
+        } else {
+            log.debug("Discovered all brokers {} to send requests to", brokers);
+        }
+
+        Map<BrokerKey, Integer> brokerKeys = brokers.stream().collect(Collectors.toMap(
+            broker -> new BrokerKey(OptionalInt.of(broker.nodeId())),
+            MetadataResponseData.MetadataResponseBroker::nodeId
+        ));
+
+        return new LookupResult<>(
+            Collections.singletonList(ANY_BROKER),
+            Collections.emptyMap(),
+            brokerKeys
+        );
+    }
+
+    private void validateLookupKeys(Set<BrokerKey> keys) {
+        if (keys.size() != 1) {
+            throw new IllegalArgumentException("Unexpected key set " + keys);

Review comment: nit: I would add a `:` after `set` to stay consistent with the other error messages in this PR.

## File path: clients/src/main/java/org/apache/kafka/clients/admin/internals/AllBrokersStrategy.java
## @@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this
[GitHub] [kafka] jlprat commented on pull request #10758: KAFKA-12782: Javadocs search sends you to a non-existent URL
jlprat commented on pull request #10758: URL: https://github.com/apache/kafka/pull/10758#issuecomment-847839591 cc @ableegoldman as you opened the Jira ticket, maybe you'd like to review this one?
[GitHub] [kafka] jlprat opened a new pull request #10758: KAFKA-12782: Javadocs search sends you to a non-existent URL
jlprat opened a new pull request #10758:
URL: https://github.com/apache/kafka/pull/10758

This is just a workaround to solve this problem while we are still using JDK 11. Once we move to, presumably, JDK 17, this change won't be needed anymore and can be deleted safely. See https://bugs.openjdk.java.net/browse/JDK-8215291

This change includes a snippet of code copied from JDK 12+. I'm not sure if an extra header needs to be added for the piece of code I copied over, or if I would need to implement it from scratch.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[GitHub] [kafka] showuon commented on pull request #10552: KAFKA-12675: improve the sticky general assignor scalability and performance
showuon commented on pull request #10552: URL: https://github.com/apache/kafka/pull/10552#issuecomment-847807953 @vahidhashemian , thanks for the comments. I've updated. Please take a look again. Thank you.
[GitHub] [kafka] showuon commented on a change in pull request #10552: KAFKA-12675: improve the sticky general assignor scalability and performance
showuon commented on a change in pull request #10552:
URL: https://github.com/apache/kafka/pull/10552#discussion_r638718719

## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
## @@ -637,19 +709,27 @@ private void assignPartition(TopicPartition partition,
         }
     }

-    private boolean canParticipateInReassignment(TopicPartition partition,
-                                                 Map<TopicPartition, List<String>> partition2AllPotentialConsumers) {
+    private boolean canParticipateInReassignment(String topic,
+                                                 Map<String, List<String>> topic2AllPotentialConsumers) {
         // if a partition has two or more potential consumers it is subject to reassignment.
-        return partition2AllPotentialConsumers.get(partition).size() >= 2;
+        return topic2AllPotentialConsumers.get(topic).size() >= 2;
     }

     private boolean canParticipateInReassignment(String consumer,
                                                  Map<String, List<TopicPartition>> currentAssignment,
-                                                 Map<String, List<TopicPartition>> consumer2AllPotentialPartitions,
-                                                 Map<TopicPartition, List<String>> partition2AllPotentialConsumers) {
+                                                 Map<String, List<String>> consumer2AllPotentialTopics,
+                                                 Map<String, List<String>> topic2AllPotentialConsumers,
+                                                 Map<String, Integer> partitionsPerTopic,
+                                                 int totalPartitionCount) {
         List<TopicPartition> currentPartitions = currentAssignment.get(consumer);
         int currentAssignmentSize = currentPartitions.size();
-        int maxAssignmentSize = consumer2AllPotentialPartitions.get(consumer).size();
+        List<String> allSubscribedTopics = consumer2AllPotentialTopics.get(consumer);
+        int maxAssignmentSize;
+        if (allSubscribedTopics.size() == partitionsPerTopic.size()) {
+            maxAssignmentSize = totalPartitionCount;
+        } else {
+            maxAssignmentSize = allSubscribedTopics.stream().map(topic -> partitionsPerTopic.get(topic)).reduce(0, Integer::sum);
+        }

Review comment: Good suggestion! Updated. Thanks.
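The `maxAssignmentSize` computation discussed in the review above can be sketched standalone: when a consumer subscribes to every topic, its maximum assignment is simply the total partition count (avoiding a per-topic sum); otherwise only its subscribed topics' partition counts are summed. The class and values below are illustrative, not taken from the PR.

```java
import java.util.List;
import java.util.Map;

class MaxAssignmentSize {
    // Upper bound on how many partitions one consumer could be assigned.
    static int maxAssignmentSize(List<String> subscribedTopics,
                                 Map<String, Integer> partitionsPerTopic,
                                 int totalPartitionCount) {
        if (subscribedTopics.size() == partitionsPerTopic.size()) {
            // Subscribed to all topics: the bound is the cluster-wide total.
            return totalPartitionCount;
        }
        // Otherwise, sum partitions of just the subscribed topics.
        return subscribedTopics.stream().mapToInt(partitionsPerTopic::get).sum();
    }

    public static void main(String[] args) {
        Map<String, Integer> partitionsPerTopic = Map.of("t1", 3, "t2", 2);
        System.out.println(maxAssignmentSize(List.of("t1"), partitionsPerTopic, 5));
        System.out.println(maxAssignmentSize(List.of("t1", "t2"), partitionsPerTopic, 5));
    }
}
```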
[GitHub] [kafka] dajac opened a new pull request #10757: MINOR: Log more information when producer snapshot is written
dajac opened a new pull request #10757:
URL: https://github.com/apache/kafka/pull/10757

This patch logs more information when a producer snapshot is written to the disk.

### Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
[jira] [Commented] (KAFKA-12782) Javadocs search sends you to a non-existent URL
[ https://issues.apache.org/jira/browse/KAFKA-12782?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17351001#comment-17351001 ] Josep Prat commented on KAFKA-12782:
---
I can confirm this is a bug in Java 11 for API docs that do not use the module system. It has been solved in JDK 12 and higher but not yet backported to JDK 11. See [https://bugs.openjdk.java.net/browse/JDK-8215291] for more details. Long story short, the search.js file appends the module name in the URL. I ran the docs script manually specifying JDK 16 and the links in the search are generated correctly, pointing to the right URL. I will try to find a workaround that makes it work in the meantime. It is possible to disable the module system option for the API docs; however, all links pointing to JDK APIs (e.g. String) won't work, as they would need the submodule prefix.

> Javadocs search sends you to a non-existent URL
> -----------------------------------------------
>
> Key: KAFKA-12782
> URL: https://issues.apache.org/jira/browse/KAFKA-12782
> Project: Kafka
> Issue Type: Bug
> Components: docs
> Reporter: A. Sophie Blee-Goldman
> Assignee: Josep Prat
> Priority: Major
>
> I was looking up a class using the javadocs search functionality, and clicked on the link when TaskId came up, but it sent me to [https://kafka.apache.org/28/javadoc/undefined/org/apache/kafka/streams/processor/TaskId.html], which does not exist.
> I noticed the URL had an odd "undefined" term inserted before the package name, so I took that out and was able to find the [correct javadocs|https://kafka.apache.org/28/javadoc/org/apache/kafka/streams/processor/TaskId.html].
> So the search seems to be broken due to this "undefined" term that's being injected somewhere, for some reason.
[jira] [Commented] (KAFKA-12743) [Kafka Streams] - cluster failover for stateful Kafka Streams applications
[ https://issues.apache.org/jira/browse/KAFKA-12743?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17351000#comment-17351000 ] Sergey Zyrianov commented on KAFKA-12743:
---
I don't think the uReplicator alternative is relevant. It does not provide offset tracking b/w clusters. Even if the topic name is unchanged, the state store has to figure out the offset in the other cluster. mm2 changes topic names for a reason. I don't know how capable Confluent's replicator is.

> [Kafka Streams] - cluster failover for stateful Kafka Streams applications
> --------------------------------------------------------------------------
>
> Key: KAFKA-12743
> URL: https://issues.apache.org/jira/browse/KAFKA-12743
> Project: Kafka
> Issue Type: Improvement
> Components: mirrormaker, streams
> Affects Versions: 2.8.0
> Reporter: Sergey Zyrianov
> Priority: Major
>
> Currently, when working with Kafka-backed state stores in Kafka Streams, these log-compacted topics are given a hardcoded name: _app_id-storename-changelog_
> {noformat}
> public static String storeChangelogTopic(String applicationId, String storeName) {
>     return applicationId + "-" + storeName + STATE_CHANGELOG_TOPIC_SUFFIX;
> }
> {noformat}
> MirrorMaker 2 (mm2) copies these topics to the remote cluster under the name _src-cluster-alias.app_id-storename-changelog_
> When the streams app fails over to the remote cluster, it has trouble finding the changelog topic of its state store since it was renamed - given the source cluster prefix by mm2.
> What should the fix be? Instruct mm2 to keep the topic name, or subscribe to a regex *._app_id-storename-changelog_ topic name for the state's changelog?
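The naming mismatch described in the issue can be shown concretely. The method mirrors the hardcoded naming quoted above (with the `-changelog` suffix inlined for self-containment); the `src.` prefix stands in for MirrorMaker 2's default remote-topic renaming policy, with `src` as a hypothetical source-cluster alias.

```java
class ChangelogTopicName {
    // Kafka Streams' hardcoded changelog topic name: appId + "-" + store + "-changelog"
    static String storeChangelogTopic(String applicationId, String storeName) {
        return applicationId + "-" + storeName + "-changelog";
    }

    public static void main(String[] args) {
        String local = storeChangelogTopic("app_id", "storename");
        System.out.println(local);
        // mm2's default ReplicationPolicy prefixes replicated topics with the
        // source cluster alias, so the failed-over app no longer finds "local".
        System.out.println("src." + local);
    }
}
```

This makes the two proposed fixes concrete: either mm2 preserves the original name, or the restoring consumer subscribes by a suffix pattern that matches both names.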
[jira] [Assigned] (KAFKA-12782) Javadocs search sends you to a non-existent URL
[ https://issues.apache.org/jira/browse/KAFKA-12782?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Josep Prat reassigned KAFKA-12782:
---
Assignee: Josep Prat

> Javadocs search sends you to a non-existent URL
[jira] [Updated] (KAFKA-12847) Dockerfile needed for kafka system tests needs changes
[ https://issues.apache.org/jira/browse/KAFKA-12847?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Abhijit Mane updated KAFKA-12847: - Description: Hello, I tried the apache/kafka system tests as per the documentation ([https://github.com/apache/kafka/tree/trunk/tests#readme]). = PROBLEM ~~ 1.) As root user, clone the kafka github repo and start the "kafka system tests": # git clone [https://github.com/apache/kafka.git] # cd kafka # ./gradlew clean systemTestLibs # bash tests/docker/run_tests.sh 2.) Dockerfile issue - [https://github.com/apache/kafka/blob/trunk/tests/docker/Dockerfile] This file has a *UID* entry as shown below: --- ARG *UID*="1000" RUN useradd -u $*UID* ducker // {color:#de350b}*Error during docker build*{color} => useradd: UID 0 is not unique, root user id is 0 --- I ran everything as root, which means the built-in bash environment variable 'UID' always resolves to 0 and can't be changed. Hence, the docker build fails. The issue should be seen even if run as non-root. 3.) Next, as root, as per the README, I ran: server:/kafka> *bash tests/docker/run_tests.sh* The ducker tool builds the container images, switches to user '*ducker*' inside the container, and maps the kafka root dir ('kafka') from the host to '/opt/kafka-dev' in the container. Ref: [https://github.com/apache/kafka/blob/trunk/tests/docker/ducker-ak] Ex: docker run -d *-v "${kafka_dir}:/opt/kafka-dev"* This fails as the 'ducker' user has *no write permissions* to create files under the 'kafka' root dir. Hence, it needs to be made writeable. // *chmod -R a+w kafka* – needed as the container is run as 'ducker', which needs write access since the kafka root volume from the host is mapped into the container as "/opt/kafka-dev", where the 'ducker' user writes logs = = *FIXES needed* ~ 1.) Dockerfile - [https://github.com/apache/kafka/blob/trunk/tests/docker/Dockerfile] Change 'UID' to '*UID_DUCKER*'. This won't conflict with the built-in bash env var UID, and the docker image build should succeed. --- ARG *UID_DUCKER*="1000" RUN useradd -u $*UID_DUCKER* ducker // *{color:#57d9a3}No Error{color}* => No conflict with built-in UID --- 2.) The README needs an update to ensure the kafka root dir from which the tests are launched is writeable, to allow the 'ducker' user to create results/logs: # chmod -R a+w kafka With this, I was able to get the docker images built and the system tests started successfully. = Also, I wonder whether the upstream Dockerfile & system tests are part of CI/CD and get tested for every PR. If so, this issue should have been caught. *Question to kafka SME* - Do you believe this is a valid problem with the Dockerfile and the fix is acceptable? Please let me know and I am happy to submit a PR with this fix. Thanks, Abhijit
[jira] [Updated] (KAFKA-10846) FileStreamSourceTask buffer can grow without bound
[ https://issues.apache.org/jira/browse/KAFKA-10846?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Tom Bentley updated KAFKA-10846: Fix Version/s: 2.7.2 > FileStreamSourceTask buffer can grow without bound > -- > > Key: KAFKA-10846 > URL: https://issues.apache.org/jira/browse/KAFKA-10846 > Project: Kafka > Issue Type: Bug > Components: KafkaConnect >Reporter: Tom Bentley >Assignee: Tom Bentley >Priority: Major > Fix For: 2.8.0, 2.7.2 > > > When reading a large file the buffer used by {{FileStreamSourceTask}} can > grow without bound. Even in the unit test > org.apache.kafka.connect.file.FileStreamSourceTaskTest#testBatchSize the > buffer grows from 1,024 to 524,288 bytes just reading 10,000 copies of a line > of <100 chars. > The problem is that the condition for growing the buffer is incorrect. The > buffer is doubled whenever some bytes were read and the used space in the > buffer == the buffer length. > The requirement to increase the buffer size should be related to whether > {{extractLine()}} actually managed to read any lines. It's only when no > complete lines were read since the last call to {{read()}} that we need to > increase the buffer size (to cope with the large line). -- This message was sent by Atlassian Jira (v8.3.4#803005)
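The corrected growth condition described in KAFKA-10846 can be sketched as follows. The method name and signature are hypothetical (the real logic lives inside `FileStreamSourceTask#read()` and `extractLine()`); the point is that the buffer should double only when it is full *and* no complete line could be extracted, i.e. a single line is larger than the current buffer:

```java
public class BufferGrowthSketch {
    // Grow only when the buffer is full AND no complete line was extracted
    // since the last read. Doubling merely because the buffer happens to be
    // full (the buggy condition) lets it grow without bound on large files.
    static int nextBufferSize(int bufferSize, int bytesUsed, boolean extractedLine) {
        if (bytesUsed >= bufferSize && !extractedLine) {
            return bufferSize * 2;
        }
        return bufferSize;
    }

    public static void main(String[] args) {
        // Full buffer, but a line was extracted: keep the current size.
        System.out.println(nextBufferSize(1024, 1024, true));
        // Full buffer and no complete line fits: double it.
        System.out.println(nextBufferSize(1024, 1024, false));
    }
}
```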
[GitHub] [kafka] tombentley commented on pull request #10750: KAFKA-8120 Getting NegativeArraySizeException when using Kafka Connect to send data to Kafka
tombentley commented on pull request #10750: URL: https://github.com/apache/kafka/pull/10750#issuecomment-847768625 It's unlikely that 2.5 or 2.6 will see another release, but I've backported it to the 2.7 branch so it's in any 2.7.2 which gets released. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[jira] [Created] (KAFKA-12847) Dockerfile needed for kafka system tests needs changes
Abhijit Mane created KAFKA-12847: Summary: Dockerfile needed for kafka system tests needs changes Key: KAFKA-12847 URL: https://issues.apache.org/jira/browse/KAFKA-12847 Project: Kafka Issue Type: Bug Components: system tests Affects Versions: 2.7.1, 2.8.0 Environment: Issue tested in the environments below but is independent of h/w arch. or Linux flavor: 1.) RHEL-8.3 on x86_64 2.) RHEL-8.3 on IBM Power (ppc64le) 3.) apache/kafka branch tested: trunk (master) Reporter: Abhijit Mane Attachments: Dockerfile.upstream Hello, I tried the apache/kafka system tests as per the documentation (_https://github.com/apache/kafka/tree/trunk/tests#readme_) = PROBLEM ~~ 1.) As root user, clone the kafka github repo and start the "kafka system tests": # git clone https://github.com/apache/kafka.git # cd kafka # ./gradlew clean systemTestLibs # bash tests/docker/run_tests.sh 2.) Dockerfile issue - _https://github.com/apache/kafka/blob/trunk/tests/docker/*Dockerfile*_ This file has a *UID* entry as shown below: --- ARG *UID*="1000" RUN useradd -u $*UID* ducker // {color:#de350b}*Error during docker build*{color} => useradd: UID 0 is not unique, root user id is 0 --- I ran everything as root, which means the built-in bash environment variable 'UID' always resolves to 0 and can't be changed. Hence, the docker build fails. 3.) Next, as root, as per the README, I ran: server:/kafka> *bash tests/docker/run_tests.sh* The ducker tool builds the container images, switches to user '*ducker*' inside the container, and maps the kafka root dir ('kafka') from the host to '/opt/kafka-dev' in the container. Ref: *https://github.com/apache/kafka/blob/trunk/tests/docker/ducker-ak* Ex: docker run -d *-v "${kafka_dir}:/opt/kafka-dev"* This fails as the 'ducker' user has *no write permissions* to create files under the 'kafka' root dir. Hence, it needs to be made writeable.
// *chmod -R a+w kafka* -- needed as the container is run as 'ducker', which needs write access since the kafka root volume from the host is mapped into the container as "/opt/kafka-dev", where the 'ducker' user writes logs = = *FIXES needed* ~ 1.) Dockerfile - https://github.com/apache/kafka/blob/trunk/tests/docker/Dockerfile Change 'UID' to '*UID_DUCKER*'. This won't conflict with the built-in bash env var UID, and the docker image build should succeed. --- ARG *UID_DUCKER*="1000" RUN useradd -u $*UID_DUCKER* ducker // *{color:#57d9a3}No Error{color}* => No conflict with built-in UID --- 2.) The README needs an update to ensure the kafka root dir from which the tests are launched is writeable, to allow the 'ducker' user to create results/logs: # chmod -R a+w kafka With this, I was able to get the docker images built and the system tests started successfully. = Also, I wonder whether the upstream Dockerfile & system tests are part of CI/CD and get tested for every PR. If so, this issue should have been caught. *Question to kafka SME* - Do you believe this is a valid problem with the Dockerfile and the fix is acceptable? Please let me know and I am happy to submit a PR with this fix. Thanks, Abhijit -- This message was sent by Atlassian Jira (v8.3.4#803005)
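The proposed rename amounts to a two-line Dockerfile change. This is a sketch of the reporter's suggestion, not the actual patch; `UID_DUCKER` is the name proposed in the report, and the base image and surrounding instructions of the real Dockerfile are omitted:

```dockerfile
# 'UID' collides with bash's read-only built-in variable, which expands to 0
# when the build argument is supplied from a root shell; renaming the build
# arg sidesteps the collision, so useradd receives the intended value.
ARG UID_DUCKER="1000"
RUN useradd -u "$UID_DUCKER" ducker
```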
[GitHub] [kafka] rajinisivaram commented on pull request #10377: KAFKA-12515 ApiVersionManager should create response based on request version
rajinisivaram commented on pull request #10377: URL: https://github.com/apache/kafka/pull/10377#issuecomment-847762942 @feyman2016 We should add `ignorable=true` for the fields added for feature support in `clients/src/main/resources/common/message/ApiVersionsResponse.json`. This would avoid having to pass the version around and set feature fields only if it is supported for the request/response version. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
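The suggestion is to mark the feature fields ignorable in the message spec so that serialization simply drops them at older versions. Roughly, an entry in `ApiVersionsResponse.json` would look like this (the field name, versions, and tag shown here are illustrative, not the actual file contents):

```json
{ "name": "SupportedFeatures", "type": "[]SupportedFeatureKey",
  "versions": "3+", "tag": 0, "taggedVersions": "3+", "ignorable": true,
  "about": "Features supported by the broker." }
```

With `"ignorable": true`, the generated serializer omits the field for request/response versions that do not support it instead of throwing, which is why the version no longer needs to be passed around.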
[jira] [Comment Edited] (KAFKA-5676) MockStreamsMetrics should be in o.a.k.test
[ https://issues.apache.org/jira/browse/KAFKA-5676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17350960#comment-17350960 ] Bruno Cadonna edited comment on KAFKA-5676 at 5/25/21, 10:42 AM: - [~marcolotz] Thank you for looking into this! The classes that use {{StreamsMetricsImpl}} should be only internal classes, like {{ClientMetrics}}, which contains {{addStateMetrics()}}. They are allowed to use the implementation instead of the interface. The interface {{StreamsMetrics}} should be the one that is exposed in the public API. Currently, I do not recall why we need to move {{MockStreamsMetrics}} into the public API. IMO, we should get rid of {{MockStreamsMetrics}} and replace its usages with an EasyMock mock. However, I do not know whether this is straightforward. See also KAFKA-8977. > MockStreamsMetrics should be in o.a.k.test > -- > > Key: KAFKA-5676 > URL: https://issues.apache.org/jira/browse/KAFKA-5676 > Project: Kafka > Issue Type: Bug > Components: streams >Reporter: Guozhang Wang >Assignee: Marco Lotz >Priority: Major > Labels: newbie > Time Spent: 96h > Remaining Estimate: 0h > > {{MockStreamsMetrics}}'s package should be `o.a.k.test` not > `o.a.k.streams.processor.internals`.
> In addition, it should not require a {{Metrics}} parameter in its constructor > as it is only needed for its extended base class; the right way of mocking > should be implementing {{StreamsMetrics}} with mock behavior rather than extending a > real implementation of {{StreamsMetricsImpl}}. -- This message was sent by Atlassian Jira (v8.3.4#803005)
[GitHub] [kafka] DuongPTIT commented on pull request #10750: KAFKA-8120 Getting NegativeArraySizeException when using Kafka Connect to send data to Kafka
DuongPTIT commented on pull request #10750: URL: https://github.com/apache/kafka/pull/10750#issuecomment-847756554 > Probably fixed by #9735 I've seen that this issue still happens in V2.5, V2.6, and V2.7. What about these versions? Do they need a fix similar to V2.8 and trunk? Thanks. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org
[GitHub] [kafka] dengziming commented on pull request #10701: KAFKA-10437; Fix omitted TODO of KIP-478
dengziming commented on pull request #10701: URL: https://github.com/apache/kafka/pull/10701#issuecomment-847753120 @mjsax @guozhangwang, what do you think about this approach? In fact, I'm not very confident about this change since it isn't very elegant, but it does remove the TODO; feel free to point out any mistakes.
[GitHub] [kafka] showuon commented on a change in pull request #10552: KAFKA-12675: improve the sticky general assignor scalability and performance
showuon commented on a change in pull request #10552: URL: https://github.com/apache/kafka/pull/10552#discussion_r638655936 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java ## @@ -637,19 +709,27 @@ private void assignPartition(TopicPartition partition, } } -private boolean canParticipateInReassignment(TopicPartition partition, - Map<TopicPartition, List<String>> partition2AllPotentialConsumers) { +private boolean canParticipateInReassignment(String topic, + Map<String, List<String>> topic2AllPotentialConsumers) { // if a partition has two or more potential consumers it is subject to reassignment. Review comment: Good catch! Updated.
[GitHub] [kafka] tombentley commented on pull request #10750: KAFKA-8120 Getting NegativeArraySizeException when using Kafka Connect to send data to Kafka
tombentley commented on pull request #10750: URL: https://github.com/apache/kafka/pull/10750#issuecomment-847746044 Probably fixed by https://github.com/apache/kafka/pull/9735
[GitHub] [kafka] showuon commented on a change in pull request #10552: KAFKA-12675: improve the sticky general assignor scalability and performance
showuon commented on a change in pull request #10552: URL: https://github.com/apache/kafka/pull/10552#discussion_r638654728 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java ## @@ -469,73 +426,190 @@ private boolean allSubscriptionsEqual(Set<String> allTopics, TreeSet<String> sortedCurrentSubscriptions = new TreeSet<>(new SubscriptionComparator(currentAssignment)); sortedCurrentSubscriptions.addAll(currentAssignment.keySet()); -balance(currentAssignment, prevAssignment, sortedPartitions, unassignedPartitions, sortedCurrentSubscriptions, -consumer2AllPotentialPartitions, partition2AllPotentialConsumers, currentPartitionConsumer, revocationRequired); +balance(currentAssignment, prevAssignment, sortedAllPartitions, unassignedPartitions, sortedCurrentSubscriptions, +consumer2AllPotentialTopics, topic2AllPotentialConsumers, currentPartitionConsumer, revocationRequired, +partitionsPerTopic, totalPartitionsCount); + +if (log.isDebugEnabled()) { +log.debug("final assignment: {}", currentAssignment); +} + return currentAssignment; } +/** + * Get the unassigned partition list by computing the difference set of sortedAllPartitions (all partitions) + * and sortedAssignedPartitions. If there are no assigned partitions, we'll just return all sorted topic partitions.
+ * This is used in the generalAssign method. + * + * We loop through sortedAllPartitions, comparing with the ith element of sortedAssignedPartitions (i starts from 0): + * - if not equal to the ith element, add to unassignedPartitions + * - if equal to the ith element, get the next element from sortedAssignedPartitions + * + * @param sortedAllPartitions: sorted all partitions + * @param sortedAssignedPartitions: sorted assigned partitions, all included in sortedAllPartitions + * @param topic2AllPotentialConsumers: topics mapped to all consumers that subscribed to them + * @return partitions that aren't assigned to any current consumer + */ +private List<TopicPartition> getUnassignedPartitions(List<TopicPartition> sortedAllPartitions, + List<TopicPartition> sortedAssignedPartitions, + Map<String, List<String>> topic2AllPotentialConsumers) { +if (sortedAssignedPartitions.isEmpty()) { +return sortedAllPartitions; +} + +List<TopicPartition> unassignedPartitions = new ArrayList<>(); + +Collections.sort(sortedAssignedPartitions, new PartitionComparator(topic2AllPotentialConsumers)); + +boolean shouldAddDirectly = false; +Iterator<TopicPartition> sortedAssignedPartitionsIter = sortedAssignedPartitions.iterator(); +TopicPartition nextAssignedPartition = sortedAssignedPartitionsIter.next(); + +for (TopicPartition topicPartition : sortedAllPartitions) { +if (shouldAddDirectly || !nextAssignedPartition.equals(topicPartition)) { +unassignedPartitions.add(topicPartition); +} else { +// this partition is in assignedPartitions, don't add to unassignedPartitions, just get next assigned partition +if (sortedAssignedPartitionsIter.hasNext()) { +nextAssignedPartition = sortedAssignedPartitionsIter.next(); +} else { +// add the remaining directly since there are no more sortedAssignedPartitions +shouldAddDirectly = true; +} +} +} +return unassignedPartitions; +} + +/** + * Get the unassigned partition list by computing the difference set of all sorted partitions + * and sortedAssignedPartitions. If there are no assigned partitions, we'll just return all sorted topic partitions.
+ * This is used in the constrainedAssign method. + * + * To compute the difference set, we use the two-pointer technique here: + * + * We loop through all sorted topics, then iterate over all partitions each topic has, + * comparing with the ith element of sortedAssignedPartitions (i starts from 0): + * - if not equal to the ith element, add to unassignedPartitions + * - if equal to the ith element, get the next element from sortedAssignedPartitions + * + * @param totalPartitionsCount all partition counts in this assignment + * @param partitionsPerTopic the number of partitions for each subscribed topic + * @param sortedAssignedPartitions sorted assigned partitions, all included in sortedAllPartitions + * @return the partitions not yet assigned to any consumers + */ +private List<TopicPartition> getUnassignedPartitions(int totalPartitionsCount, + Map<String, Integer> partitionsPerTopic, +
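The difference-set walk described in the javadoc above can be sketched in isolation. This is a hypothetical stand-alone version (the class and method names are mine, not from the PR): it assumes both lists are sorted by the same comparator and that every assigned element appears in the full list, which is what the assignor guarantees before calling it.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

public class UnassignedSketch {
    // Two-pointer pass: walk the full sorted list once, advancing a cursor
    // into the sorted assigned list whenever the heads match. Every element
    // that does not match the cursor is unassigned.
    static <T> List<T> difference(List<T> sortedAll, List<T> sortedAssigned) {
        if (sortedAssigned.isEmpty())
            return sortedAll;
        List<T> unassigned = new ArrayList<>();
        boolean addDirectly = false;                 // set once the assigned list is exhausted
        Iterator<T> assignedIter = sortedAssigned.iterator();
        T nextAssigned = assignedIter.next();
        for (T element : sortedAll) {
            if (addDirectly || !element.equals(nextAssigned)) {
                unassigned.add(element);             // not assigned to any consumer
            } else if (assignedIter.hasNext()) {
                nextAssigned = assignedIter.next();  // matched; advance the cursor
            } else {
                addDirectly = true;                  // rest of sortedAll is unassigned
            }
        }
        return unassigned;
    }
}
```

Because both inputs are pre-sorted, this is a single O(n) pass, rather than calling `remove(partition)` on a list once per assigned partition, which is what appears to have made the earlier path quadratic.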
[GitHub] [kafka] showuon commented on a change in pull request #10552: KAFKA-12675: improve the sticky general assignor scalability and performance
showuon commented on a change in pull request #10552: URL: https://github.com/apache/kafka/pull/10552#discussion_r638651011 ## File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java ## @@ -444,23 +392,32 @@ private boolean allSubscriptionsEqual(Set<String> allTopics, // otherwise (the consumer still exists) for (Iterator<TopicPartition> partitionIter = entry.getValue().iterator(); partitionIter.hasNext();) { TopicPartition partition = partitionIter.next(); -if (!partition2AllPotentialConsumers.containsKey(partition)) { -// if this topic partition of this consumer no longer exists remove it from currentAssignment of the consumer +if (!topic2AllPotentialConsumers.containsKey(partition.topic())) { +// if this topic partition of this consumer no longer exists, remove it from currentAssignment of the consumer partitionIter.remove(); currentPartitionConsumer.remove(partition); -} else if (!subscriptions.get(entry.getKey()).topics().contains(partition.topic())) { -// if this partition cannot remain assigned to its current consumer because the consumer -// is no longer subscribed to its topic remove it from currentAssignment of the consumer +} else if (!consumerSubscription.topics().contains(partition.topic())) { +// because the consumer is no longer subscribed to its topic, remove it from currentAssignment of the consumer partitionIter.remove(); revocationRequired = true; -} else +} else { // otherwise, remove the topic partition from those that need to be assigned only if // its current consumer is still subscribed to its topic (because it is already assigned // and we would want to preserve that assignment as much as possible) -unassignedPartitions.remove(partition); +assignedPartitions.add(partition); +} } } } + +// all partitions that needed to be assigned +List<TopicPartition> unassignedPartitions = getUnassignedPartitions(sortedAllPartitions, assignedPartitions, topic2AllPotentialConsumers); +assignedPartitions = null; Review comment: Yes,
`assignedPartitions.clear()` would have the same effect, but it loops through the whole ArrayList and nulls out the elements one by one. I think we can either `null` it or remove this line. What do you think? ```java /** * Removes all of the elements from this list. The list will * be empty after this call returns. */ public void clear() { modCount++; final Object[] es = elementData; for (int to = size, i = size = 0; i < to; i++) es[i] = null; } ```
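The trade-off discussed above can be illustrated with a small, self-contained sketch (hypothetical class and method names): `clear()` makes an O(n) pass over the backing array and keeps the list alive for reuse, whereas dropping the only reference is O(1) for the caller and lets the GC reclaim the list and its backing array in one sweep.

```java
import java.util.ArrayList;
import java.util.List;

public class ClearVsNull {
    // clear() empties the list in place: an O(n) pass nulls each slot,
    // and the backing array stays allocated for reuse.
    static List<Integer> emptied() {
        List<Integer> assigned = new ArrayList<>(List.of(1, 2, 3));
        assigned.clear();
        return assigned;               // still a live, reusable (empty) list
    }

    // Nulling the local reference is O(1); once no references remain,
    // the whole list becomes garbage at once.
    static List<Integer> released() {
        List<Integer> assigned = new ArrayList<>(List.of(1, 2, 3));
        assigned = null;
        return assigned;               // nothing left to iterate or reuse
    }
}
```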
[GitHub] [kafka] DuongPTIT edited a comment on pull request #10750: KAFKA-8120 Getting NegativeArraySizeException when using Kafka Connect to send data to Kafka
DuongPTIT edited a comment on pull request #10750: URL: https://github.com/apache/kafka/pull/10750#issuecomment-847728426 > Please confirm whether the issue only happens in V2.5 or newer releases. If it also happens in trunk, please make the merge target `trunk` (and fix based on the `trunk` branch), and if only in V2.5, please raise the question in the dev email group, to ask if there's any new release plan for V2.5. Thank you. I've just reproduced this. I found out that the issue happened in V2.7 and older releases, but it worked fine in V2.8 and trunk.
[jira] [Commented] (KAFKA-12824) Remove Deprecated method KStream#branch
[ https://issues.apache.org/jira/browse/KAFKA-12824?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17350942#comment-17350942 ] Josep Prat commented on KAFKA-12824: On the parent issue there is this sentence: > Each subtask will be focusing on a specific API, so it's easy to discuss if > it should be removed by 4.0.0 or maybe even at a later point. And now I added an extra comment on the parent issue as well, stating that when the time for 4.0.0 comes, we might need to re-evaluate whether enough time has passed for each API. > Remove Deprecated method KStream#branch > --- > > Key: KAFKA-12824 > URL: https://issues.apache.org/jira/browse/KAFKA-12824 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Josep Prat >Priority: Blocker > Fix For: 4.0.0 > > > The method branch in both the Java and Scala KStream classes was deprecated in > version 2.8: > * org.apache.kafka.streams.scala.kstream.KStream#branch > * > org.apache.kafka.streams.kstream.KStream#branch(org.apache.kafka.streams.kstream.Predicate<? super K, ? super V>...) > * > org.apache.kafka.streams.kstream.KStream#branch(org.apache.kafka.streams.kstream.Named, > org.apache.kafka.streams.kstream.Predicate<? super K, ? super V>...) > > See KAFKA-5488 and KIP-418
[jira] [Commented] (KAFKA-12824) Remove Deprecated method KStream#branch
[ https://issues.apache.org/jira/browse/KAFKA-12824?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17350941#comment-17350941 ] Bruno Cadonna commented on KAFKA-12824: --- [~mjsax] Do we know for which date AK 4.0.0 is planned? I could not find anything on the wiki. Since we do not know the release date, we do not know whether the deprecation period will be long enough. I would add a note to the description of this ticket saying that it is not clear whether the deprecation period of the subtasks is long enough for 4.0.0 and that they should be re-evaluated once it is clear when 4.0.0 will be released. > Remove Deprecated method KStream#branch > --- > > Key: KAFKA-12824 > URL: https://issues.apache.org/jira/browse/KAFKA-12824 > Project: Kafka > Issue Type: Sub-task > Components: streams >Reporter: Josep Prat >Priority: Blocker > Fix For: 4.0.0 > > > The method branch in both the Java and Scala KStream classes was deprecated in > version 2.8: > * org.apache.kafka.streams.scala.kstream.KStream#branch > * > org.apache.kafka.streams.kstream.KStream#branch(org.apache.kafka.streams.kstream.Predicate<? super K, ? super V>...) > * > org.apache.kafka.streams.kstream.KStream#branch(org.apache.kafka.streams.kstream.Named, > org.apache.kafka.streams.kstream.Predicate<? super K, ? super V>...) > > See KAFKA-5488 and KIP-418
[GitHub] [kafka] DuongPTIT commented on pull request #10670: KAFKA-10273 Connect Converters should produce actionable error messages
DuongPTIT commented on pull request #10670: URL: https://github.com/apache/kafka/pull/10670#issuecomment-847730974 hi @showuon, please take a look. Thank you so much!
[GitHub] [kafka] DuongPTIT removed a comment on pull request #10670: KAFKA-10273 Connect Converters should produce actionable error messages
DuongPTIT removed a comment on pull request #10670: URL: https://github.com/apache/kafka/pull/10670#issuecomment-840464740
[jira] [Assigned] (KAFKA-12461) Extend LogManager to cover the metadata topic
[ https://issues.apache.org/jira/browse/KAFKA-12461?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] loboxu reassigned KAFKA-12461: -- Assignee: (was: loboxu) > Extend LogManager to cover the metadata topic > - > > Key: KAFKA-12461 > URL: https://issues.apache.org/jira/browse/KAFKA-12461 > Project: Kafka > Issue Type: Sub-task >Reporter: Jason Gustafson >Priority: Major > > The `@metadata` topic is not managed by `LogManager` since it uses a new > snapshot-based retention policy. This means that it is not covered by the > recovery and high watermark checkpoints. It would be useful to fix this. We > can either extend `LogManager` so that it is aware of the snapshotting > semantics implemented by the `@metadata` topic, or we can create something > like a `RaftLogManager`.
[jira] [Assigned] (KAFKA-10900) Add metrics enumerated in KIP-630
[ https://issues.apache.org/jira/browse/KAFKA-10900?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] loboxu reassigned KAFKA-10900: -- Assignee: loboxu > Add metrics enumerated in KIP-630 > - > > Key: KAFKA-10900 > URL: https://issues.apache.org/jira/browse/KAFKA-10900 > Project: Kafka > Issue Type: Sub-task > Components: replication >Reporter: Jose Armando Garcia Sancio >Assignee: loboxu >Priority: Major > > KIP-630 enumerates a few metrics. Make sure that those metrics are > implemented.
[GitHub] [kafka] kowshik closed pull request #9764: MINOR: Eliminate KafkaScheduler#scheduleOnce in favor of KafkaScheduler#schedule
kowshik closed pull request #9764: URL: https://github.com/apache/kafka/pull/9764
[jira] [Commented] (KAFKA-12846) why need this logic in Consumer‘s Fetch logic it should remove?
[ https://issues.apache.org/jira/browse/KAFKA-12846?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17350904#comment-17350904 ] Luke Chen commented on KAFKA-12846: --- [~ws], I believe the comments in the else block answer your question. I'm not sure what your expected result is. Are you trying to get rid of the else block? If so, maybe you have to prove the comment is wrong, so that we can get rid of it. What do you think? > why need this logic in Consumer‘s Fetch logic it should remove? > - > > Key: KAFKA-12846 > URL: https://issues.apache.org/jira/browse/KAFKA-12846 > Project: Kafka > Issue Type: Wish >Reporter: yws >Priority: Trivial > Fix For: 2.3.0 > > > package: org.apache.kafka.clients.consumer.internals > class: Fetcher > else { > // this case shouldn't usually happen because we > only send one fetch at a time per partition, > // but it might conceivably happen in some rare > cases (such as partition leader changes). > // we have to copy to a new list because the old > one may be immutable > List<ConsumerRecord<K, V>> newRecords = new > ArrayList<>(records.size() + currentRecords.size()); > newRecords.addAll(currentRecords); > newRecords.addAll(records); > fetched.put(partition, newRecords); > } > recordsRemaining -= records.size(); > } > I just cannot think of a case where it would go to the else logic. Who can > illustrate it? It's useless logic in my opinion; looking forward to a reply!
[jira] [Created] (KAFKA-12846) why need this logic in Consumer‘s Fetch logic it should remove?
yws created KAFKA-12846: --- Summary: why need this logic in Consumer‘s Fetch logic it should remove? Key: KAFKA-12846 URL: https://issues.apache.org/jira/browse/KAFKA-12846 Project: Kafka Issue Type: Wish Reporter: yws Fix For: 2.3.0 package: org.apache.kafka.clients.consumer.internals class: Fetcher else { // this case shouldn't usually happen because we only send one fetch at a time per partition, // but it might conceivably happen in some rare cases (such as partition leader changes). // we have to copy to a new list because the old one may be immutable List<ConsumerRecord<K, V>> newRecords = new ArrayList<>(records.size() + currentRecords.size()); newRecords.addAll(currentRecords); newRecords.addAll(records); fetched.put(partition, newRecords); } recordsRemaining -= records.size(); } I just cannot think of a case where it would go to the else logic. Who can illustrate it? It's useless logic in my opinion; looking forward to a reply!
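The copy in the quoted else branch exists because the previously fetched list may be immutable, so appending in place would throw. A minimal stand-alone sketch of that merge (hypothetical names, plain element type instead of `ConsumerRecord`):

```java
import java.util.ArrayList;
import java.util.List;

public class MergeSketch {
    // Concatenate two lists into a fresh ArrayList pre-sized for both,
    // since `current` may be immutable (e.g. created via List.of) and
    // cannot be appended to directly.
    static <T> List<T> merge(List<T> current, List<T> records) {
        List<T> merged = new ArrayList<>(current.size() + records.size());
        merged.addAll(current);
        merged.addAll(records);
        return merged;
    }
}
```

Pre-sizing the list avoids intermediate backing-array growth during the two `addAll` calls.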
[GitHub] [kafka] showuon commented on pull request #10750: KAFKA-8120 Getting NegativeArraySizeException when using Kafka Connect to send data to Kafka
showuon commented on pull request #10750: URL: https://github.com/apache/kafka/pull/10750#issuecomment-847654763 @DuongPTIT, thanks for the PR. One important question before reviewing the code: could you confirm this issue only happens in V2.5, and not in V2.8? I'm not sure if we will have another minor release for V2.5 (I don't think it's very likely because it's 4 releases old). That means even if your fix is merged into V2.5, you might not be able to upgrade Kafka to get it. Usually we merge to trunk first, and then back-port to older releases. So, if you can confirm this issue happens in V2.8 or the `trunk` branch, it will definitely get fixed in the next release! Thank you.
[GitHub] [kafka] socutes commented on a change in pull request #10749: KAFKA-12773: Use UncheckedIOException when wrapping IOException
socutes commented on a change in pull request #10749: URL: https://github.com/apache/kafka/pull/10749#discussion_r638556812 ## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -372,27 +371,23 @@ private void maybeFireLeaderChange() { @Override public void initialize() { -try { -quorum.initialize(new OffsetAndEpoch(log.endOffset().offset, log.lastFetchedEpoch())); +quorum.initialize(new OffsetAndEpoch(log.endOffset().offset, log.lastFetchedEpoch())); -long currentTimeMs = time.milliseconds(); -if (quorum.isLeader()) { -throw new IllegalStateException("Voter cannot initialize as a Leader"); -} else if (quorum.isCandidate()) { -onBecomeCandidate(currentTimeMs); -} else if (quorum.isFollower()) { -onBecomeFollower(currentTimeMs); -} +long currentTimeMs = time.milliseconds(); +if (quorum.isLeader()) { +throw new IllegalStateException("Voter cannot initialize as a Leader"); +} else if (quorum.isCandidate()) { +onBecomeCandidate(currentTimeMs); +} else if (quorum.isFollower()) { +onBecomeFollower(currentTimeMs); +} -// When there is only a single voter, become candidate immediately -if (quorum.isVoter() +// When there is only a single voter, become candidate immediately +if (quorum.isVoter() && quorum.remoteVoters().isEmpty() && !quorum.isCandidate()) { Review comment: Thank you very much for your precious time. I understand it and I will fix it ## File path: raft/src/main/java/org/apache/kafka/raft/QuorumState.java ## @@ -116,7 +117,7 @@ public void initialize(OffsetAndEpoch logEndOffsetAndEpoch) throws IOException, if (election == null) { election = ElectionState.withUnknownLeader(0, voters); } -} catch (final IOException e) { +} catch (final Exception e) { Review comment: Yes, I seem to have
[GitHub] [kafka] DuongPTIT commented on pull request #10750: KAFKA-8120 Getting NegativeArraySizeException when using Kafka Connect to send data to Kafka
DuongPTIT commented on pull request #10750: URL: https://github.com/apache/kafka/pull/10750#issuecomment-847644863 hi @showuon, can you please review this for me? Many thanks
[GitHub] [kafka] showuon commented on a change in pull request #10749: KAFKA-12773: Use UncheckedIOException when wrapping IOException
showuon commented on a change in pull request #10749: URL: https://github.com/apache/kafka/pull/10749#discussion_r638544356 ## File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java ## @@ -372,27 +371,23 @@ private void maybeFireLeaderChange() { @Override public void initialize() { -try { -quorum.initialize(new OffsetAndEpoch(log.endOffset().offset, log.lastFetchedEpoch())); +quorum.initialize(new OffsetAndEpoch(log.endOffset().offset, log.lastFetchedEpoch())); -long currentTimeMs = time.milliseconds(); -if (quorum.isLeader()) { -throw new IllegalStateException("Voter cannot initialize as a Leader"); -} else if (quorum.isCandidate()) { -onBecomeCandidate(currentTimeMs); -} else if (quorum.isFollower()) { -onBecomeFollower(currentTimeMs); -} +long currentTimeMs = time.milliseconds(); +if (quorum.isLeader()) { +throw new IllegalStateException("Voter cannot initialize as a Leader"); +} else if (quorum.isCandidate()) { +onBecomeCandidate(currentTimeMs); +} else if (quorum.isFollower()) { +onBecomeFollower(currentTimeMs); +} -// When there is only a single voter, become candidate immediately -if (quorum.isVoter() +// When there is only a single voter, become candidate immediately +if (quorum.isVoter() && quorum.remoteVoters().isEmpty() && !quorum.isCandidate()) { Review comment: No, you can see the change, you move this line `if (quorum.isVoter()` left (4 spaces I guess), so the following 2 lines should also move left. ## File path: raft/src/main/java/org/apache/kafka/raft/QuorumState.java ## @@ -116,7 +117,7 @@ public void initialize(OffsetAndEpoch logEndOffsetAndEpoch) throws IOException, if (election == null) { election = ElectionState.withUnknownLeader(0, voters); } -} catch (final IOException e) { +} catch (final Exception e) { Review comment: Yes, but I think `Exception` is covering too many "unexpected" exceptions. You can see the catch block, we are handling IOException case, not other exceptions. 
I think we can directly catch `UncheckedIOException` here.
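The pattern under review, wrapping a checked `IOException` at the throw site so callers can catch the specific unchecked variant instead of a broad `Exception`, can be sketched as follows (hypothetical method names and fallback value; not the actual KafkaRaftClient code):

```java
import java.io.IOException;
import java.io.UncheckedIOException;

public class WrapSketch {
    // Hypothetical low-level read that wraps its checked IOException,
    // as the PR does for the raft state I/O paths.
    static String readState(boolean fail) {
        try {
            if (fail) throw new IOException("disk error");
            return "state";
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // The caller handles only the I/O failure path, mirroring the review's
    // suggestion to catch UncheckedIOException rather than Exception, which
    // would also swallow unrelated programming errors.
    static String readOrDefault(boolean fail) {
        try {
            return readState(fail);
        } catch (UncheckedIOException e) {
            return "unknown-leader";   // hypothetical fallback, e.g. an unknown-leader election state
        }
    }
}
```

Catching `UncheckedIOException` keeps the recovery logic scoped to I/O failures while leaving other runtime exceptions to propagate.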