[GitHub] [kafka] chia7712 opened a new pull request #10764: MINOR: make sure all fields of o.a.k.s.a.Action are NOT null

2021-05-25 Thread GitBox


chia7712 opened a new pull request #10764:
URL: https://github.com/apache/kafka/pull/10764


   I'm migrating Ranger's Kafka plugin from the deprecated Authorizer (already 
removed by 976e78e405d57943b989ac487b7f49119b0f4af4) to the new API (see 
https://issues.apache.org/jira/browse/RANGER-3231). The Kafka plugin needs to 
read data from the `resourcePattern` field, but it does not know whether the 
field is nullable (or whether users need to add null checks). I checked all 
usages and did not observe any null case.
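   
   For context, a rough sketch (not from this PR) of the defensive check a 
plugin's authorize() otherwise needs; `lookupPolicy()` is a hypothetical 
plugin-side helper, not a Kafka API:
   
{code:java}
import java.util.List;
import java.util.stream.Collectors;
import org.apache.kafka.server.authorizer.Action;
import org.apache.kafka.server.authorizer.AuthorizableRequestContext;
import org.apache.kafka.server.authorizer.AuthorizationResult;

// inside a plugin's Authorizer implementation:
@Override
public List<AuthorizationResult> authorize(final AuthorizableRequestContext context,
                                           final List<Action> actions) {
    return actions.stream().map(action -> {
        // the defensive check that a non-null guarantee on Action's fields removes:
        if (action.resourcePattern() == null) {
            return AuthorizationResult.DENIED;
        }
        return lookupPolicy(action.resourcePattern().name(), action.operation());
    }).collect(Collectors.toList());
}
{code}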
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #10552: KAFKA-12675: improve the sticky general assignor scalability and performance

2021-05-25 Thread GitBox


showuon commented on pull request #10552:
URL: https://github.com/apache/kafka/pull/10552#issuecomment-848453397


   @vahidhashemian , thank you for your review! :)


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vahidhashemian commented on pull request #10552: KAFKA-12675: improve the sticky general assignor scalability and performance

2021-05-25 Thread GitBox


vahidhashemian commented on pull request #10552:
URL: https://github.com/apache/kafka/pull/10552#issuecomment-848436761


   Thanks for addressing my comments @showuon. I ran a couple of the unit tests 
and saw the difference this change makes.
   I have no further comments at this time. Given this is a big change, I'd wait 
for @ableegoldman's review before approval. In the meantime, I may test some 
scenarios for additional validation.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-9295) KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable

2021-05-25 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17351457#comment-17351457
 ] 

Luke Chen commented on KAFKA-9295:
--

On it!

> KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable
> --
>
> Key: KAFKA-9295
> URL: https://issues.apache.org/jira/browse/KAFKA-9295
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 2.4.0, 2.6.0
>Reporter: Matthias J. Sax
>Assignee: Luke Chen
>Priority: Critical
>  Labels: flaky-test
> Fix For: 3.0.0
>
>
> [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/27106/testReport/junit/org.apache.kafka.streams.integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest/shouldInnerJoinMultiPartitionQueryable/]
> {quote}java.lang.AssertionError: Did not receive all 1 records from topic 
> output- within 6 ms Expected: is a value equal to or greater than <1> 
> but: <0> was less than <1> at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:18) at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:515)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:385)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:511)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:489)
>  at 
> org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.verifyKTableKTableJoin(KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java:200)
>  at 
> org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.shouldInnerJoinMultiPartitionQueryable(KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java:183){quote}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman commented on a change in pull request #10755: MINOR: deprecate TaskMetadata constructor and add KIP-740 notes to upgrade guide

2021-05-25 Thread GitBox


ableegoldman commented on a change in pull request #10755:
URL: https://github.com/apache/kafka/pull/10755#discussion_r639358536



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/TaskMetadata.java
##
@@ -40,6 +40,18 @@
 
 private final Optional<Long> timeCurrentIdlingStarted;
 
+/**
+ * @deprecated since 3.0, please use {@link #TaskMetadata(TaskId, Set, Map, Map, Optional)} instead
+ */
+@Deprecated
+public TaskMetadata(final String taskId,
+                    final Set<TopicPartition> topicPartitions,
+                    final Map<TopicPartition, Long> committedOffsets,
+                    final Map<TopicPartition, Long> endOffsets,
+                    final Optional<Long> timeCurrentIdlingStarted) {
+    this(TaskId.parse(taskId), topicPartitions, committedOffsets, endOffsets, timeCurrentIdlingStarted);
+}
+
 public TaskMetadata(final TaskId taskId,

Review comment:
   https://issues.apache.org/jira/browse/KAFKA-12849
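   
   For context, a minimal migration sketch for the deprecation above 
(placeholder values; the partition set and offset maps are assumed to be in 
scope):
   
{code:java}
TaskId id = new TaskId(0, 1); // subtopology 0, partition 1 -- string form "0_1"

// deprecated since 3.0:
TaskMetadata before = new TaskMetadata("0_1", topicPartitions, committedOffsets, endOffsets, Optional.empty());

// replacement, passing a TaskId directly:
TaskMetadata after = new TaskMetadata(id, topicPartitions, committedOffsets, endOffsets, Optional.empty());
{code}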




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10745: MINOR: add window verification to sliding-window co-group test

2021-05-25 Thread GitBox


ableegoldman commented on a change in pull request #10745:
URL: https://github.com/apache/kafka/pull/10745#discussion_r639358048



##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/SlidingWindowedCogroupedKStreamImplTest.java
##
@@ -169,34 +167,37 @@ public void slidingWindowAggregateStreamsTest() {
 testInputTopic.pipeInput("k2", "B", 504);
 testInputTopic.pipeInput("k1", "B", 504);
 
-final Set<TestRecord<String, String>> results = new HashSet<>();
-while (!testOutputTopic.isEmpty()) {
-    final TestRecord<Windowed<String>, String> realRecord = testOutputTopic.readRecord();
-    final TestRecord<String, String> nonWindowedRecord = new TestRecord<>(
-        realRecord.getKey().key(), realRecord.getValue(), null, realRecord.timestamp());
-    results.add(nonWindowedRecord);
-}
-final Set<TestRecord<String, String>> expected = new HashSet<>();
-expected.add(new TestRecord<>("k1", "0+A", null, 500L));
-expected.add(new TestRecord<>("k2", "0+A", null, 500L));
-expected.add(new TestRecord<>("k2", "0+A", null, 501L));
-expected.add(new TestRecord<>("k2", "0+A+A", null, 501L));
-expected.add(new TestRecord<>("k1", "0+A", null, 502L));
-expected.add(new TestRecord<>("k1", "0+A+A", null, 502L));
-expected.add(new TestRecord<>("k1", "0+A+B", null, 503L));
-expected.add(new TestRecord<>("k1", "0+B", null, 503L));
-expected.add(new TestRecord<>("k1", "0+A+A+B", null, 503L));
-expected.add(new TestRecord<>("k2", "0+A+B", null, 503L));
-expected.add(new TestRecord<>("k2", "0+B", null, 503L));
-expected.add(new TestRecord<>("k2", "0+A+A+B", null, 503L));
-expected.add(new TestRecord<>("k2", "0+A+B+B", null, 504L));
-expected.add(new TestRecord<>("k2", "0+B+B", null, 504L));
-expected.add(new TestRecord<>("k2", "0+B", null, 504L));
-expected.add(new TestRecord<>("k2", "0+A+A+B+B", null, 504L));
-expected.add(new TestRecord<>("k1", "0+A+B+B", null, 504L));
-expected.add(new TestRecord<>("k1", "0+B+B", null, 504L));
-expected.add(new TestRecord<>("k1", "0+B", null, 504L));
-expected.add(new TestRecord<>("k1", "0+A+A+B+B", null, 504L));
+final List<TestRecord<Windowed<String>, String>> results = testOutputTopic.readRecordsToList();
+
+final List<TestRecord<Windowed<String>, String>> expected = new LinkedList<>();
+// k1-A-500
+expected.add(new TestRecord<>(new Windowed<>("k1", new TimeWindow(0L, 500L)), "0+A", null, 500L));

Review comment:
   >A {@link TimeWindow} covers a half-open time interval
   
   I was about to say we should just make `TimeWindow` un-opinionated, but this 
is literally the first thing in the javadocs for the class. So I'd say it's 
pretty clear about what it's representing -- I totally missed this before and 
thought it was just a basic container class.
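   
   A quick illustration of those half-open semantics, using the values from 
the test above:
   
{code:java}
final TimeWindow window = new TimeWindow(0L, 500L);
window.start(); // 0   -- inclusive
window.end();   // 500 -- exclusive: a record with timestamp 500 falls outside this window
{code}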




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-12848) Add some basic benchmarks for Kafka Streams

2021-05-25 Thread Sagar Rao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12848?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sagar Rao reassigned KAFKA-12848:
-

Assignee: Sagar Rao

> Add some basic benchmarks for Kafka Streams
> ---
>
> Key: KAFKA-12848
> URL: https://issues.apache.org/jira/browse/KAFKA-12848
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Sagar Rao
>Priority: Major
>  Labels: newbie, newbie++
>
> As the title suggests, we often want to test out improvements or verify that 
> a bugfix does not introduce a serious regression. While there are existing 
> benchmarks that are run for quality assurance by various contributors, there 
> are no publicly available benchmarks for Kafka Streams in AK itself.
> It would be great if we had a simple jmh suite (or something) with various 
> Streams features which could be run on a one-off basis by developers.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9168) Integrate JNI direct buffer support to RocksDBStore

2021-05-25 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17351444#comment-17351444
 ] 

Sagar Rao commented on KAFKA-9168:
--

Sure, thanks. I will probably send a patch for one of the APIs (let's say 
put()) and you can run benchmarks using the internal repo? Meanwhile, I have 
assigned the other ticket, related to the benchmarking framework, to myself; I 
can start looking into it as well.

BTW, do you think this particular ticket, if the numbers look fine, would 
warrant a KIP?

> Integrate JNI direct buffer support to RocksDBStore
> ---
>
> Key: KAFKA-9168
> URL: https://issues.apache.org/jira/browse/KAFKA-9168
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Sagar Rao
>Assignee: Sagar Rao
>Priority: Major
>  Labels: perfomance
>
> There has been a PR created on rocksdb Java client to support direct 
> ByteBuffers in Java. We can look at integrating it whenever it gets merged. 
> Link to PR: [https://github.com/facebook/rocksdb/pull/2283]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman commented on pull request #10755: MINOR: deprecate TaskMetadata constructor and add KIP-740 notes to upgrade guide

2021-05-25 Thread GitBox


ableegoldman commented on pull request #10755:
URL: https://github.com/apache/kafka/pull/10755#issuecomment-848407312


   Addressed your comments @mjsax 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10755: MINOR: deprecate TaskMetadata constructor and add KIP-740 notes to upgrade guide

2021-05-25 Thread GitBox


ableegoldman commented on a change in pull request #10755:
URL: https://github.com/apache/kafka/pull/10755#discussion_r639356381



##
File path: streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
##
@@ -80,6 +83,35 @@ public String toString() {
 return namedTopology != null ? namedTopology + "_" + topicGroupId + 
"_" + partition : topicGroupId + "_" + partition;
 }
 
+/**
+ * @throws TaskIdFormatException if the taskIdStr is not a valid {@link 
TaskId}
+ */
+public static TaskId parse(final String taskIdStr) {

Review comment:
   Well, that would be a breaking change by removing a non-deprecated API, 
no? And in this case I actually believe we should _not_ deprecate it -- if 
`toString` is part of the public TaskId API (and it should be), then this 
`parse` method, which does the reverse, should be as well. As I discussed with 
some folks during the KIP-740 ~debacle~ debate, part of the public contract of 
TaskId is in its string representation, since that is what ends up in logs, 
metrics, etc. So imo it does make sense to provide a String-to-TaskId API
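   
   The round-trip contract in question, sketched with placeholder values:
   
{code:java}
final TaskId id = new TaskId(2, 7);
final String asString = id.toString();        // "2_7" -- the form that ends up in logs and metrics
final TaskId parsed = TaskId.parse(asString); // reverses toString()
// parsed.equals(id) holds; TaskId.parse("garbage") throws TaskIdFormatException
{code}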




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10755: MINOR: deprecate TaskMetadata constructor and add KIP-740 notes to upgrade guide

2021-05-25 Thread GitBox


ableegoldman commented on a change in pull request #10755:
URL: https://github.com/apache/kafka/pull/10755#discussion_r639355330



##
File path: streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
##
@@ -80,6 +83,35 @@ public String toString() {
 return namedTopology != null ? namedTopology + "_" + topicGroupId + 
"_" + partition : topicGroupId + "_" + partition;
 }
 
+/**
+ * @throws TaskIdFormatException if the taskIdStr is not a valid {@link 
TaskId}
+ */
+public static TaskId parse(final String taskIdStr) {
+final int firstIndex = taskIdStr.indexOf('_');

Review comment:
   Yes, we plan to restrict the `_` character. If we want to loosen that up 
later we can just parse this from the back, but I think it's reasonable to just 
disallow `_` completely. 
   
   > What is a topology name?
   
   Great question. Not necessarily a short answer but I can try -- basically an 
independent and isolated piece of a topology that can be added/removed/etc at 
will, even on a running app. 
   
   > How to set it?
   
   The skeleton API was merged in 
[#10615](https://github.com/apache/kafka/pull/10615/files); it has been 
evolving a bit since then, but the basic idea holds -- each NamedTopology is 
built up with a special builder called the NamedTopologyStreamsBuilder, and a 
dedicated KafkaStreams wrapper is the entry point for starting up an app using 
NamedTopologies. It's all currently under the `internals` package while in the 
experimental phase, so it should not be possible for a user to end up with 
anything NamedTopology through public APIs.
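   
   For reference, parsing from the back could look roughly like this 
(illustrative only, not the actual implementation; topology-name handling is 
elided):
   
{code:java}
// parse "<name>_<subtopology>_<partition>" from the back, so a topology
// name could itself contain '_'
static TaskId parseFromBack(final String taskIdStr) {
    final int last = taskIdStr.lastIndexOf('_');
    final int secondLast = taskIdStr.lastIndexOf('_', last - 1);
    final int partition = Integer.parseInt(taskIdStr.substring(last + 1));
    final int subtopology = Integer.parseInt(taskIdStr.substring(secondLast + 1, last));
    // for a plain "0_1" id, secondLast is -1 and there is no name
    return new TaskId(subtopology, partition);
}
{code}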




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12849) Consider migrating TaskMetadata to interface with internal implementation

2021-05-25 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12849?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17351440#comment-17351440
 ] 

A. Sophie Blee-Goldman commented on KAFKA-12849:


Though we went through a similar discussion with TaskId and ultimately decided 
it needed to be an actual class and not an interface, imo those arguments and 
conditions do not really apply to the TaskMetadata class. So I would propose to 
move this to an interface, though I didn't want to get into it during KIP-740 
which had already dragged on for quite long enough :) 

> Consider migrating TaskMetadata to interface with internal implementation
> -
>
> Key: KAFKA-12849
> URL: https://issues.apache.org/jira/browse/KAFKA-12849
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Priority: Major
>
> In KIP-740 we had to go through a deprecation cycle in order to change the 
> constructor from the original one which accepted the taskId parameter as a 
> string, to the new one which takes a TaskId object directly. We had 
> considered just changing the signature directly without deprecation as this 
> was never intended to be instantiated by users, rather it just acts as a 
> pass-through metadata class. Sort of by definition if there is no reason to 
> ever instantiate it, this seems to indicate it may be better suited as a 
> public interface with the implementation and constructor as internal APIs.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12849) Consider migrating TaskMetadata to interface with internal implementation

2021-05-25 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-12849:
--

 Summary: Consider migrating TaskMetadata to interface with 
internal implementation
 Key: KAFKA-12849
 URL: https://issues.apache.org/jira/browse/KAFKA-12849
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: A. Sophie Blee-Goldman


In KIP-740 we had to go through a deprecation cycle in order to change the 
constructor from the original one which accepted the taskId parameter as a 
string, to the new one which takes a TaskId object directly. We had considered 
just changing the signature directly without deprecation as this was never 
intended to be instantiated by users, rather it just acts as a pass-through 
metadata class. Sort of by definition if there is no reason to ever instantiate 
it, this seems to indicate it may be better suited as a public interface with 
the implementation and constructor as internal APIs.
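A hedged sketch of the proposed shape (the names here are assumed, nothing is 
final):

{code:java}
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.streams.processor.TaskId;

// public interface, with only getters:
public interface TaskMetadata {
    TaskId taskId();
    Set<TopicPartition> topicPartitions();
    Map<TopicPartition, Long> committedOffsets();
    Map<TopicPartition, Long> endOffsets();
    Optional<Long> timeCurrentIdlingStarted();
}
// the fields and constructor would move to an internal implementation class
// (e.g. o.a.k.streams.processor.internals.TaskMetadataImpl) that users never
// instantiate, so it can change without a deprecation cycle
{code}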



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman commented on a change in pull request #10755: MINOR: deprecate TaskMetadata constructor and add KIP-740 notes to upgrade guide

2021-05-25 Thread GitBox


ableegoldman commented on a change in pull request #10755:
URL: https://github.com/apache/kafka/pull/10755#discussion_r639351170



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/TaskMetadata.java
##
@@ -40,6 +40,18 @@
 
 private final Optional<Long> timeCurrentIdlingStarted;
 
+/**
+ * @deprecated since 3.0, please use {@link #TaskMetadata(TaskId, Set, Map, Map, Optional)} instead
+ */
+@Deprecated
+public TaskMetadata(final String taskId,
+                    final Set<TopicPartition> topicPartitions,
+                    final Map<TopicPartition, Long> committedOffsets,
+                    final Map<TopicPartition, Long> endOffsets,
+                    final Optional<Long> timeCurrentIdlingStarted) {
+    this(TaskId.parse(taskId), topicPartitions, committedOffsets, endOffsets, timeCurrentIdlingStarted);
+}
+
 public TaskMetadata(final TaskId taskId,

Review comment:
   Well, we would still need a public constructor no matter what even with 
an internal impl class. I'll add a comment to clarify that it's not intended 
for public use and maybe file a followup ticket to migrate this to an interface 
in case someone wants to get into that




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10755: MINOR: deprecate TaskMetadata constructor and add KIP-740 notes to upgrade guide

2021-05-25 Thread GitBox


ableegoldman commented on a change in pull request #10755:
URL: https://github.com/apache/kafka/pull/10755#discussion_r639350604



##
File path: docs/streams/upgrade-guide.html
##
@@ -117,6 +117,14 @@ Streams API
 
 We removed the default implementation of 
RocksDBConfigSetter#close().
 
+
+
+The public topicGroupId and partition fields 
on TaskId have been deprecated and replaced with getters, please migrate to 
using the new TaskId.subtopology()
+and TaskId.partition() APIs instead. Also, the 
TaskId#readFrom and TaskId#writeTo methods have been 
deprecated and will be removed, as they were never intended
+for public use to begin with. Finally, we have deprecated the 
TaskMetadata.taskId() method as well as the 
TaskMetadataconstructor. These have been replaced with APIs that
+better represent the task id as an actual TaskId object 
instead of a String. Please migrate to the new 
TaskMetadata#getTaskId method and the new constructor which accepts

Review comment:
   True. I'll remove this note about it




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #10755: MINOR: deprecate TaskMetadata constructor and add KIP-740 notes to upgrade guide

2021-05-25 Thread GitBox


mjsax commented on a change in pull request #10755:
URL: https://github.com/apache/kafka/pull/10755#discussion_r639338584



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/TaskMetadata.java
##
@@ -40,6 +40,18 @@
 
 private final Optional<Long> timeCurrentIdlingStarted;
 
+/**
+ * @deprecated since 3.0, please use {@link #TaskMetadata(TaskId, Set, Map, Map, Optional)} instead
+ */
+@Deprecated
+public TaskMetadata(final String taskId,
+                    final Set<TopicPartition> topicPartitions,
+                    final Map<TopicPartition, Long> committedOffsets,
+                    final Map<TopicPartition, Long> endOffsets,
+                    final Optional<Long> timeCurrentIdlingStarted) {
+    this(TaskId.parse(taskId), topicPartitions, committedOffsets, endOffsets, timeCurrentIdlingStarted);
+}
+
 public TaskMetadata(final TaskId taskId,

Review comment:
   Should we make this protected and add an internal "impl" class? (And also 
mark it as non-public so we can eventually move to an interface?)

##
File path: docs/streams/upgrade-guide.html
##
@@ -117,6 +117,14 @@ Streams API
 
 We removed the default implementation of 
RocksDBConfigSetter#close().
 
+
+
+The public topicGroupId and partition fields 
on TaskId have been deprecated and replaced with getters, please migrate to 
using the new TaskId.subtopology()
+and TaskId.partition() APIs instead. Also, the 
TaskId#readFrom and TaskId#writeTo methods have been 
deprecated and will be removed, as they were never intended
+for public use to begin with. Finally, we have deprecated the 
TaskMetadata.taskId() method as well as the 
TaskMetadataconstructor. These have been replaced with APIs that

Review comment:
   nit: remove `to begin with` ?

##
File path: docs/streams/upgrade-guide.html
##
@@ -117,6 +117,14 @@ Streams API
 
 We removed the default implementation of 
RocksDBConfigSetter#close().
 
+
+
+The public topicGroupId and partition fields 
on TaskId have been deprecated and replaced with getters, please migrate to 
using the new TaskId.subtopology()
+and TaskId.partition() APIs instead. Also, the 
TaskId#readFrom and TaskId#writeTo methods have been 
deprecated and will be removed, as they were never intended
+for public use to begin with. Finally, we have deprecated the 
TaskMetadata.taskId() method as well as the 
TaskMetadataconstructor. These have been replaced with APIs that
+better represent the task id as an actual TaskId object 
instead of a String. Please migrate to the new 
TaskMetadata#getTaskId method and the new constructor which accepts

Review comment:
   `and the new constructor`
   
   I thought that the constructor was never intended for public usage. Should 
we not hide the new constructor?

##
File path: docs/streams/upgrade-guide.html
##
@@ -117,6 +117,14 @@ Streams API
 
 We removed the default implementation of 
RocksDBConfigSetter#close().
 
+
+
+The public topicGroupId and partition fields 
on TaskId have been deprecated and replaced with getters, please migrate to 
using the new TaskId.subtopology()
+and TaskId.partition() APIs instead. Also, the 
TaskId#readFrom and TaskId#writeTo methods have been 
deprecated and will be removed, as they were never intended
+for public use to begin with. Finally, we have deprecated the 
TaskMetadata.taskId() method as well as the 
TaskMetadataconstructor. These have been replaced with APIs that

Review comment:
   missing blank `TaskMetadataconstructor` 

##
File path: streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
##
@@ -80,6 +83,35 @@ public String toString() {
 return namedTopology != null ? namedTopology + "_" + topicGroupId + 
"_" + partition : topicGroupId + "_" + partition;
 }
 
+/**
+ * @throws TaskIdFormatException if the taskIdStr is not a valid {@link 
TaskId}
+ */
+public static TaskId parse(final String taskIdStr) {

Review comment:
   If it was removed (and released in 2.8), why add it back? Seems we don't 
need it?

##
File path: streams/src/main/java/org/apache/kafka/streams/processor/TaskId.java
##
@@ -80,6 +83,35 @@ public String toString() {
 return namedTopology != null ? namedTopology + "_" + topicGroupId + 
"_" + partition : topicGroupId + "_" + partition;
 }
 
+/**
+ * @throws TaskIdFormatException if the taskIdStr is not a valid {@link 
TaskId}
+ */
+public static TaskId parse(final String taskIdStr) {
+final int firstIndex = taskIdStr.indexOf('_');

Review comment:
   I guess I missed the "named topology" change. What is a topology name? 
How to set it? And do we ensure that we don't allow `_` in its name?

##
File path: docs/streams/upgrade-guide.html
##
@@ -117,6 +117,14 @@ Streams API
 
 

[GitHub] [kafka] DuongPTIT commented on pull request #10677: KAFKA-12380 Executor in Connect's Worker is not shut down when the worker is

2021-05-25 Thread GitBox


DuongPTIT commented on pull request #10677:
URL: https://github.com/apache/kafka/pull/10677#issuecomment-848382648


   hi @showuon, can you please review this issue for me? Thank you.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] DuongPTIT removed a comment on pull request #10677: KAFKA-12380 Executor in Connect's Worker is not shut down when the worker is

2021-05-25 Thread GitBox


DuongPTIT removed a comment on pull request #10677:
URL: https://github.com/apache/kafka/pull/10677#issuecomment-845827053


   @chia7712 PTAL, thanks!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on a change in pull request #10280: KAFKA-12554: Refactor Log layer

2021-05-25 Thread GitBox


junrao commented on a change in pull request #10280:
URL: https://github.com/apache/kafka/pull/10280#discussion_r638367249



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -1500,50 +1325,67 @@ class Log(@volatile private var _dir: File,
   }
 }
 
-deleteOldSegments(shouldDelete, RetentionSizeBreach)
+deleteOldSegments(shouldDelete, RetentionSizeBreach(this))
   }
 
   private def deleteLogStartOffsetBreachedSegments(): Int = {
 def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): 
Boolean = {
   nextSegmentOpt.exists(_.baseOffset <= logStartOffset)
 }
 
-deleteOldSegments(shouldDelete, StartOffsetBreach)
+deleteOldSegments(shouldDelete, StartOffsetBreach(this))
   }
 
   def isFuture: Boolean = dir.getName.endsWith(Log.FutureDirSuffix)
 
   /**
* The size of the log in bytes
*/
-  def size: Long = Log.sizeInBytes(logSegments)
+  def size: Long = localLog.segments.sizeInBytes
 
   /**
-   * The offset metadata of the next message that will be appended to the log
+   * The offset of the next message that will be appended to the log
*/
-  def logEndOffsetMetadata: LogOffsetMetadata = nextOffsetMetadata
+  def logEndOffset: Long =  localLog.logEndOffset
 
   /**
-   * The offset of the next message that will be appended to the log
+   * The offset metadata of the next message that will be appended to the log
*/
-  def logEndOffset: Long = nextOffsetMetadata.messageOffset
+  def logEndOffsetMetadata: LogOffsetMetadata = localLog.logEndOffsetMetadata
+
+  private val rollAction = RollAction(

Review comment:
   I feel RollAction actually makes the code harder to understand than 
before, so it would be useful to think through whether we could avoid it. In 
particular, it seems that anything in postRollAction could just be done in the 
caller if we return enough context. We are taking a producer snapshot in 
preRollAction; however, since we are not adding new data here, it seems that we 
could take the producer snapshot in Log.roll() after calling localLog.roll(), 
while holding the Log.lock.

##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -1500,50 +1325,67 @@ class Log(@volatile private var _dir: File,
   }
 }
 
-deleteOldSegments(shouldDelete, RetentionSizeBreach)
+deleteOldSegments(shouldDelete, RetentionSizeBreach(this))
   }
 
   private def deleteLogStartOffsetBreachedSegments(): Int = {
 def shouldDelete(segment: LogSegment, nextSegmentOpt: Option[LogSegment]): 
Boolean = {
   nextSegmentOpt.exists(_.baseOffset <= logStartOffset)
 }
 
-deleteOldSegments(shouldDelete, StartOffsetBreach)
+deleteOldSegments(shouldDelete, StartOffsetBreach(this))
   }
 
   def isFuture: Boolean = dir.getName.endsWith(Log.FutureDirSuffix)
 
   /**
* The size of the log in bytes
*/
-  def size: Long = Log.sizeInBytes(logSegments)
+  def size: Long = localLog.segments.sizeInBytes
 
   /**
-   * The offset metadata of the next message that will be appended to the log
+   * The offset of the next message that will be appended to the log
*/
-  def logEndOffsetMetadata: LogOffsetMetadata = nextOffsetMetadata
+  def logEndOffset: Long =  localLog.logEndOffset
 
   /**
-   * The offset of the next message that will be appended to the log
+   * The offset metadata of the next message that will be appended to the log
*/
-  def logEndOffset: Long = nextOffsetMetadata.messageOffset
+  def logEndOffsetMetadata: LogOffsetMetadata = localLog.logEndOffsetMetadata
+
+  private val rollAction = RollAction(
+preRollAction = (newSegment: LogSegment) => {
+  // Take a snapshot of the producer state to facilitate recovery. It is 
useful to have the snapshot
+  // offset align with the new segment offset since this ensures we can 
recover the segment by beginning
+  // with the corresponding snapshot file and scanning the segment data. 
Because the segment base offset
+  // may actually be ahead of the current producer state end offset (which 
corresponds to the log end offset),
+  // we manually override the state offset here prior to taking the 
snapshot.
+  producerStateManager.updateMapEndOffset(newSegment.baseOffset)
+  producerStateManager.takeSnapshot()
+},
+postRollAction = (newSegment: LogSegment, deletedSegment: 
Option[LogSegment]) => {
+  deletedSegment.foreach(segment => 
deleteProducerSnapshotAsync(Seq(segment)))

Review comment:
   This seems to have exposed an existing bug. During roll, deletedSegment 
will be non-empty if there is an existing segment of size 0 at the 
newOffsetToRoll. However, since we take a producer snapshot at newOffsetToRoll 
before calling postRollAction, we would be deleting the same snapshot we just 
created.
   
   In this case, I think we don't need to delete the producer snapshot for 
deletedSegment.

##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -1572,144 

[jira] [Commented] (KAFKA-9168) Integrate JNI direct buffer support to RocksDBStore

2021-05-25 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17351377#comment-17351377
 ] 

A. Sophie Blee-Goldman commented on KAFKA-9168:
---

There's nothing available at the moment in AK, I actually just filed 
https://issues.apache.org/jira/browse/KAFKA-12848 for this. If you'd be 
interested in putting together a starter kit of jmh benchmarks that would 
definitely be great – but we do have an internal benchmarking repo so I can 
just run those on your patch if you'd prefer. We'd like to make some of it 
public eventually but it requires some footwork and maybe an audit so no one 
has found the time yet

> Integrate JNI direct buffer support to RocksDBStore
> ---
>
> Key: KAFKA-9168
> URL: https://issues.apache.org/jira/browse/KAFKA-9168
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Sagar Rao
>Assignee: Sagar Rao
>Priority: Major
>  Labels: perfomance
>
> There has been a PR created on rocksdb Java client to support direct 
> ByteBuffers in Java. We can look at integrating it whenever it gets merged. 
> Link to PR: [https://github.com/facebook/rocksdb/pull/2283]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12848) Add some basic benchmarks for Kafka Streams

2021-05-25 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-12848:
--

 Summary: Add some basic benchmarks for Kafka Streams
 Key: KAFKA-12848
 URL: https://issues.apache.org/jira/browse/KAFKA-12848
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: A. Sophie Blee-Goldman


As the title suggests, we often want to test out improvements or verify that a 
bugfix does not introduce a serious regression. While there are existing 
benchmarks that are run for quality assurance by various contributors, there 
are no publicly available benchmarks for Kafka Streams in AK itself.

It would be great if we had a simple jmh suite (or something) with various 
Streams features which could be run on a one-off basis by developers.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ccding opened a new pull request #10763: [WIP] KAFKA-12520: Ensure log loading does not truncate producer state unless required

2021-05-25 Thread GitBox


ccding opened a new pull request #10763:
URL: https://github.com/apache/kafka/pull/10763


   When we find a .swap file on startup, we typically want to rename it to 
replace the corresponding .log, .index, .timeindex, etc. files as a way to 
complete any ongoing replace operations. These swap files are usually known to 
have been flushed to disk before the replace operation begins.
   
   One flaw in the current logic is that we recover these swap files on startup 
and, as part of that, end up truncating the producer state and rebuilding it 
from scratch. This is unneeded, as the replace operation does not mutate the 
producer state by itself; it is only meant to replace the .log file along with 
the corresponding indices. Because of this unneeded producer state rebuild, we 
have seen multi-hour startup times for clusters that have large compacted 
topics.
   
   This patch fixes the issue. With ext4 ordered mode, the metadata operations 
are ordered, so regardless of whether the shutdown was clean or unclean, if at 
least one `.swap` file exists for a segment, all other files for the segment 
must exist as `.cleaned` files or `.swap` files. Therefore, we rename the 
`.cleaned` files to `.swap` files and then make them normal segment files. If 
they don't pass the sanity check, we fall back to the original path and repair 
all the index files.
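   
   Roughly, the promotion order looks like this (an illustrative Java sketch 
only -- the real logic lives in the Scala log-loading code, and the sanity 
check is elided):
   
{code:java}
import java.io.IOException;
import java.nio.file.DirectoryStream;
import java.nio.file.Files;
import java.nio.file.Path;

static void completeInterruptedReplace(final Path logDir) throws IOException {
    // 1. promote .cleaned files to .swap, so the whole segment set is in one state
    try (DirectoryStream<Path> cleaned = Files.newDirectoryStream(logDir, "*.cleaned")) {
        for (final Path f : cleaned) {
            Files.move(f, logDir.resolve(f.getFileName().toString().replace(".cleaned", ".swap")));
        }
    }
    // 2. strip the .swap suffix to make the files live -- no producer state rebuild
    try (DirectoryStream<Path> swaps = Files.newDirectoryStream(logDir, "*.swap")) {
        for (final Path f : swaps) {
            Files.move(f, logDir.resolve(f.getFileName().toString().replace(".swap", "")));
        }
    }
}
{code}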
   
   TODO: validate the patch
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12845) Rollback change which requires join key to be non null on KStream->GlobalKTable

2021-05-25 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17351308#comment-17351308
 ] 

Matthias J. Sax commented on KAFKA-12845:
-

I think you are confusing the "input record key" with the "join-key". As you 
correctly mentioned, we apply a key-extractor to get the join-key for 
stream-side records. This join-key must still be non-null. KAFKA-10277 only 
allows the input record key to be null now. Of course, if you use the input 
record key as the join-key, the key-extractor would return the key as-is, and 
thus, if it's null, the record would still be dropped.

The issue KAFKA-10277 addresses is that if you have a null-key input record, 
but your key-extractor returns a non-null join-key (from the value of the 
record), the join should still work. However, before KAFKA-10277, all records 
with a null key were dropped, even if the key was not used in the join.

Does this make sense?
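
A sketch of the case KAFKA-10277 enables (the PageView/Product types, topic 
names, and enrich() joiner below are assumed for illustration):

{code:java}
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.GlobalKTable;
import org.apache.kafka.streams.kstream.KStream;

final StreamsBuilder builder = new StreamsBuilder();
final KStream<String, PageView> views = builder.stream("page-views"); // record keys may be null
final GlobalKTable<String, Product> products = builder.globalTable("products");

views.join(
    products,
    (viewKey, view) -> view.productId(),       // join-key extracted from the value, not the record key
    (view, product) -> enrich(view, product)); // records whose *extracted* join-key is null are still dropped
{code}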

> Rollback change which requires join key to be non null on 
> KStream->GlobalKTable
> ---
>
> Key: KAFKA-12845
> URL: https://issues.apache.org/jira/browse/KAFKA-12845
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.7.0
>Reporter: Pedro Gontijo
>Priority: Major
>
> As part of [KAFKA-10277|https://issues.apache.org/jira/browse/KAFKA-10277] 
> the behavior for KStream->GlobalKtable joins was changed to require non null 
> join keys.
> But it seems reasonable that not every record will have an existing 
> relationship (and hence a key) with the join globalktable. Think about a 
> User>Car for instance, or PageView>Product. An empty/zero key could be 
> returned by the KeyMapper but that will make a totally unnecessary search 
> into the store.
> I do not think that makes sense for any GlobalKtable join (inner or left) but 
> for left join it sounds even more strange.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jlprat commented on pull request #10758: KAFKA-12782: Javadocs search sends you to a non-existent URL

2021-05-25 Thread GitBox


jlprat commented on pull request #10758:
URL: https://github.com/apache/kafka/pull/10758#issuecomment-848155893


   Sure @ableegoldman , no worries. I just mentioned you as you were the one 
who created the ticket.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Reopened] (KAFKA-9295) KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable

2021-05-25 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9295?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman reopened KAFKA-9295:
---

Guess there is still something else going on here. At this point I think we can 
mostly rule out fiddling with the configs, but I don't have any guesses on 
where to look next. It would be nice if we could get real logs from a run that 
reproduced this, but unfortunately all the actual Streams content is truncated.

[~showuon] maybe you can look into turning the zookeeper and kafka logs down to 
WARN or even ERROR, so that we have some hope of viewing the relevant parts of 
the logs? I tried to do that a while back, but clearly it didn't work, and I 
didn't have time to revisit it.

> KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable
> --
>
> Key: KAFKA-9295
> URL: https://issues.apache.org/jira/browse/KAFKA-9295
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 2.4.0, 2.6.0
>Reporter: Matthias J. Sax
>Assignee: Luke Chen
>Priority: Critical
>  Labels: flaky-test
> Fix For: 3.0.0
>
>
> [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/27106/testReport/junit/org.apache.kafka.streams.integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest/shouldInnerJoinMultiPartitionQueryable/]
> {quote}java.lang.AssertionError: Did not receive all 1 records from topic 
> output- within 6 ms Expected: is a value equal to or greater than <1> 
> but: <0> was less than <1> at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:18) at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:515)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:385)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:511)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:489)
>  at 
> org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.verifyKTableKTableJoin(KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java:200)
>  at 
> org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.shouldInnerJoinMultiPartitionQueryable(KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java:183){quote}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman commented on pull request #10758: KAFKA-12782: Javadocs search sends you to a non-existent URL

2021-05-25 Thread GitBox


ableegoldman commented on pull request #10758:
URL: https://github.com/apache/kafka/pull/10758#issuecomment-848103904


   To be honest I don't have much context on the javadocs but I will take a 
look. Maybe @ijuma can help review


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-9168) Integrate JNI direct buffer support to RocksDBStore

2021-05-25 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17351243#comment-17351243
 ] 

Sagar Rao commented on KAFKA-9168:
--

Got it. How do you want me to benchmark? Using the rocksdb benchmarking 
utility, or is there another way, like jmh within kafka streams?

> Integrate JNI direct buffer support to RocksDBStore
> ---
>
> Key: KAFKA-9168
> URL: https://issues.apache.org/jira/browse/KAFKA-9168
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Sagar Rao
>Assignee: Sagar Rao
>Priority: Major
>  Labels: perfomance
>
> There has been a PR created on rocksdb Java client to support direct 
> ByteBuffers in Java. We can look at integrating it whenever it gets merged. 
> Link to PR: [https://github.com/facebook/rocksdb/pull/2283]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9168) Integrate JNI direct buffer support to RocksDBStore

2021-05-25 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17351238#comment-17351238
 ] 

A. Sophie Blee-Goldman commented on KAFKA-9168:
---

Yep, I think that's exactly what we want to do with this ticket. Looking 
forward to the results

> Integrate JNI direct buffer support to RocksDBStore
> ---
>
> Key: KAFKA-9168
> URL: https://issues.apache.org/jira/browse/KAFKA-9168
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Sagar Rao
>Assignee: Sagar Rao
>Priority: Major
>  Labels: perfomance
>
> There has been a PR created on rocksdb Java client to support direct 
> ByteBuffers in Java. We can look at integrating it whenever it gets merged. 
> Link to PR: [https://github.com/facebook/rocksdb/pull/2283]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8295) Optimize count() using RocksDB merge operator

2021-05-25 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17351232#comment-17351232
 ] 

Sagar Rao commented on KAFKA-8295:
--

hey [~ableegoldman], I wanted to know if you got a chance to look at the 
numbers I posted above. Please let me know whenever you get the chance.

> Optimize count() using RocksDB merge operator
> -
>
> Key: KAFKA-8295
> URL: https://issues.apache.org/jira/browse/KAFKA-8295
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Sagar Rao
>Priority: Major
>
> In addition to regular put/get/delete RocksDB provides a fourth operation, 
> merge. This essentially provides an optimized read/update/write path in a 
> single operation. One of the built-in (C++) merge operators exposed over the 
> Java API is a counter. We should be able to leverage this for a more 
> efficient implementation of count()
>  
> (Note: Unfortunately it seems unlikely we can use this to optimize general 
> aggregations, even if RocksJava allowed for a custom merge operator, unless 
> we provide a way for the user to specify and connect a C++ implemented 
> aggregator – otherwise we incur too much cost crossing the jni for a net 
> performance benefit)
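
As a rough sketch of the counter idea described above (plain RocksJava, not 
Streams code; longToLittleEndianBytes() is a hypothetical helper that encodes 
a 64-bit little-endian value, which is what uint64add expects):

{code:java}
import java.nio.charset.StandardCharsets;
import org.rocksdb.Options;
import org.rocksdb.RocksDB;
import org.rocksdb.UInt64AddOperator;

try (final Options options = new Options()
         .setCreateIfMissing(true)
         .setMergeOperator(new UInt64AddOperator());
     final RocksDB db = RocksDB.open(options, "/tmp/count-demo")) {
    final byte[] key = "window-key".getBytes(StandardCharsets.UTF_8);
    db.merge(key, longToLittleEndianBytes(1L)); // read-modify-write collapsed into one call
    db.merge(key, longToLittleEndianBytes(1L)); // db.get(key) now decodes to 2
}
{code}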



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9168) Integrate JNI direct buffer support to RocksDBStore

2021-05-25 Thread Sagar Rao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17351230#comment-17351230
 ] 

Sagar Rao commented on KAFKA-9168:
--

[~ableegoldman], so I went through the github PR link -- which, honestly, I 
hadn't done so far :D -- and it looks like it enables JNI direct byte buffers 
for all basic operations barring transactions. There is a code snippet 
explaining the put operation using this new option:

 
{code:java}
try (RocksDB db = RocksDB.open(opt, "PerformanceTest");
     WriteOptions writeOptions = new WriteOptions()) {

    writeOptions.setDisableWAL(true);

    ByteBuffer directKeyBuffer = ByteBuffer.allocateDirect(128);
    directKeyBuffer.order(ByteOrder.BIG_ENDIAN);

    ByteBuffer directValueBuffer = ByteBuffer.allocateDirect(128);
    directValueBuffer.order(ByteOrder.BIG_ENDIAN);

    for (int i = 0; i < 1_000_000; i++) {
        directKeyBuffer.clear();
        directValueBuffer.clear();
        for (int o = 0; o < 16; o++) {
            directKeyBuffer.putLong(i);
            directValueBuffer.putLong(i);
        }
        directKeyBuffer.flip();
        directValueBuffer.flip();

        db.put(writeOptions, directKeyBuffer, directValueBuffer);
    }
}
{code}
As per the benchmarks released on the link, iterator performance increased by 
37%, with 0 GC cycles compared to 293 for the byte-array-based approach. This 
is because no heap memory is referenced.

So, looking at these numbers, and to answer your question about where this 
ticket fits in, maybe we can start by benchmarking the APIs, changing the way 
the rocksdb state store APIs are implemented to use this new approach? What I 
mean is: if today the put() API implementation uses the byte[]-based APIs, then 
we can benchmark the ByteBuffer-based approach and compare the numbers. Do you 
think that makes sense?
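
A rough standalone jmh sketch of that comparison (paths and sizes are 
arbitrary, and this benchmarks plain RocksJava rather than the Streams state 
store):

{code:java}
import java.nio.ByteBuffer;
import org.openjdk.jmh.annotations.*;
import org.rocksdb.*;

@State(Scope.Thread)
public class RocksDbPutBenchmark {
    RocksDB db;
    WriteOptions writeOptions;
    ByteBuffer directKey, directValue;
    final byte[] key = new byte[128], value = new byte[128];

    @Setup
    public void setup() throws RocksDBException {
        db = RocksDB.open("/tmp/jmh-rocksdb");
        writeOptions = new WriteOptions().setDisableWAL(true);
        directKey = ByteBuffer.allocateDirect(128);
        directValue = ByteBuffer.allocateDirect(128);
    }

    @Benchmark
    public void putByteArray() throws RocksDBException {
        db.put(writeOptions, key, value); // existing byte[] path
    }

    @Benchmark
    public void putDirectByteBuffer() throws RocksDBException {
        directKey.clear();
        directKey.put(key).flip();
        directValue.clear();
        directValue.put(value).flip();
        db.put(writeOptions, directKey, directValue); // ByteBuffer overload from the PR above
    }
}
{code}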

 

> Integrate JNI direct buffer support to RocksDBStore
> ---
>
> Key: KAFKA-9168
> URL: https://issues.apache.org/jira/browse/KAFKA-9168
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Sagar Rao
>Assignee: Sagar Rao
>Priority: Major
>  Labels: perfomance
>
> There has been a PR created on rocksdb Java client to support direct 
> ByteBuffers in Java. We can look at integrating it whenever it gets merged. 
> Link to PR: [https://github.com/facebook/rocksdb/pull/2283]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-12845) Rollback change which requires join key to be non null on KStream->GlobalKTable

2021-05-25 Thread Pedro Gontijo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17351227#comment-17351227
 ] 

Pedro Gontijo edited comment on KAFKA-12845 at 5/25/21, 5:26 PM:
-

[~mjsax] and [~JoelWee] , I would love to hear your thoughts on this.


was (Author: pedrong):
[~mjsax] and [~JoelWee] would love to hear your thoughts on this.

> Rollback change which requires join key to be non null on 
> KStream->GlobalKTable
> ---
>
> Key: KAFKA-12845
> URL: https://issues.apache.org/jira/browse/KAFKA-12845
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.7.0
>Reporter: Pedro Gontijo
>Priority: Major
>
> As part of [KAFKA-10277|https://issues.apache.org/jira/browse/KAFKA-10277] 
> the behavior for KStream->GlobalKtable joins was changed to require non null 
> join keys.
> But it seems reasonable that not every record will have an existing 
> relationship (and hence a key) with the join globalktable. Think about a 
> User>Car for instance, or PageView>Product. An empty/zero key could be 
> returned by the KeyMapper but that will make a totally unnecessary search 
> into the store.
> I do not think that makes sense for any GlobalKtable join (inner or left) but 
> for left join it sounds even more strange.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-12845) Rollback change which requires join key to be non null on KStream->GlobalKTable

2021-05-25 Thread Pedro Gontijo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17351227#comment-17351227
 ] 

Pedro Gontijo edited comment on KAFKA-12845 at 5/25/21, 5:26 PM:
-

[~mjsax] and [~JoelWee] would love to hear your thoughts on this.


was (Author: pedrong):
[~mjsax] and [~JoelWee] it would be great o hear your thoughts on this.

> Rollback change which requires join key to be non null on 
> KStream->GlobalKTable
> ---
>
> Key: KAFKA-12845
> URL: https://issues.apache.org/jira/browse/KAFKA-12845
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.7.0
>Reporter: Pedro Gontijo
>Priority: Major
>
> As part of [KAFKA-10277|https://issues.apache.org/jira/browse/KAFKA-10277] 
> the behavior for KStream->GlobalKtable joins was changed to require non null 
> join keys.
> But it seems reasonable that not every record will have an existing 
> relationship (and hence a key) with the join globalktable. Think about a 
> User>Car for instance, or PageView>Product. An empty/zero key could be 
> returned by the KeyMapper but that will make a totally unnecessary search 
> into the store.
> I do not think that makes sense for any GlobalKtable join (inner or left) but 
> for left join it sounds even more strange.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12845) Rollback change which requires join key to be non null on KStream->GlobalKTable

2021-05-25 Thread Pedro Gontijo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12845?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17351227#comment-17351227
 ] 

Pedro Gontijo commented on KAFKA-12845:
---

[~mjsax] and [~JoelWee] it would be great o hear your thoughts on this.

> Rollback change which requires join key to be non null on 
> KStream->GlobalKTable
> ---
>
> Key: KAFKA-12845
> URL: https://issues.apache.org/jira/browse/KAFKA-12845
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.7.0
>Reporter: Pedro Gontijo
>Priority: Major
>
> As part of [KAFKA-10277|https://issues.apache.org/jira/browse/KAFKA-10277] 
> the behavior for KStream->GlobalKtable joins was changed to require non null 
> join keys.
> But it seems reasonable that not every record will have an existing 
> relationship (and hence a key) with the join globalktable. Think about a 
> User>Car for instance, or PageView>Product. An empty/zero key could be 
> returned by the KeyMapper but that will make a totally unnecessary search 
> into the store.
> I do not think that makes sense for any GlobalKtable join (inner or left) but 
> for left join it sounds even more strange.
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jlprat commented on pull request #10758: KAFKA-12782: Javadocs search sends you to a non-existent URL

2021-05-25 Thread GitBox


jlprat commented on pull request #10758:
URL: https://github.com/apache/kafka/pull/10758#issuecomment-847997038


   Test failures were known flaky tests. I added comments on the corresponding 
Jiras.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-9295) KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable

2021-05-25 Thread Josep Prat (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9295?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17351166#comment-17351166
 ] 

Josep Prat commented on KAFKA-9295:
---

Seen it in 
[https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-10758/1/testReport/org.apache.kafka.streams.integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest/Build___JDK_15_and_Scala_2_13___shouldInnerJoinMultiPartitionQueryable/]

 

> KTableKTableForeignKeyInnerJoinMultiIntegrationTest#shouldInnerJoinMultiPartitionQueryable
> --
>
> Key: KAFKA-9295
> URL: https://issues.apache.org/jira/browse/KAFKA-9295
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 2.4.0, 2.6.0
>Reporter: Matthias J. Sax
>Assignee: Luke Chen
>Priority: Critical
>  Labels: flaky-test
> Fix For: 3.0.0
>
>
> [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/27106/testReport/junit/org.apache.kafka.streams.integration/KTableKTableForeignKeyInnerJoinMultiIntegrationTest/shouldInnerJoinMultiPartitionQueryable/]
> {quote}java.lang.AssertionError: Did not receive all 1 records from topic 
> output- within 6 ms Expected: is a value equal to or greater than <1> 
> but: <0> was less than <1> at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:18) at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:515)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:417)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:385)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:511)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:489)
>  at 
> org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.verifyKTableKTableJoin(KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java:200)
>  at 
> org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.shouldInnerJoinMultiPartitionQueryable(KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java:183){quote}
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-12511) Flaky test DynamicConnectionQuotaTest.testDynamicListenerConnectionCreationRateQuota()

2021-05-25 Thread Josep Prat (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17351159#comment-17351159
 ] 

Josep Prat edited comment on KAFKA-12511 at 5/25/21, 4:00 PM:
--

Seen in: 
[https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-10758/1/testReport/kafka.network/DynamicConnectionQuotaTest/Build___JDK_8_and_Scala_2_12___testDynamicConnectionQuota__/]

 
{code:java}
java.net.SocketException: Broken pipe (Write failed)
at java.net.SocketOutputStream.socketWrite0(Native Method)
at java.net.SocketOutputStream.socketWrite(SocketOutputStream.java:111)
at java.net.SocketOutputStream.write(SocketOutputStream.java:134)
at java.io.DataOutputStream.writeInt(DataOutputStream.java:198)
at kafka.server.BaseRequestTest.sendRequest(BaseRequestTest.scala:85)
at 
kafka.server.BaseRequestTest.sendWithHeader(BaseRequestTest.scala:139)
at kafka.server.BaseRequestTest.send(BaseRequestTest.scala:134)
at 
kafka.server.BaseRequestTest.sendAndReceive(BaseRequestTest.scala:113)
at 
kafka.network.DynamicConnectionQuotaTest.verifyConnection(DynamicConnectionQuotaTest.scala:340)
at 
kafka.network.DynamicConnectionQuotaTest.createAndVerifyConnection(DynamicConnectionQuotaTest.scala:333)
at 
kafka.network.DynamicConnectionQuotaTest.testDynamicConnectionQuota(DynamicConnectionQuotaTest.scala:355){code}


was (Author: josep.prat):
Seen in: 
[https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-10758/1/testReport/kafka.network/DynamicConnectionQuotaTest/Build___JDK_8_and_Scala_2_12___testDynamicConnectionQuota__/]

 
{code:java}
java.lang.AssertionError: Did not receive all 1 records from topic output- 
within 6 ms,  currently accumulated data is []
Expected: is a value equal to or greater than <1>
 but: <0> was less than <1>
at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:610)
at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:368)
at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:336)
at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:606)
at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:579)
at 
org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.verifyKTableKTableJoin(KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java:199)
at 
org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.shouldInnerJoinMultiPartitionQueryable(KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java:185)
{code}

> Flaky test 
> DynamicConnectionQuotaTest.testDynamicListenerConnectionCreationRateQuota()
> --
>
> Key: KAFKA-12511
> URL: https://issues.apache.org/jira/browse/KAFKA-12511
> Project: Kafka
>  Issue Type: Bug
>Reporter: dengziming
>Priority: Minor
>
> First time:
> Listener PLAINTEXT connection rate 14.419389476913636 must be below 
> 14.399 ==> expected:  but was: 
> Second time:
> Listener EXTERNAL connection rate 10.998243336133811 must be below 
> 10.799 ==> expected:  but was: 
> details: 
> https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-10289/4/testReport/junit/kafka.network/DynamicConnectionQuotaTest/Build___JDK_11___testDynamicListenerConnectionCreationRateQuota__/



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12319) Flaky test ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit()

2021-05-25 Thread Josep Prat (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12319?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17351161#comment-17351161
 ] 

Josep Prat commented on KAFKA-12319:


Failed again in 
[https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-10758/1/testReport/kafka.network/ConnectionQuotasTest/Build___JDK_15_and_Scala_2_13___testListenerConnectionRateLimitWhenActualRateAboveLimit__/]

 

> Flaky test 
> ConnectionQuotasTest.testListenerConnectionRateLimitWhenActualRateAboveLimit()
> -
>
> Key: KAFKA-12319
> URL: https://issues.apache.org/jira/browse/KAFKA-12319
> Project: Kafka
>  Issue Type: Test
>Reporter: Justine Olshan
>Priority: Major
>  Labels: flaky-test
>
> I've seen this test fail a few times locally. But recently I saw it fail on a 
> PR build on Jenkins.
>  
> [https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-10041/7/testReport/junit/kafka.network/ConnectionQuotasTest/Build___JDK_11___testListenerConnectionRateLimitWhenActualRateAboveLimit__/]
> h3. Error Message
> java.util.concurrent.ExecutionException: org.opentest4j.AssertionFailedError: 
> Expected rate (30 +- 7), but got 37.436825357209706 (600 connections / 16.027 
> sec) ==> expected: <30.0> but was: <37.436825357209706>
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12511) Flaky test DynamicConnectionQuotaTest.testDynamicListenerConnectionCreationRateQuota()

2021-05-25 Thread Josep Prat (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12511?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17351159#comment-17351159
 ] 

Josep Prat commented on KAFKA-12511:


Seen in: 
[https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-10758/1/testReport/kafka.network/DynamicConnectionQuotaTest/Build___JDK_8_and_Scala_2_12___testDynamicConnectionQuota__/]

 
{code:java}
java.lang.AssertionError: Did not receive all 1 records from topic output- 
within 6 ms,  currently accumulated data is []
Expected: is a value equal to or greater than <1>
 but: <0> was less than <1>
at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:610)
at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:368)
at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:336)
at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:606)
at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:579)
at 
org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.verifyKTableKTableJoin(KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java:199)
at 
org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.shouldInnerJoinMultiPartitionQueryable(KTableKTableForeignKeyInnerJoinMultiIntegrationTest.java:185)
{code}

> Flaky test 
> DynamicConnectionQuotaTest.testDynamicListenerConnectionCreationRateQuota()
> --
>
> Key: KAFKA-12511
> URL: https://issues.apache.org/jira/browse/KAFKA-12511
> Project: Kafka
>  Issue Type: Bug
>Reporter: dengziming
>Priority: Minor
>
> First time:
> Listener PLAINTEXT connection rate 14.419389476913636 must be below 
> 14.399 ==> expected:  but was: 
> Second time:
> Listener EXTERNAL connection rate 10.998243336133811 must be below 
> 10.799 ==> expected:  but was: 
> details: 
> https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-10289/4/testReport/junit/kafka.network/DynamicConnectionQuotaTest/Build___JDK_11___testDynamicListenerConnectionCreationRateQuota__/



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9009) Flaky Test kafka.integration.MetricsDuringTopicCreationDeletionTest.testMetricsDuringTopicCreateDelete

2021-05-25 Thread Josep Prat (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9009?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17351155#comment-17351155
 ] 

Josep Prat commented on KAFKA-9009:
---

Seen in 
[https://ci-builds.apache.org/job/Kafka/job/kafka-pr/job/PR-10758/1/testReport/kafka.integration/MetricsDuringTopicCreationDeletionTest/Build___JDK_11_and_Scala_2_13___testMetricsDuringTopicCreateDelete__/]

 
{code:java}
Error Message
java.lang.AssertionError: assertion failed: UnderReplicatedPartitionCount not 
0: 1
Stacktrace
java.lang.AssertionError: assertion failed: UnderReplicatedPartitionCount not 
0: 1
at scala.Predef$.assert(Predef.scala:279)
at 
kafka.integration.MetricsDuringTopicCreationDeletionTest.testMetricsDuringTopicCreateDelete(MetricsDuringTopicCreationDeletionTest.scala:121){code}

> Flaky Test 
> kafka.integration.MetricsDuringTopicCreationDeletionTest.testMetricsDuringTopicCreateDelete
> --
>
> Key: KAFKA-9009
> URL: https://issues.apache.org/jira/browse/KAFKA-9009
> Project: Kafka
>  Issue Type: Test
>  Components: core
>Affects Versions: 2.5.0, 2.6.0
>Reporter: Bill Bejeck
>Assignee: Luke Chen
>Priority: Major
>  Labels: flaky-test
>
> Failure seen in 
> [https://builds.apache.org/job/kafka-pr-jdk8-scala2.11/25644/testReport/junit/kafka.integration/MetricsDuringTopicCreationDeletionTest/testMetricsDuringTopicCreateDelete/]
>  
> {noformat}
> Error Messagejava.lang.AssertionError: assertion failed: 
> UnderReplicatedPartitionCount not 0: 1Stacktracejava.lang.AssertionError: 
> assertion failed: UnderReplicatedPartitionCount not 0: 1
>   at scala.Predef$.assert(Predef.scala:170)
>   at 
> kafka.integration.MetricsDuringTopicCreationDeletionTest.testMetricsDuringTopicCreateDelete(MetricsDuringTopicCreationDeletionTest.scala:123)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>   at org.junit.runners.ParentRunner$4.run(ParentRunner.java:330)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328)
>   at org.junit.runners.ParentRunner.access$100(ParentRunner.java:65)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:292)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:412)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
>   at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
>   at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 

[GitHub] [kafka] dajac commented on a change in pull request #10757: MINOR: Log more information when producer snapshot is written

2021-05-25 Thread GitBox


dajac commented on a change in pull request #10757:
URL: https://github.com/apache/kafka/pull/10757#discussion_r638898378



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -2011,7 +2011,11 @@ object Log extends Logging {
   logDirFailureChannel,
   config.messageFormatVersion.recordVersion,
   s"[Log partition=$topicPartition, dir=${dir.getParent}] ")
-val producerStateManager = new ProducerStateManager(topicPartition, dir, maxProducerIdExpirationMs)
+val producerStateManager = new ProducerStateManager(
+  topicPartition,
+  dir,
+  maxProducerIdExpirationMs,
+  time)

Review comment:
   Reverted to using one line.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12430) emit.heartbeats.enabled = false should disable heartbeats topic creation

2021-05-25 Thread Ryanne Dolan (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12430?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17351104#comment-17351104
 ] 

Ryanne Dolan commented on KAFKA-12430:
--

Hmm. I guess the downside of not creating the topics is that upstreamClusters 
etc. won't work. They don't depend on actual records, just on the topics 
existing. I don't have any objections, but it's a consideration.
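
For illustration, a minimal sketch of what gating topic creation on the flag could look like. This is not MirrorMaker 2's actual internal code; the helper name, its signature, and the hard-coded topic settings are assumptions for the example:

{code:java}
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.NewTopic;

import java.util.Collections;
import java.util.Map;

public class HeartbeatTopicSetup {
    // Hypothetical helper: create the heartbeats topic only when emission is enabled.
    static void maybeCreateHeartbeatsTopic(Admin admin, Map<String, String> connectorConfig)
            throws Exception {
        boolean emitHeartbeats = Boolean.parseBoolean(
            connectorConfig.getOrDefault("emit.heartbeats.enabled", "true"));
        if (!emitHeartbeats) {
            return; // proposed behavior: skip creating the topic entirely
        }
        // Partition/replication settings below are placeholders, not MM2 defaults.
        NewTopic heartbeats = new NewTopic("heartbeats", 1, (short) 1);
        admin.createTopics(Collections.singleton(heartbeats)).all().get();
    }
}
{code}

As noted above, anything that discovers upstream clusters via the heartbeats topics would stop working when the topic is never created, so the trade-off is worth keeping in mind.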

> emit.heartbeats.enabled = false should disable heartbeats topic creation
> 
>
> Key: KAFKA-12430
> URL: https://issues.apache.org/jira/browse/KAFKA-12430
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Reporter: Ivan Yurchenko
>Assignee: Matthew de Detrich
>Priority: Minor
>
> Currently, whether MirrorMaker 2's {{MirrorHeartbeatConnector}} emits 
> heartbeats or not is based on the {{emit.heartbeats.enabled}} setting. However, 
> the {{heartbeats}} topic is created unconditionally. It seems that the same 
> setting should really disable the topic creation as well.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ijuma commented on a change in pull request #10757: MINOR: Log more information when producer snapshot is written

2021-05-25 Thread GitBox


ijuma commented on a change in pull request #10757:
URL: https://github.com/apache/kafka/pull/10757#discussion_r638837477



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -2011,7 +2011,11 @@ object Log extends Logging {
   logDirFailureChannel,
   config.messageFormatVersion.recordVersion,
   s"[Log partition=$topicPartition, dir=${dir.getParent}] ")
-val producerStateManager = new ProducerStateManager(topicPartition, dir, maxProducerIdExpirationMs)
+val producerStateManager = new ProducerStateManager(
+  topicPartition,
+  dir,
+  maxProducerIdExpirationMs,
+  time)

Review comment:
   I prefer one line in this case, but I'm fine either way. :)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-12430) emit.heartbeats.enabled = false should disable heartbeats topic creation

2021-05-25 Thread Matthew de Detrich (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12430?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthew de Detrich reassigned KAFKA-12430:
--

Assignee: Matthew de Detrich

> emit.heartbeats.enabled = false should disable heartbeats topic creation
> 
>
> Key: KAFKA-12430
> URL: https://issues.apache.org/jira/browse/KAFKA-12430
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Reporter: Ivan Yurchenko
>Assignee: Matthew de Detrich
>Priority: Minor
>
> Currently, whether MirrorMaker 2's {{MirrorHeartbeatConnector}} emits 
> heartbeats or not is based on the {{emit.heartbeats.enabled}} setting. However, 
> the {{heartbeats}} topic is created unconditionally. It seems that the same 
> setting should really disable the topic creation as well.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12430) emit.heartbeats.enabled = false should disable heartbeats topic creation

2021-05-25 Thread Matthew de Detrich (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12430?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17351093#comment-17351093
 ] 

Matthew de Detrich commented on KAFKA-12430:


[~ryannedolan] I am going to look into this. Do you have any comments/context 
to add (i.e. is there some deliberate reason why the heartbeats topic is still 
created when emit.heartbeats.enabled is false?)

> emit.heartbeats.enabled = false should disable heartbeats topic creation
> 
>
> Key: KAFKA-12430
> URL: https://issues.apache.org/jira/browse/KAFKA-12430
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Reporter: Ivan Yurchenko
>Priority: Minor
>
> Currently, whether MirrorMaker 2's {{MirrorHeartbeatConnector}} emits 
> heartbeats or not is based on the {{emit.heartbeats.enabled}} setting. However, 
> the {{heartbeats}} topic is created unconditionally. It seems that the same 
> setting should really disable the topic creation as well.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ryannedolan commented on pull request #10762: KAFKA-12819: Add assert messages to MirrorMaker tests plus other qual…

2021-05-25 Thread GitBox


ryannedolan commented on pull request #10762:
URL: https://github.com/apache/kafka/pull/10762#issuecomment-847906780


   thx lgtm!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mdedetrich opened a new pull request #10762: KAFKA-12819: Add assert messages to MirrorMaker tests plus other qual…

2021-05-25 Thread GitBox


mdedetrich opened a new pull request #10762:
URL: https://github.com/apache/kafka/pull/10762


   This PR makes various QoL improvements to the MM tests: some basic 
refactoring to remove boilerplate, plus messages on all of the assert 
statements so that failures carry more context.
   
   The assert failure messages may still need some polish to make it clearer 
what's actually going on; @ryannedolan, you may have more to add on this 
point.
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #10757: MINOR: Log more information when producer snapshot is written

2021-05-25 Thread GitBox


dajac commented on a change in pull request #10757:
URL: https://github.com/apache/kafka/pull/10757#discussion_r638806738



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -2011,7 +2011,11 @@ object Log extends Logging {
   logDirFailureChannel,
   config.messageFormatVersion.recordVersion,
   s"[Log partition=$topicPartition, dir=${dir.getParent}] ")
-val producerStateManager = new ProducerStateManager(topicPartition, dir, maxProducerIdExpirationMs)
+val producerStateManager = new ProducerStateManager(
+  topicPartition,
+  dir,
+  maxProducerIdExpirationMs,
+  time)

Review comment:
   Both are equivalent for me. I found that the line was getting too long 
with the addition of `time`, so I broke it down that way to stay in line with 
`LogLoader.load` below. Would you prefer to keep it on one line instead?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dongjinleekr commented on pull request #9414: KAFKA-10585: Kafka Streams should clean up the state store directory from cleanup

2021-05-25 Thread GitBox


dongjinleekr commented on pull request #9414:
URL: https://github.com/apache/kafka/pull/9414#issuecomment-847886290


   Hi @guozhangwang @vvcephei,
   Could you have a look now? :smiley: 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dongjinleekr commented on pull request #10428: KAFKA-12572: Add import ordering checkstyle rule and configure an automatic formatter

2021-05-25 Thread GitBox


dongjinleekr commented on pull request #10428:
URL: https://github.com/apache/kafka/pull/10428#issuecomment-847885818


   Rebased onto the latest trunk. cc/ @cadonna 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #10757: MINOR: Log more information when producer snapshot is written

2021-05-25 Thread GitBox


ijuma commented on a change in pull request #10757:
URL: https://github.com/apache/kafka/pull/10757#discussion_r638803061



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -2011,7 +2011,11 @@ object Log extends Logging {
   logDirFailureChannel,
   config.messageFormatVersion.recordVersion,
   s"[Log partition=$topicPartition, dir=${dir.getParent}] ")
-val producerStateManager = new ProducerStateManager(topicPartition, dir, maxProducerIdExpirationMs)
+val producerStateManager = new ProducerStateManager(
+  topicPartition,
+  dir,
+  maxProducerIdExpirationMs,
+  time)

Review comment:
   Nit: is this really easier to read than just adding the `time` parameter 
in the same line?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on pull request #10760: KAFKA-12541 Extend ListOffset to fetch offset with max timestamp

2021-05-25 Thread GitBox


dajac commented on pull request #10760:
URL: https://github.com/apache/kafka/pull/10760#issuecomment-847883470


   I will review it in the next few days.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #10761: MINOR: Don't ignore deletion of partition metadata file and log topic id clean-ups

2021-05-25 Thread GitBox


ijuma commented on pull request #10761:
URL: https://github.com/apache/kafka/pull/10761#issuecomment-847881296


   cc @jolshan 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma opened a new pull request #10761: MINOR: Don't ignore deletion of partition metadata file and log topic id clean-ups

2021-05-25 Thread GitBox


ijuma opened a new pull request #10761:
URL: https://github.com/apache/kafka/pull/10761


   Log if deletion fails and don't expose log topic id for mutability outside 
of `assignTopicId()`.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] thomaskwscott opened a new pull request #10760: KAFKA-12541 Extend ListOffset to fetch offset with max timestamp

2021-05-25 Thread GitBox


thomaskwscott opened a new pull request #10760:
URL: https://github.com/apache/kafka/pull/10760


   See 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-734%3A+Improve+AdminClient.listOffsets+to+return+timestamp+and+offset+for+the+record+with+the+largest+timestamp
   
   
   Tested with a new integration test.
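
   For reviewers, a small sketch of how the proposed API is meant to be used, 
per the KIP text. `OffsetSpec.maxTimestamp()` is the addition under review 
here, so the final shape may differ; the broker address and topic name are 
placeholders:

{code:java}
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.common.TopicPartition;

import java.util.Collections;
import java.util.Properties;

public class MaxTimestampExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092"); // placeholder
        try (Admin admin = Admin.create(props)) {
            TopicPartition tp = new TopicPartition("my-topic", 0); // placeholder
            ListOffsetsResult result = admin.listOffsets(
                Collections.singletonMap(tp, OffsetSpec.maxTimestamp()));
            ListOffsetsResult.ListOffsetsResultInfo info = result.partitionResult(tp).get();
            // Offset and timestamp of the record with the largest timestamp:
            System.out.printf("offset=%d timestamp=%d%n", info.offset(), info.timestamp());
        }
    }
}
{code}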
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma opened a new pull request #10759: MINOR: Adjust parameter ordering of `waitForCondition` and `retryOnExceptionWithTimeout`

2021-05-25 Thread GitBox


ijuma opened a new pull request #10759:
URL: https://github.com/apache/kafka/pull/10759


   New parameters in overloaded methods should appear later apart from
   lambdas that should always be last.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-12800) Configure jackson to reject trailing input in the generator

2021-05-25 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12800?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot resolved KAFKA-12800.
-
Fix Version/s: 3.0.0
   Resolution: Fixed

Author: Nathan Lincoln

> Configure jackson to reject trailing input in the generator
> --
>
> Key: KAFKA-12800
> URL: https://issues.apache.org/jira/browse/KAFKA-12800
> Project: Kafka
>  Issue Type: Task
>  Components: generator
>Reporter: Nathan Lincoln
>Priority: Minor
> Fix For: 3.0.0
>
>
> The ObjectMapper instance that parses the schema JSONs will successfully 
> parse even if there is trailing input at the end of the file. This is the 
> default behavior in Jackson, but JSON parsers in other languages may reject 
> these files. 
> The only instance of this should have been fixed with KAFKA-12794, and 
> configuring jackson to reject this in the future is simple - just enable 
> [FAIL_ON_TRAILING_TOKENS|https://fasterxml.github.io/jackson-databind/javadoc/2.9/com/fasterxml/jackson/databind/DeserializationFeature.html#FAIL_ON_TRAILING_TOKENS]
>  
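
For illustration, a minimal, self-contained Jackson sketch of the feature named above (this is not the generator's code; the JSON strings are made up):

{code:java}
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.DeserializationFeature;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class StrictJsonParsing {
    public static void main(String[] args) throws Exception {
        ObjectMapper mapper = new ObjectMapper()
            .enable(DeserializationFeature.FAIL_ON_TRAILING_TOKENS);

        // Parses cleanly: a single JSON document with nothing after it.
        JsonNode ok = mapper.readValue("{\"name\": \"foo\"}", JsonNode.class);
        System.out.println(ok);

        // Fails: trailing input after the document is now rejected.
        try {
            mapper.readValue("{\"name\": \"foo\"} trailing", JsonNode.class);
        } catch (JsonProcessingException e) {
            System.out.println("Rejected trailing tokens: " + e.getMessage());
        }
    }
}
{code}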



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-12800) Configure jackson to reject trailing input in the generator

2021-05-25 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12800?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot reassigned KAFKA-12800:
---

Assignee: (was: David Jacot)

> Configure jackson to reject trailing input in the generator
> --
>
> Key: KAFKA-12800
> URL: https://issues.apache.org/jira/browse/KAFKA-12800
> Project: Kafka
>  Issue Type: Task
>  Components: generator
>Reporter: Nathan Lincoln
>Priority: Minor
>
> The ObjectMapper instance that parses the schema JSONs will successfully 
> parse even if there is trailing input at the end of the file. This is the 
> default behavior in Jackson, but JSON parsers in other languages may reject 
> these files. 
> The only instance of this should have been fixed with KAFKA-12794, and 
> configuring jackson to reject this in the future is simple - just enable 
> [FAIL_ON_TRAILING_TOKENS|https://fasterxml.github.io/jackson-databind/javadoc/2.9/com/fasterxml/jackson/databind/DeserializationFeature.html#FAIL_ON_TRAILING_TOKENS]
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-12800) Configure jackson to reject trailing input in the generator

2021-05-25 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12800?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot reassigned KAFKA-12800:
---

Assignee: David Jacot

> Configure jackson to reject trailing input in the generator
> --
>
> Key: KAFKA-12800
> URL: https://issues.apache.org/jira/browse/KAFKA-12800
> Project: Kafka
>  Issue Type: Task
>  Components: generator
>Reporter: Nathan Lincoln
>Assignee: David Jacot
>Priority: Minor
>
> The ObjectMapper instance that parses the schema JSONs will successfully 
> parse even if there is trailing input at the end of the file. This is the 
> default behavior in Jackson, but JSON parsers in other languages may reject 
> these files. 
> The only instance of this should have been fixed with KAFKA-12794, and 
> configuring jackson to reject this in the future is simple - just enable 
> [FAIL_ON_TRAILING_TOKENS|https://fasterxml.github.io/jackson-databind/javadoc/2.9/com/fasterxml/jackson/databind/DeserializationFeature.html#FAIL_ON_TRAILING_TOKENS]
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dajac merged pull request #10717: KAFKA-12800: Configure generator to fail on trailing JSON tokens

2021-05-25 Thread GitBox


dajac merged pull request #10717:
URL: https://github.com/apache/kafka/pull/10717


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #10616: KAFKA-12709; Add Admin API for `ListTransactions`

2021-05-25 Thread GitBox


dajac commented on a change in pull request #10616:
URL: https://github.com/apache/kafka/pull/10616#discussion_r638764893



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/AllBrokersStrategy.java
##
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.clients.admin.internals;
+
+import org.apache.kafka.common.internals.KafkaFutureImpl;
+import org.apache.kafka.common.message.MetadataRequestData;
+import org.apache.kafka.common.message.MetadataResponseData;
+import org.apache.kafka.common.requests.AbstractResponse;
+import org.apache.kafka.common.requests.MetadataRequest;
+import org.apache.kafka.common.requests.MetadataResponse;
+import org.apache.kafka.common.utils.LogContext;
+import org.slf4j.Logger;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.OptionalInt;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * This class is used for use cases which require requests to be sent to all
+ * brokers in the cluster.
+ *
+ * This is a slightly degenerate case of a lookup strategy in the sense that
+ * the broker IDs are used as both the keys and values. Also, unlike
+ * {@link CoordinatorStrategy} and {@link PartitionLeaderStrategy}, we do not
+ * know the set of keys ahead of time: we require the initial lookup in order
+ * to discover what the broker IDs are. This is represented with a more complex
+ * type {@code Future<Map<Integer, Future<V>>>} in the admin API result type.
+ * For example, see {@link org.apache.kafka.clients.admin.ListTransactionsResult}.
+ */
+public class AllBrokersStrategy implements AdminApiLookupStrategy<AllBrokersStrategy.BrokerKey> {
+    public static final BrokerKey ANY_BROKER = new BrokerKey(OptionalInt.empty());
+    public static final Set<BrokerKey> LOOKUP_KEYS = Collections.singleton(ANY_BROKER);
+    private static final ApiRequestScope SINGLE_REQUEST_SCOPE = new ApiRequestScope() {
+    };
+
+    private final Logger log;
+
+    public AllBrokersStrategy(
+        LogContext logContext
+    ) {
+        this.log = logContext.logger(AllBrokersStrategy.class);
+    }
+
+    @Override
+    public ApiRequestScope lookupScope(BrokerKey key) {
+        return SINGLE_REQUEST_SCOPE;
+    }
+
+    @Override
+    public MetadataRequest.Builder buildRequest(Set<BrokerKey> keys) {
+        validateLookupKeys(keys);
+        // Send empty `Metadata` request. We are only interested in the brokers from the response
+        return new MetadataRequest.Builder(new MetadataRequestData());
+    }
+
+    @Override
+    public LookupResult<BrokerKey> handleResponse(Set<BrokerKey> keys, AbstractResponse abstractResponse) {
+        validateLookupKeys(keys);
+
+        MetadataResponse response = (MetadataResponse) abstractResponse;
+        MetadataResponseData.MetadataResponseBrokerCollection brokers = response.data().brokers();
+
+        if (brokers.isEmpty()) {
+            log.debug("Metadata response contained no brokers. Will backoff and retry");
+            return LookupResult.empty();
+        } else {
+            log.debug("Discovered all brokers {} to send requests to", brokers);
+        }
+
+        Map<BrokerKey, Integer> brokerKeys = brokers.stream().collect(Collectors.toMap(
+            broker -> new BrokerKey(OptionalInt.of(broker.nodeId())),
+            MetadataResponseData.MetadataResponseBroker::nodeId
+        ));
+
+        return new LookupResult<>(
+            Collections.singletonList(ANY_BROKER),
+            Collections.emptyMap(),
+            brokerKeys
+        );
+    }
+
+    private void validateLookupKeys(Set<BrokerKey> keys) {
+        if (keys.size() != 1) {
+            throw new IllegalArgumentException("Unexpected key set " + keys);
Review comment:
   nit: I would add a `:` after `set` to stay consistent with the other 
error messages in this PR.

##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/internals/AllBrokersStrategy.java
##
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this 

[GitHub] [kafka] jlprat commented on pull request #10758: KAFKA-12782: Javadocs search sends you to a non-existent URL

2021-05-25 Thread GitBox


jlprat commented on pull request #10758:
URL: https://github.com/apache/kafka/pull/10758#issuecomment-847839591


   cc @ableegoldman as you opened the Jira ticket, maybe you'd like to review 
this one?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jlprat opened a new pull request #10758: KAFKA-12782: Javadocs search sends you to a non-existent URL

2021-05-25 Thread GitBox


jlprat opened a new pull request #10758:
URL: https://github.com/apache/kafka/pull/10758


   This is just a workaround for this problem while we are still using
   JDK 11.
   Once we move to, presumably, JDK 17, this change won't be needed
   anymore and can be deleted safely. See 
https://bugs.openjdk.java.net/browse/JDK-8215291
   
   This change includes a snippet of code copied from JDK 12+
   
   I'm not sure if an extra header needs to be added for the piece of code I 
copied over, or if I would need to implement it from scratch.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #10552: KAFKA-12675: improve the sticky general assignor scalability and performance

2021-05-25 Thread GitBox


showuon commented on pull request #10552:
URL: https://github.com/apache/kafka/pull/10552#issuecomment-847807953


   @vahidhashemian , thanks for the comments. I've updated. Please take a look 
again. Thank you.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #10552: KAFKA-12675: improve the sticky general assignor scalability and performance

2021-05-25 Thread GitBox


showuon commented on a change in pull request #10552:
URL: https://github.com/apache/kafka/pull/10552#discussion_r638718719



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -637,19 +709,27 @@ private void assignPartition(TopicPartition partition,
         }
     }
 
-    private boolean canParticipateInReassignment(TopicPartition partition,
-                                                 Map<TopicPartition, List<String>> partition2AllPotentialConsumers) {
+    private boolean canParticipateInReassignment(String topic,
+                                                 Map<String, List<String>> topic2AllPotentialConsumers) {
         // if a partition has two or more potential consumers it is subject to reassignment.
-        return partition2AllPotentialConsumers.get(partition).size() >= 2;
+        return topic2AllPotentialConsumers.get(topic).size() >= 2;
     }
 
     private boolean canParticipateInReassignment(String consumer,
                                                  Map<String, List<TopicPartition>> currentAssignment,
-                                                 Map<String, List<TopicPartition>> consumer2AllPotentialPartitions,
-                                                 Map<TopicPartition, List<String>> partition2AllPotentialConsumers) {
+                                                 Map<String, List<String>> consumer2AllPotentialTopics,
+                                                 Map<String, List<String>> topic2AllPotentialConsumers,
+                                                 Map<String, Integer> partitionsPerTopic,
+                                                 int totalPartitionCount) {
         List<TopicPartition> currentPartitions = currentAssignment.get(consumer);
         int currentAssignmentSize = currentPartitions.size();
-        int maxAssignmentSize = consumer2AllPotentialPartitions.get(consumer).size();
+        List<String> allSubscribedTopics = consumer2AllPotentialTopics.get(consumer);
+        int maxAssignmentSize;
+        if (allSubscribedTopics.size() == partitionsPerTopic.size()) {
+            maxAssignmentSize = totalPartitionCount;
+        } else {
+            maxAssignmentSize = allSubscribedTopics.stream().map(topic -> partitionsPerTopic.get(topic)).reduce(0, Integer::sum);
+        }

Review comment:
   Good suggestion! Updated. Thanks.
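
   For context, a standalone sketch of the shortcut settled on above; the names 
are simplified and this is not the assignor code itself:

{code:java}
import java.util.List;
import java.util.Map;

public class MaxAssignmentSizeSketch {
    // When a consumer subscribes to every topic, its maximum assignment size is
    // simply the precomputed total partition count; otherwise, sum the partition
    // counts of only the topics it subscribes to.
    static int maxAssignmentSize(List<String> subscribedTopics,
                                 Map<String, Integer> partitionsPerTopic,
                                 int totalPartitionCount) {
        if (subscribedTopics.size() == partitionsPerTopic.size()) {
            return totalPartitionCount; // subscribed to all topics: skip the summation
        }
        return subscribedTopics.stream()
            .mapToInt(partitionsPerTopic::get)
            .sum();
    }
}
{code}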




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac opened a new pull request #10757: MINOR: Log more information when producer snapshot is written

2021-05-25 Thread GitBox


dajac opened a new pull request #10757:
URL: https://github.com/apache/kafka/pull/10757


   This patch logs more information when a producer snapshot is written to disk.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12782) Javadocs search sends you to a non-existent URL

2021-05-25 Thread Josep Prat (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12782?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17351001#comment-17351001
 ] 

Josep Prat commented on KAFKA-12782:


I can confirm this is a bug in Java 11 for API docs that do not use the module 
system. It has been solved in JDK 12 and higher but not yet backported to JDK 
11. See [https://bugs.openjdk.java.net/browse/JDK-8215291] for more details. 
Long story short, the search.js file appends the module name to the URL.

I ran the docs script manually with JDK 16, and the search links are generated 
correctly, pointing to the right URL.

I will try to find a workaround that makes it work in the meantime. It is 
possible to disable the module system option for the API docs; however, all 
links pointing to JDK APIs (e.g. String) would then break, as they need the 
module prefix.

> Javadocs search sends you to a non-existent URL
> ---
>
> Key: KAFKA-12782
> URL: https://issues.apache.org/jira/browse/KAFKA-12782
> Project: Kafka
>  Issue Type: Bug
>  Components: docs
>Reporter: A. Sophie Blee-Goldman
>Assignee: Josep Prat
>Priority: Major
>
> I was looking up a class using the javadocs search functionality, and clicked 
> on the link when TaskId came up, but it sent me to 
> [https://kafka.apache.org/28/javadoc/undefined/org/apache/kafka/streams/processor/TaskId.html],
> which does not exist.
> I noticed the URL had an odd "undefined" term inserted before the package 
> name, so I took that out and was able to find the [correct 
> javadocs|https://kafka.apache.org/28/javadoc/org/apache/kafka/streams/processor/TaskId.html].
>  So the search seems to be broken due to this "undefined" term that's being 
> injected somewhere, for some reason.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12743) [Kafka Streams] - cluster failover for stateful Kafka Streams applications

2021-05-25 Thread Sergey Zyrianov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12743?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17351000#comment-17351000
 ] 

Sergey Zyrianov commented on KAFKA-12743:
-

I don't think the uReplicator alternative is relevant: it does not provide 
offset tracking between clusters. Even if the topic name is unchanged, the 
state store still has to figure out its offset in the other cluster. mm2 
changes topic names for a reason. I don't know how capable Confluent's 
Replicator is.

> [Kafka Streams] - cluster failover for stateful Kafka Streams applications
> --
>
> Key: KAFKA-12743
> URL: https://issues.apache.org/jira/browse/KAFKA-12743
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker, streams
>Affects Versions: 2.8.0
>Reporter: Sergey Zyrianov
>Priority: Major
>
> Currently, when working with Kafka-backed state stores in Kafka Streams, 
> these log-compacted topics are given a hardcoded name: 
> _app_id-storename-changelog_
> {noformat}
> public static String storeChangelogTopic(String applicationId, String 
> storeName) {
> return applicationId + "-" + storeName + STATE_CHANGELOG_TOPIC_SUFFIX;
> }{noformat}
>  
> MirrorMaker 2 (mm2) copies these topics to the remote cluster under the name 
> _src-cluster-alias.app_id-storename-changelog_
>  
> When the streams app fails over to the remote cluster, it has trouble finding 
> the changelog topic of its state store, since the topic was renamed (given 
> the source cluster prefix) by mm2.
> What should the fix be? Instruct mm2 to keep the topic name, or subscribe to 
> a regex *._app_id-storename-changelog_ topic name for the state's changelog?
>  
>  
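
For reference, a short illustrative sketch of the renaming described above, using MM2's default replication policy; the cluster alias and changelog name are the placeholders from the description, not real deployment values:

{code:java}
import org.apache.kafka.connect.mirror.DefaultReplicationPolicy;

public class ChangelogRenameExample {
    public static void main(String[] args) {
        // Kafka Streams changelog naming convention from the description above.
        String changelog = "app_id-storename-changelog";

        // MM2's default policy prefixes replicated topics with the source cluster alias.
        DefaultReplicationPolicy policy = new DefaultReplicationPolicy();
        String remote = policy.formatRemoteTopic("src-cluster-alias", changelog);
        System.out.println(remote); // src-cluster-alias.app_id-storename-changelog
    }
}
{code}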



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-12782) Javadocs search sends you to a non-existent URL

2021-05-25 Thread Josep Prat (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12782?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Josep Prat reassigned KAFKA-12782:
--

Assignee: Josep Prat

> Javadocs search sends you to a non-existent URL
> ---
>
> Key: KAFKA-12782
> URL: https://issues.apache.org/jira/browse/KAFKA-12782
> Project: Kafka
>  Issue Type: Bug
>  Components: docs
>Reporter: A. Sophie Blee-Goldman
>Assignee: Josep Prat
>Priority: Major
>
> I was looking up a class using the javadocs search functionality, and clicked 
> on the link when TaskId came up, but it sent me to 
> [https://kafka.apache.org/28/javadoc/undefined/org/apache/kafka/streams/processor/TaskId.html],
> which does not exist.
> I noticed the URL had an odd "undefined" term inserted before the package 
> name, so I took that out and was able to find the [correct 
> javadocs|https://kafka.apache.org/28/javadoc/org/apache/kafka/streams/processor/TaskId.html].
>  So the search seems to be broken due to this "undefined" term that's being 
> injected somewhere, for some reason.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12847) Dockerfile needed for kafka system tests needs changes

2021-05-25 Thread Abhijit Mane (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12847?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Abhijit Mane updated KAFKA-12847:
-
Description: 
Hello,

I tried apache/kafka system tests as per documentation: -

([https://github.com/apache/kafka/tree/trunk/tests#readme|https://github.com/apache/kafka/tree/trunk/tests#readme_])

=
 PROBLEM
 ~~

1.) As root user, clone kafka github repo and start "kafka system tests"
 # git clone [https://github.com/apache/kafka.git]
 # cd kafka
 # ./gradlew clean systemTestLibs
 # bash tests/docker/run_tests.sh

2.) Dockerfile issue - 
[https://github.com/apache/kafka/blob/trunk/tests/docker/Dockerfile]

This file has a *UID* entry, as shown below:
 ---
 ARG *UID*="1000"
 RUN useradd -u $*UID* ducker

// {color:#de350b}*Error during docker build*{color} => useradd: UID 0 is not 
unique, root user id is 0
 ---
 I ran everything as root, which means the built-in bash environment variable 
'UID' always resolves to 0 and can't be changed. Hence, the docker build 
fails. The issue should be seen even if run as non-root.

3.) Next, as root, as per README, I ran: -

server:/kafka> *bash tests/docker/run_tests.sh*

The ducker tool builds the container images & switches to user '*ducker*' 
inside the container

& maps kafka root dir ('kafka') from host to '/opt/kafka-dev' in the container.

Ref: 
[https://github.com/apache/kafka/blob/trunk/tests/docker/ducker-ak|https://github.com/apache/kafka/blob/trunk/tests/docker/ducker-ak]

Ex:  docker run -d *-v "${kafka_dir}:/opt/kafka-dev"* 

This fails as the 'ducker' user has *no write permissions* to create files 
under 'kafka' root dir. Hence, it needs to be made writeable.

// *chmod -R a+w kafka* 
 – needed as container is run as 'ducker' and needs write access since kafka 
root volume from host is mapped to container as "/opt/kafka-dev" where the 
'ducker' user writes logs
 =

=
 *FIXES needed*
 ~
 1.) Dockerfile - 
[https://github.com/apache/kafka/blob/trunk/tests/docker/Dockerfile]
 Change 'UID' to '*UID_DUCKER*'.

This won't conflict with the built-in bash env var UID, and the docker image 
build should succeed.
 ---
 ARG *UID_DUCKER*="1000"
 RUN useradd -u $*UID_DUCKER* ducker

// *{color:#57d9a3}No Error{color}* => No conflict with built-in UID
 ---

2.) The README needs an update: the kafka root dir from which the tests are 
launched must be writeable, to allow the 'ducker' user to create results/logs.
 # chmod -R a+w kafka

With this, I was able to get the docker images built and system tests started 
successfully.
 =

Also, I wonder whether the upstream Dockerfile & system tests are part of 
CI/CD and get tested for every PR. If so, this issue should have been caught.

 

*Question to kafka SME*
 -
 Do you believe this is a valid problem with the Dockerfile and the fix is 
acceptable? 
 Please let me know and I am happy to submit a PR with this fix.

Thanks,
 Abhijit

  was:
Hello,

I tried apache/kafka system tests as per documentation: -

([https://github.com/apache/kafka/tree/trunk/tests#readme|https://github.com/apache/kafka/tree/trunk/tests#readme_])

=
 PROBLEM
 ~~

1.) As root user, clone kafka github repo and start "kafka system tests"
 # git clone [https://github.com/apache/kafka.git]
 # cd kafka
 # ./gradlew clean systemTestLibs
 # bash tests/docker/run_tests.sh

2.) Dockerfile issue - 
[https://github.com/apache/kafka/blob/trunk/tests/docker/Dockerfile]

This file has an *UID* entry as shown below: -
 ---
 ARG *UID*="1000"
 RUN useradd -u $*UID* ducker

// {color:#de350b}*Error during docker build*{color} => useradd: UID 0 is not 
unique, root user id is 0
 ---
 I ran everything as root which means the built-in bash environment variable 
'UID' always

resolves to 0 and can't be changed. Hence, the docker build fails.

3.) Next, as root, as per README, I ran: -

server:/kafka> *bash tests/docker/run_tests.sh*

The ducker tool builds the container images & switches to user '*ducker*' 
inside the container

& maps kafka root dir ('kafka') from host to '/opt/kafka-dev' in the container.

Ref: 
[https://github.com/apache/kafka/blob/trunk/tests/docker/ducker-ak|https://github.com/apache/kafka/blob/trunk/tests/docker/ducker-ak]

Ex:  docker run -d *-v "${kafka_dir}:/opt/kafka-dev"* 

This fails as the 'ducker' user has *no write permissions* to create files 
under 'kafka' root dir. Hence, it needs to be made writeable.

// *chmod -R a+w kafka* 
 – needed as container is run as 'ducker' and needs write access since kafka 
root volume from host is mapped to container as 

[jira] [Updated] (KAFKA-12847) Dockerfile needed for kafka system tests needs changes

2021-05-25 Thread Abhijit Mane (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12847?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Abhijit Mane updated KAFKA-12847:
-
Description: 
Hello,

I tried apache/kafka system tests as per documentation: -

([https://github.com/apache/kafka/tree/trunk/tests#readme|https://github.com/apache/kafka/tree/trunk/tests#readme_])

=
 PROBLEM
 ~~

1.) As root user, clone kafka github repo and start "kafka system tests"
 # git clone [https://github.com/apache/kafka.git]
 # cd kafka
 # ./gradlew clean systemTestLibs
 # bash tests/docker/run_tests.sh

2.) Dockerfile issue - 
[https://github.com/apache/kafka/blob/trunk/tests/docker/Dockerfile]

This file has a *UID* entry, as shown below:
 ---
 ARG *UID*="1000"
 RUN useradd -u $*UID* ducker

// {color:#de350b}*Error during docker build*{color} => useradd: UID 0 is not 
unique, root user id is 0
 ---
 I ran everything as root, which means the built-in bash environment variable 
'UID' always resolves to 0 and can't be changed. Hence, the docker build fails.

3.) Next, as root, as per README, I ran: -

server:/kafka> *bash tests/docker/run_tests.sh*

The ducker tool builds the container images & switches to user '*ducker*' 
inside the container

& maps kafka root dir ('kafka') from host to '/opt/kafka-dev' in the container.

Ref: 
[https://github.com/apache/kafka/blob/trunk/tests/docker/ducker-ak|https://github.com/apache/kafka/blob/trunk/tests/docker/ducker-ak]

Ex:  docker run -d *-v "${kafka_dir}:/opt/kafka-dev"* 

This fails as the 'ducker' user has *no write permissions* to create files 
under 'kafka' root dir. Hence, it needs to be made writeable.

// *chmod -R a+w kafka* 
 – needed as container is run as 'ducker' and needs write access since kafka 
root volume from host is mapped to container as "/opt/kafka-dev" where the 
'ducker' user writes logs
 =

=
 *FIXES needed*
 ~
 1.) Dockerfile - 
[https://github.com/apache/kafka/blob/trunk/tests/docker/Dockerfile]
 Change 'UID' to '*UID_DUCKER*'.

This won't conflict with the built-in bash env var UID, and the docker image 
build should succeed.
 ---
 ARG *UID_DUCKER*="1000"
 RUN useradd -u $*UID_DUCKER* ducker

// *{color:#57d9a3}No Error{color}* => No conflict with built-in UID
 ---

2.) The README needs an update: the kafka root dir from which the tests are 
launched must be writeable, to allow the 'ducker' user to create results/logs.
 # chmod -R a+w kafka

With this, I was able to get the docker images built and system tests started 
successfully.
 =

Also, I wonder whether the upstream Dockerfile & system tests are part of 
CI/CD and get tested for every PR. If so, this issue should have been caught.

 

*Question to kafka SME*
 -
 Do you believe this is a valid problem with the Dockerfile and the fix is 
acceptable? 
 Please let me know and I am happy to submit a PR with this fix.

Thanks,
 Abhijit

  was:
Hello,

I tried apache/kafka system tests as per documentation: -

([https://github.com/apache/kafka/tree/trunk/tests#readme|https://github.com/apache/kafka/tree/trunk/tests#readme_])

=
 PROBLEM
 ~~

1.) As root user, clone kafka github repo and start "kafka system tests"
 # git clone [https://github.com/apache/kafka.git]
 # cd kafka
 # ./gradlew clean systemTestLibs
 # bash tests/docker/run_tests.sh

2.) Dockerfile issue - 
[https://github.com/apache/kafka/blob/trunk/tests/docker/Dockerfile]


 This file has an *UID* entry as shown below: -
 ---
 ARG *UID*="1000"
 RUN useradd -u $*UID* ducker

// {color:#de350b}*Error during docker build*{color} => useradd: UID 0 is not 
unique, root user id is 0
 ---
 I ran everything as root which means the built-in bash environment variable 
'UID' always

resolves to 0 and can't be changed. Hence, the docker build fails.

3.) Next, as root, as per README, I ran: -

server:/kafka> *bash tests/docker/run_tests.sh*

The ducker tool builds the container images & switches to user '*ducker*' 
inside the container

& maps kafka root dir ('kafka') from host to '/opt/kafka-dev' in the container.

Ref: 
[https://github.com/apache/kafka/blob/trunk/tests/docker/ducker-ak|https://github.com/apache/kafka/blob/trunk/tests/docker/ducker-ak*]

Ex:  docker run -d *-v "${kafka_dir}:/opt/kafka-dev"* 

This fails as the 'ducker' user has *no write permissions* to create files 
under 'kafka' root dir. Hence, it needs to be made writeable.

// *chmod -R a+w kafka* 
 – needed as container is run as 'ducker' and needs write access since kafka 
root volume from host is mapped to container as "/opt/kafka-dev" where the 
'ducker' user writes logs
 

[jira] [Updated] (KAFKA-12847) Dockerfile needed for kafka system tests needs changes

2021-05-25 Thread Abhijit Mane (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12847?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Abhijit Mane updated KAFKA-12847:
-
Description: 
Hello,

I tried apache/kafka system tests as per documentation: -

([https://github.com/apache/kafka/tree/trunk/tests#readme|https://github.com/apache/kafka/tree/trunk/tests#readme_])

=
 PROBLEM
 ~~

1.) As root user, clone kafka github repo and start "kafka system tests"
 # git clone [https://github.com/apache/kafka.git]
 # cd kafka
 # ./gradlew clean systemTestLibs
 # bash tests/docker/run_tests.sh

2.) Dockerfile issue - 
[https://github.com/apache/kafka/blob/trunk/tests/docker/Dockerfile]


 This file has a *UID* entry, as shown below:
 ---
 ARG *UID*="1000"
 RUN useradd -u $*UID* ducker

// {color:#de350b}*Error during docker build*{color} => useradd: UID 0 is not 
unique, root user id is 0
 ---
 I ran everything as root, which means the built-in bash environment variable 
'UID' always resolves to 0 and can't be changed. Hence, the docker build fails.

3.) Next, as root, as per README, I ran: -

server:/kafka> *bash tests/docker/run_tests.sh*

The ducker tool builds the container images & switches to user '*ducker*' 
inside the container

& maps kafka root dir ('kafka') from host to '/opt/kafka-dev' in the container.

Ref: 
[https://github.com/apache/kafka/blob/trunk/tests/docker/ducker-ak|https://github.com/apache/kafka/blob/trunk/tests/docker/ducker-ak*]

Ex:  docker run -d *-v "${kafka_dir}:/opt/kafka-dev"* 

This fails as the 'ducker' user has *no write permissions* to create files 
under 'kafka' root dir. Hence, it needs to be made writeable.

// *chmod -R a+w kafka* 
 – needed as container is run as 'ducker' and needs write access since kafka 
root volume from host is mapped to container as "/opt/kafka-dev" where the 
'ducker' user writes logs
 =

=
 *FIXES needed*
 ~
 1.) Dockerfile - 
[https://github.com/apache/kafka/blob/trunk/tests/docker/Dockerfile]
 Change 'UID' to '*UID_DUCKER*'.

This won't conflict with built in bash env. var UID and the docker image build 
should succeed.
 ---
 ARG *UID_DUCKER*="1000"
 RUN useradd -u $*UID_DUCKER* ducker

// *{color:#57d9a3}No Error{color}* => No conflict with built-in UID
 ---

2.) README needs an update where we must ensure the kafka root dir from where 
the tests 
 are launched is writeable to allow the 'ducker' user to create results/logs.
 # chmod -R a+w kafka

With this, I was able to get the docker images built and system tests started 
successfully.
 =

Also, I wonder whether or not upstream Dockerfile & System tests are part of 
CI/CD and get tested for every PR. If so, this issue should have been caught.

 

*Question to kafka SME*
 -
 Do you believe this is a valid problem with the Dockerfile and the fix is 
acceptable? 
 Please let me know and I am happy to submit a PR with this fix.

Thanks,
 Abhijit

  was:
Hello,

I tried apache/kafka system tests as per documentation: -

([https://github.com/apache/kafka/tree/trunk/tests#readme|https://github.com/apache/kafka/tree/trunk/tests#readme_])

=
 PROBLEM
 ~~

1.) As root user, clone kafka github repo and start "kafka system tests"
 # git clone [https://github.com/apache/kafka.git]
 # cd kafka
 # ./gradlew clean systemTestLibs
 # bash tests/docker/run_tests.sh

2.) Dockerfile issue - 
[https://github.com/apache/kafka/blob/trunk/tests/docker/Dockerfile|https://github.com/apache/kafka/blob/trunk/tests/docker/*Dockerfile*_]
 This file has an *UID* entry as shown below: -
 ---
 ARG *UID*="1000"
 RUN useradd -u $*UID* ducker

// {color:#de350b}*Error during docker build*{color} => useradd: UID 0 is not 
unique, root user id is 0
 ---
 I ran everything as root which means the built-in bash environment variable 
'UID' always

resolves to 0 and can't be changed. Hence, the docker build fails.

3.) Next, as root, as per README, I ran: -

server:/kafka> *bash tests/docker/run_tests.sh*

The ducker tool builds the container images & switches to user '*ducker*' 
inside the container

& maps kafka root dir ('kafka') from host to '/opt/kafka-dev' in the container.

Ref: 
[https://github.com/apache/kafka/blob/trunk/tests/docker/ducker-ak|https://github.com/apache/kafka/blob/trunk/tests/docker/ducker-ak*]

Ex:  docker run -d *-v "${kafka_dir}:/opt/kafka-dev"* 

This fails as the 'ducker' user has *no write permissions* to create files 
under 'kafka' root dir. Hence, it needs to be made writeable.

// *chmod -R a+w kafka* 
 – needed as container is run as 'ducker' and needs write access since kafka 
root volume from host is mapped to 

[jira] [Updated] (KAFKA-12847) Dockerfile needed for kafka system tests needs changes

2021-05-25 Thread Abhijit Mane (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12847?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Abhijit Mane updated KAFKA-12847:
-
Description: 
Hello,

I tried apache/kafka system tests as per documentation: -

(_[https://github.com/apache/kafka/tree/trunk/tests#readme_])

=
 PROBLEM
 ~~

1.) As root user, clone kafka github repo and start "kafka system tests"
 # git clone [https://github.com/apache/kafka.git]
 # cd kafka
 # ./gradlew clean systemTestLibs
 # bash tests/docker/run_tests.sh

2.) Dockerfile issue - 
[https://github.com/apache/kafka/blob/trunk/tests/docker/Dockerfile|https://github.com/apache/kafka/blob/trunk/tests/docker/*Dockerfile*_]
 This file has an *UID* entry as shown below: -
 ---
 ARG *UID*="1000"
 RUN useradd -u $*UID* ducker

// {color:#de350b}*Error during docker build*{color} => useradd: UID 0 is not 
unique, root user id is 0
 ---
 I ran everything as root which means the built-in bash environment variable 
'UID' always

resolves to 0 and can't be changed. Hence, the docker build fails.

3.) Next, as root, as per README, I ran: -

server:/kafka> *bash tests/docker/run_tests.sh*

The ducker tool builds the container images & switches to user '*ducker*' 
inside the container

& maps kafka root dir ('kafka') from host to '/opt/kafka-dev' in the container.

Ref: *[https://github.com/apache/kafka/blob/trunk/tests/docker/ducker-ak*]

Ex:  docker run -d *-v "${kafka_dir}:/opt/kafka-dev"* 

This fails as the 'ducker' user has *no write permissions* to create files 
under 'kafka' root dir. Hence, it needs to be made writeable.

// *chmod -R a+w kafka* 
 – needed as container is run as 'ducker' and needs write access since kafka 
root volume from host is mapped to container as "/opt/kafka-dev" where the 
'ducker' user writes logs
 =

=
 *FIXES needed*
 ~
 1.) Dockerfile - 
[https://github.com/apache/kafka/blob/trunk/tests/docker/Dockerfile]
 Change 'UID' to '*UID_DUCKER*'.

This won't conflict with built in bash env. var UID and the docker image build 
should succeed.
 ---
 ARG *UID_DUCKER*="1000"
 RUN useradd -u $*UID_DUCKER* ducker

// *{color:#57d9a3}No Error{color}* => No conflict with built-in UID
 ---

2.) README needs an update where we must ensure the kafka root dir from where 
the tests 
 are launched is writeable to allow the 'ducker' user to create results/logs.
 # chmod -R a+w kafka

With this, I was able to get the docker images built and system tests started 
successfully.
 =

Also, I wonder whether or not upstream Dockerfile & System tests are part of 
CI/CD and get tested for every PR. If so, this issue should have been caught.

 

*Question to kafka SME*
 -
 Do you believe this is a valid problem with the Dockerfile and the fix is 
acceptable? 
 Please let me know and I am happy to submit a PR with this fix.

Thanks,
 Abhijit

  was:
Hello,

I tried apache/kafka system tests as per documentation: -

(_https://github.com/apache/kafka/tree/trunk/tests#readme_)

=
PROBLEM
~~

1.) As root user, clone kafka github repo and start "kafka system tests"
 # git clone https://github.com/apache/kafka.git
 # cd kafka
 # ./gradlew clean systemTestLibs
 # bash tests/docker/run_tests.sh 
 
2.) Dockerfile issue - 
_https://github.com/apache/kafka/blob/trunk/tests/docker/*Dockerfile*_
 This file has an *UID* entry as shown below: -
---
ARG *UID*="1000"
RUN useradd -u $*UID* ducker

// {color:#de350b}*Error during docker build*{color} => useradd: UID 0 is not 
unique, root user id is 0
---
 I ran everything as root which means the built-in bash environment variable 
'UID' always

resolves to 0 and can't be changed. Hence, the docker build fails.
 
3.) Next, as root, as per README, I ran: -

server:/kafka> *bash tests/docker/run_tests.sh*

The ducker tool builds the container images & switches to user '*ducker*' 
inside the container

& maps kafka root dir ('kafka') from host to '/opt/kafka-dev' in the container.

Ref: *https://github.com/apache/kafka/blob/trunk/tests/docker/ducker-ak*

Ex:  docker run -d *-v "${kafka_dir}:/opt/kafka-dev"* 

This fails as the 'ducker' user has *no write permissions* to create files 
under 'kafka' root dir. Hence, it needs to be made writeable.

// *chmod -R a+w kafka* 
 -- needed as container is run as 'ducker' and needs write access since kafka 
root volume from host is mapped to container as "/opt/kafka-dev" where the 
'ducker' user writes logs
=


 
=
*FIXES needed*
~
 1.) Dockerfile - 

[jira] [Updated] (KAFKA-12847) Dockerfile needed for kafka system tests needs changes

2021-05-25 Thread Abhijit Mane (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12847?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Abhijit Mane updated KAFKA-12847:
-
Description: 
Hello,

I tried apache/kafka system tests as per documentation: -

([https://github.com/apache/kafka/tree/trunk/tests#readme|https://github.com/apache/kafka/tree/trunk/tests#readme_])

=
 PROBLEM
 ~~

1.) As root user, clone kafka github repo and start "kafka system tests"
 # git clone [https://github.com/apache/kafka.git]
 # cd kafka
 # ./gradlew clean systemTestLibs
 # bash tests/docker/run_tests.sh

2.) Dockerfile issue - 
[https://github.com/apache/kafka/blob/trunk/tests/docker/Dockerfile|https://github.com/apache/kafka/blob/trunk/tests/docker/*Dockerfile*_]
 This file has an *UID* entry as shown below: -
 ---
 ARG *UID*="1000"
 RUN useradd -u $*UID* ducker

// {color:#de350b}*Error during docker build*{color} => useradd: UID 0 is not 
unique, root user id is 0
 ---
 I ran everything as root which means the built-in bash environment variable 
'UID' always

resolves to 0 and can't be changed. Hence, the docker build fails.

3.) Next, as root, as per README, I ran: -

server:/kafka> *bash tests/docker/run_tests.sh*

The ducker tool builds the container images & switches to user '*ducker*' 
inside the container

& maps kafka root dir ('kafka') from host to '/opt/kafka-dev' in the container.

Ref: 
[https://github.com/apache/kafka/blob/trunk/tests/docker/ducker-ak|https://github.com/apache/kafka/blob/trunk/tests/docker/ducker-ak*]

Ex:  docker run -d *-v "${kafka_dir}:/opt/kafka-dev"* 

This fails as the 'ducker' user has *no write permissions* to create files 
under 'kafka' root dir. Hence, it needs to be made writeable.

// *chmod -R a+w kafka* 
 – needed as container is run as 'ducker' and needs write access since kafka 
root volume from host is mapped to container as "/opt/kafka-dev" where the 
'ducker' user writes logs
 =

=
 *FIXES needed*
 ~
 1.) Dockerfile - 
[https://github.com/apache/kafka/blob/trunk/tests/docker/Dockerfile]
 Change 'UID' to '*UID_DUCKER*'.

This won't conflict with built in bash env. var UID and the docker image build 
should succeed.
 ---
 ARG *UID_DUCKER*="1000"
 RUN useradd -u $*UID_DUCKER* ducker

// *{color:#57d9a3}No Error{color}* => No conflict with built-in UID
 ---

2.) README needs an update where we must ensure the kafka root dir from where 
the tests 
 are launched is writeable to allow the 'ducker' user to create results/logs.
 # chmod -R a+w kafka

With this, I was able to get the docker images built and system tests started 
successfully.
 =

Also, I wonder whether or not upstream Dockerfile & System tests are part of 
CI/CD and get tested for every PR. If so, this issue should have been caught.

 

*Question to kafka SME*
 -
 Do you believe this is a valid problem with the Dockerfile and the fix is 
acceptable? 
 Please let me know and I am happy to submit a PR with this fix.

Thanks,
 Abhijit

  was:
Hello,

I tried apache/kafka system tests as per documentation: -

(_[https://github.com/apache/kafka/tree/trunk/tests#readme_])

=
 PROBLEM
 ~~

1.) As root user, clone kafka github repo and start "kafka system tests"
 # git clone [https://github.com/apache/kafka.git]
 # cd kafka
 # ./gradlew clean systemTestLibs
 # bash tests/docker/run_tests.sh

2.) Dockerfile issue - 
[https://github.com/apache/kafka/blob/trunk/tests/docker/Dockerfile|https://github.com/apache/kafka/blob/trunk/tests/docker/*Dockerfile*_]
 This file has an *UID* entry as shown below: -
 ---
 ARG *UID*="1000"
 RUN useradd -u $*UID* ducker

// {color:#de350b}*Error during docker build*{color} => useradd: UID 0 is not 
unique, root user id is 0
 ---
 I ran everything as root which means the built-in bash environment variable 
'UID' always

resolves to 0 and can't be changed. Hence, the docker build fails.

3.) Next, as root, as per README, I ran: -

server:/kafka> *bash tests/docker/run_tests.sh*

The ducker tool builds the container images & switches to user '*ducker*' 
inside the container

& maps kafka root dir ('kafka') from host to '/opt/kafka-dev' in the container.

Ref: *[https://github.com/apache/kafka/blob/trunk/tests/docker/ducker-ak*]

Ex:  docker run -d *-v "${kafka_dir}:/opt/kafka-dev"* 

This fails as the 'ducker' user has *no write permissions* to create files 
under 'kafka' root dir. Hence, it needs to be made writeable.

// *chmod -R a+w kafka* 
 – needed as container is run as 'ducker' and needs write access since kafka 
root volume from host is mapped to container as "/opt/kafka-dev" where the 
'ducker' 

[jira] [Updated] (KAFKA-10846) FileStreamSourceTask buffer can grow without bound

2021-05-25 Thread Tom Bentley (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10846?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tom Bentley updated KAFKA-10846:

Fix Version/s: 2.7.2

> FileStreamSourceTask buffer can grow without bound
> --
>
> Key: KAFKA-10846
> URL: https://issues.apache.org/jira/browse/KAFKA-10846
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Reporter: Tom Bentley
>Assignee: Tom Bentley
>Priority: Major
> Fix For: 2.8.0, 2.7.2
>
>
> When reading a large file the buffer used by {{FileStreamSourceTask}} can 
> grow without bound. Even in the unit test 
> org.apache.kafka.connect.file.FileStreamSourceTaskTest#testBatchSize the 
> buffer grows from 1,024 to 524,288 bytes just reading 10,000 copies of a line 
> of <100 chars.
> The problem is that the condition for growing the buffer is incorrect. The 
> buffer is doubled whenever some bytes were read and the used space in the 
> buffer == the buffer length.
> The requirement to increase the buffer size should be related to whether 
> {{extractLine()}} actually managed to read any lines. It's only when no 
> complete lines were read since the last call to {{read()}} that we need to 
> increase the buffer size (to cope with the large line).
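A minimal standalone sketch of the corrected condition (hypothetical class, not
the actual Connect source): grow the buffer only when it is completely full and
still contains no complete line.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class BoundedLineBuffer {
    private byte[] buffer = new byte[8]; // tiny start size to show growth
    private int used = 0;

    public List<String> readAll(InputStream in) throws IOException {
        List<String> lines = new ArrayList<>();
        int n;
        while ((n = in.read(buffer, used, buffer.length - used)) > 0) {
            used += n;
            int start = 0;
            int nl;
            while ((nl = indexOf(buffer, start, used)) >= 0) {
                lines.add(new String(buffer, start, nl - start, StandardCharsets.UTF_8));
                start = nl + 1;
            }
            // compact: drop the consumed prefix
            System.arraycopy(buffer, start, buffer, 0, used - start);
            used -= start;
            // grow only if the buffer is full and holds no complete line,
            // i.e. a single line exceeds the current capacity
            if (used == buffer.length) {
                buffer = Arrays.copyOf(buffer, buffer.length * 2);
            }
        }
        return lines;
    }

    private static int indexOf(byte[] a, int from, int to) {
        for (int i = from; i < to; i++) {
            if (a[i] == '\n') return i;
        }
        return -1;
    }

    public static void main(String[] args) throws IOException {
        byte[] data = "short\nlines\nstay\nsmall\n".getBytes(StandardCharsets.UTF_8);
        System.out.println(new BoundedLineBuffer().readAll(new ByteArrayInputStream(data)));
    }
}
```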



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] tombentley commented on pull request #10750: KAFKA-8120 Getting NegativeArraySizeException when using Kafka Connect to send data to Kafka

2021-05-25 Thread GitBox


tombentley commented on pull request #10750:
URL: https://github.com/apache/kafka/pull/10750#issuecomment-847768625


   It's unlikely that 2.5 or 2.6 will see another release, but I've backported 
it to the 2.7 branch so it's in any 2.7.2 which gets released.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-12847) Dockerfile needed for kafka system tests needs changes

2021-05-25 Thread Abhijit Mane (Jira)
Abhijit Mane created KAFKA-12847:


 Summary: Dockerfile needed for kafka system tests needs changes
 Key: KAFKA-12847
 URL: https://issues.apache.org/jira/browse/KAFKA-12847
 Project: Kafka
  Issue Type: Bug
  Components: system tests
Affects Versions: 2.7.1, 2.8.0
 Environment: Issue tested in environments below but is independent of 
h/w arch. or Linux flavor: -
1.) RHEL-8.3 on x86_64 
2.) RHEL-8.3 on IBM Power (ppc64le)
3.) apache/kafka branch tested: trunk (master)
Reporter: Abhijit Mane
 Attachments: Dockerfile.upstream

Hello,

I tried apache/kafka system tests as per documentation: -

(_https://github.com/apache/kafka/tree/trunk/tests#readme_)

=
PROBLEM
~~

1.) As root user, clone kafka github repo and start "kafka system tests"
 # git clone https://github.com/apache/kafka.git
 # cd kafka
 # ./gradlew clean systemTestLibs
 # bash tests/docker/run_tests.sh 
 
2.) Dockerfile issue - 
_https://github.com/apache/kafka/blob/trunk/tests/docker/*Dockerfile*_
 This file has a *UID* entry as shown below: -
---
ARG *UID*="1000"
RUN useradd -u $*UID* ducker

// {color:#de350b}*Error during docker build*{color} => useradd: UID 0 is not 
unique, root user id is 0
---
 I ran everything as root, which means the built-in bash environment variable 
'UID' always resolves to 0 and can't be changed.
 
3.) Next, as root, as per README, I ran: -

server:/kafka> *bash tests/docker/run_tests.sh*

The ducker tool builds the container images & switches to user '*ducker*' 
inside the container

& maps kafka root dir ('kafka') from host to '/opt/kafka-dev' in the container.

Ref: *https://github.com/apache/kafka/blob/trunk/tests/docker/ducker-ak*

Ex:  docker run -d *-v "${kafka_dir}:/opt/kafka-dev"* 

This fails as the 'ducker' user has *no write permissions* to create files 
under 'kafka' root dir. Hence, it needs to be made writeable.

// *chmod -R a+w kafka* 
 -- needed as container is run as 'ducker' and needs write access since kafka 
root volume from host is mapped to container as "/opt/kafka-dev" where the 
'ducker' user writes logs
=


 
=
*FIXES needed*
~
 1.) Dockerfile - 
https://github.com/apache/kafka/blob/trunk/tests/docker/Dockerfile
 Change 'UID' to '*UID_DUCKER*'.

This won't conflict with the built-in bash env var UID, and the docker image 
build should succeed.
---
ARG *UID_DUCKER*="1000"
RUN useradd -u $*UID_DUCKER* ducker

// *{color:#57d9a3}No Error{color}* => No conflict with built-in UID
---

2.) README needs an update: the kafka root dir from which the tests are 
launched must be writable so that the 'ducker' user can create results/logs.
 # chmod -R a+w kafka

With this, I was able to get the docker images built and system tests started 
successfully.
=
 
Also, I wonder whether or not upstream Dockerfile & System tests are part of 
CI/CD and get tested for every PR. If so, this issue should have been caught.

 

*Question to kafka SME*
-
Do you believe this is a valid problem with the Dockerfile and the fix is 
acceptable? 
Please let me know and I am happy to submit a PR with this fix.

Thanks,
Abhijit



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] rajinisivaram commented on pull request #10377: KAFKA-12515 ApiVersionManager should create response based on request version

2021-05-25 Thread GitBox


rajinisivaram commented on pull request #10377:
URL: https://github.com/apache/kafka/pull/10377#issuecomment-847762942


   @feyman2016 We should add `ignorable=true` for the fields added for feature 
support in 
`clients/src/main/resources/common/message/ApiVersionsResponse.json`. This 
would avoid having to pass the version around and set feature fields only if it 
is supported for the request/response version.
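For illustration, a toy sketch of what `ignorable=true` buys (not Kafka's
generated serializers; `FEATURES_MIN_VERSION` and the class name are made up):
a field set for an older negotiated version is silently dropped instead of
failing serialization.

```java
public class IgnorableFieldSketch {
    static final short FEATURES_MIN_VERSION = 3; // hypothetical cut-over version

    static String serialize(short version, String features) {
        StringBuilder out = new StringBuilder("apiKeys=[...]");
        if (version >= FEATURES_MIN_VERSION) {
            out.append(", features=").append(features);
        } // else: ignorable => skip the field silently rather than throw
        return out.toString();
    }

    public static void main(String[] args) {
        System.out.println(serialize((short) 2, "metadata.version=1")); // field dropped
        System.out.println(serialize((short) 3, "metadata.version=1")); // field written
    }
}
```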


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Comment Edited] (KAFKA-5676) MockStreamsMetrics should be in o.a.k.test

2021-05-25 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-5676?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17350960#comment-17350960
 ] 

Bruno Cadonna edited comment on KAFKA-5676 at 5/25/21, 10:42 AM:
-

[~marcolotz] Thank you for looking into this!
The classes that use {{StreamsMetricsImpl}} should be only internal classes, 
like {{ClientMetrics}} that contains {{addStateMetrics()}}. They are allowed to 
use the implementation instead of the interface. The interface 
{{StreamsMetrics}} should be the one that is exposed in the public API.
Currently, I do not recall why we need to move {{MockStreamsMetrics}} in the 
public API. 
IMO, we should get rid of {{MockStreamsMetrics}} and replace its usages with an 
EasyMock mock. However, I do not know whether this is straightforward.
See also KAFKA-8977.



was (Author: cadonna):
[~marcolotz] Thank you for looking into this!
The classes that use {{StreamsMetricsImpl}} should be only internal classes, 
like {{ClientMetrics}} that contains {{addStateMetrics()}}. They are allowed to 
use the implementation instead of the interface. The interface 
{{StreamsMetrics}} should be the one that is exposed in the public API.
Currently, I do not recall why we need to move {{MockStreamsMetrics}} in the 
public API. 
IMO, we should get rid of {{MockStreamsMetrics}} and replace its usages with an 
EasyMock mock. However, I do not know whether this is straightforward.


> MockStreamsMetrics should be in o.a.k.test
> --
>
> Key: KAFKA-5676
> URL: https://issues.apache.org/jira/browse/KAFKA-5676
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Marco Lotz
>Priority: Major
>  Labels: newbie
>  Time Spent: 96h
>  Remaining Estimate: 0h
>
> {{MockStreamsMetrics}}'s package should be `o.a.k.test` not 
> `o.a.k.streams.processor.internals`. 
> In addition, it should not require a {{Metrics}} parameter in its constructor 
> as it is only needed for its extended base class; the right way of mocking 
> should be implementing {{StreamsMetrics}} with mock behavior than extended a 
> real implementaion of {{StreamsMetricsImpl}}.
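A minimal sketch of that direction, assuming the EasyMock and kafka-streams
test dependencies are on the classpath (the class name is hypothetical): mock
the public StreamsMetrics interface instead of extending the internal
StreamsMetricsImpl.

```java
import static org.easymock.EasyMock.createNiceMock;
import static org.easymock.EasyMock.replay;

import org.apache.kafka.streams.StreamsMetrics;

public class StreamsMetricsMockExample {
    public static StreamsMetrics niceStreamsMetricsMock() {
        // a nice mock returns sensible defaults for any unstubbed call
        StreamsMetrics metrics = createNiceMock(StreamsMetrics.class);
        replay(metrics);
        return metrics;
    }
}
```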



--
This message was sent by Atlassian Jira
(v8.3.4#803005)




[GitHub] [kafka] DuongPTIT commented on pull request #10750: KAFKA-8120 Getting NegativeArraySizeException when using Kafka Connect to send data to Kafka

2021-05-25 Thread GitBox


DuongPTIT commented on pull request #10750:
URL: https://github.com/apache/kafka/pull/10750#issuecomment-847756554


   > Probably fixed by #9735
   
   I've seen that this issue still happens in V2.5, V2.6, and V2.7. What about 
those versions? Do they need a fix similar to V2.8 and trunk? 
   Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dengziming commented on pull request #10701: KAFKA-10437; Fix omitted TODO of KIP-478

2021-05-25 Thread GitBox


dengziming commented on pull request #10701:
URL: https://github.com/apache/kafka/pull/10701#issuecomment-847753120


   @mjsax @guozhangwang , what do you think about this approach? In fact, I'm 
not very confident about this change since it isn't very elegant, but it does 
remove the TODO. Feel free to point out any mistakes.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #10552: KAFKA-12675: improve the sticky general assignor scalability and performance

2021-05-25 Thread GitBox


showuon commented on a change in pull request #10552:
URL: https://github.com/apache/kafka/pull/10552#discussion_r638655936



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -637,19 +709,27 @@ private void assignPartition(TopicPartition partition,
 }
 }
 
-private boolean canParticipateInReassignment(TopicPartition partition,
- Map<TopicPartition, List<String>> partition2AllPotentialConsumers) {
+private boolean canParticipateInReassignment(String topic,
+ Map<String, List<String>> topic2AllPotentialConsumers) {
 // if a partition has two or more potential consumers it is subject to 
reassignment.

Review comment:
   Good catch! Updated.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] tombentley commented on pull request #10750: KAFKA-8120 Getting NegativeArraySizeException when using Kafka Connect to send data to Kafka

2021-05-25 Thread GitBox


tombentley commented on pull request #10750:
URL: https://github.com/apache/kafka/pull/10750#issuecomment-847746044


   Probably fixed by https://github.com/apache/kafka/pull/9735


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #10552: KAFKA-12675: improve the sticky general assignor scalability and performance

2021-05-25 Thread GitBox


showuon commented on a change in pull request #10552:
URL: https://github.com/apache/kafka/pull/10552#discussion_r638654728



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -469,73 +426,190 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
 TreeSet<String> sortedCurrentSubscriptions = new TreeSet<>(new SubscriptionComparator(currentAssignment));
 sortedCurrentSubscriptions.addAll(currentAssignment.keySet());
 
-balance(currentAssignment, prevAssignment, sortedPartitions, 
unassignedPartitions, sortedCurrentSubscriptions,
-consumer2AllPotentialPartitions, partition2AllPotentialConsumers, 
currentPartitionConsumer, revocationRequired);
+balance(currentAssignment, prevAssignment, sortedAllPartitions, 
unassignedPartitions, sortedCurrentSubscriptions,
+consumer2AllPotentialTopics, topic2AllPotentialConsumers, 
currentPartitionConsumer, revocationRequired,
+partitionsPerTopic, totalPartitionsCount);
+
+if (log.isDebugEnabled()) {
+log.debug("final assignment: {}", currentAssignment);
+}
+
 return currentAssignment;
 }
 
+/**
+ * get the unassigned partition list by computing the difference set of 
the sortedPartitions(all partitions)
+ * and sortedAssignedPartitions. If no assigned partitions, we'll just 
return all sorted topic partitions.
+ * This is used in generalAssign method
+ *
+ * We loop over sortedAllPartitions, and compare each element with the ith element in sortedAssignedPartitions (i starts from 0):
+ *   - if not equal to the ith element, add it to unassignedPartitions
+ *   - if equal to the ith element, get the next element from sortedAssignedPartitions
+ *
+ * @param sortedAllPartitions:  sorted all partitions
+ * @param sortedAssignedPartitions: sorted partitions, all are 
included in the sortedPartitions
+ * @param topic2AllPotentialConsumers:  topics mapped to all consumers 
that subscribed to it
+ * @return  partitions that aren't assigned to 
any current consumer
+ */
+private List<TopicPartition> getUnassignedPartitions(List<TopicPartition> sortedAllPartitions,
+ List<TopicPartition> sortedAssignedPartitions,
+ Map<String, List<String>> topic2AllPotentialConsumers) {
+if (sortedAssignedPartitions.isEmpty()) {
+return sortedAllPartitions;
+}
+
+List<TopicPartition> unassignedPartitions = new ArrayList<>();
+
+Collections.sort(sortedAssignedPartitions, new 
PartitionComparator(topic2AllPotentialConsumers));
+
+boolean shouldAddDirectly = false;
+Iterator<TopicPartition> sortedAssignedPartitionsIter = sortedAssignedPartitions.iterator();
+TopicPartition nextAssignedPartition = sortedAssignedPartitionsIter.next();
+
+for (TopicPartition topicPartition : sortedAllPartitions) {
+if (shouldAddDirectly || 
!nextAssignedPartition.equals(topicPartition)) {
+unassignedPartitions.add(topicPartition);
+} else {
+// this partition is in assignedPartitions, don't add to 
unassignedPartitions, just get next assigned partition
+if (sortedAssignedPartitionsIter.hasNext()) {
+nextAssignedPartition = 
sortedAssignedPartitionsIter.next();
+} else {
+// add the remaining directly since there is no more 
sortedAssignedPartitions
+shouldAddDirectly = true;
+}
+}
+}
+return unassignedPartitions;
+}
+
+/**
+ * get the unassigned partition list by computing the difference set of 
all sorted partitions
+ * and sortedAssignedPartitions. If no assigned partitions, we'll just 
return all sorted topic partitions.
+ * This is used in constrainedAssign method
+ *
+ * To compute the difference set, we use two pointers technique here:
+ *
+ * We loop through all the sorted topics, and then iterate over each topic's partitions,
+ * comparing each with the ith element in sortedAssignedPartitions (i starts from 0):
+ *   - if not equal to the ith element, add it to unassignedPartitions
+ *   - if equal to the ith element, get the next element from sortedAssignedPartitions
+ *
+ * @param totalPartitionsCount  all partitions counts in this 
assignment
+ * @param partitionsPerTopicthe number of partitions for each 
subscribed topic.
+ * @param sortedAssignedPartitions  sorted partitions, all are included in 
the sortedPartitions
+ * @return  the partitions not yet assigned to any 
consumers
+ */
+private List<TopicPartition> getUnassignedPartitions(int totalPartitionsCount,
+ Map<String, Integer> partitionsPerTopic,
+   
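   The two-pointers difference-set walk described in the javadoc above, as a 
standalone sketch (generic element type; class and method names are hypothetical):
   ```java
   import java.util.ArrayList;
   import java.util.Iterator;
   import java.util.List;

   public final class DifferenceSet {
       // Returns sortedAll minus sortedAssigned, assuming both lists are sorted
       // the same way and sortedAssigned is a subset of sortedAll.
       public static <T> List<T> unassigned(List<T> sortedAll, List<T> sortedAssigned) {
           if (sortedAssigned.isEmpty()) {
               return sortedAll;
           }
           List<T> result = new ArrayList<>();
           Iterator<T> iter = sortedAssigned.iterator();
           T next = iter.next();
           boolean drained = false; // true once sortedAssigned is exhausted
           for (T item : sortedAll) {
               if (drained || !item.equals(next)) {
                   result.add(item);       // not assigned: collect it
               } else if (iter.hasNext()) {
                   next = iter.next();     // matched: advance the pointer
               } else {
                   drained = true;         // matched the last assigned item
               }
           }
           return result;
       }

       public static void main(String[] args) {
           System.out.println(unassigned(List.of(1, 2, 3, 4, 5), List.of(2, 4))); // [1, 3, 5]
       }
   }
   ```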

[GitHub] [kafka] showuon commented on a change in pull request #10552: KAFKA-12675: improve the sticky general assignor scalability and performance

2021-05-25 Thread GitBox


showuon commented on a change in pull request #10552:
URL: https://github.com/apache/kafka/pull/10552#discussion_r638651011



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -444,23 +392,32 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
 // otherwise (the consumer still exists)
 for (Iterator partitionIter = 
entry.getValue().iterator(); partitionIter.hasNext();) {
 TopicPartition partition = partitionIter.next();
-if 
(!partition2AllPotentialConsumers.containsKey(partition)) {
-// if this topic partition of this consumer no longer 
exists remove it from currentAssignment of the consumer
+if 
(!topic2AllPotentialConsumers.containsKey(partition.topic())) {
+// if this topic partition of this consumer no longer 
exists, remove it from currentAssignment of the consumer
 partitionIter.remove();
 currentPartitionConsumer.remove(partition);
-} else if 
(!subscriptions.get(entry.getKey()).topics().contains(partition.topic())) {
-// if this partition cannot remain assigned to its 
current consumer because the consumer
-// is no longer subscribed to its topic remove it from 
currentAssignment of the consumer
+} else if 
(!consumerSubscription.topics().contains(partition.topic())) {
+// because the consumer is no longer subscribed to its 
topic, remove it from currentAssignment of the consumer
 partitionIter.remove();
 revocationRequired = true;
-} else
+} else {
 // otherwise, remove the topic partition from those 
that need to be assigned only if
 // its current consumer is still subscribed to its 
topic (because it is already assigned
 // and we would want to preserve that assignment as 
much as possible)
-unassignedPartitions.remove(partition);
+assignedPartitions.add(partition);
+}
 }
 }
 }
+
+// all partitions that needed to be assigned
+List<TopicPartition> unassignedPartitions = getUnassignedPartitions(sortedAllPartitions, assignedPartitions, topic2AllPotentialConsumers);
+assignedPartitions = null;

Review comment:
   Yes, `assignedPartitions.clear()` would have the same impact, but it'll 
loop through the whole ArrayList and null out the elements one by one. I think 
we can either `null` it or remove this line. What do you think?
   
   ```java
   /**
    * Removes all of the elements from this list.  The list will
    * be empty after this call returns.
    */
   public void clear() {
       modCount++;
       final Object[] es = elementData;
       for (int to = size, i = size = 0; i < to; i++)
           es[i] = null;
   }
   ```
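   A tiny standalone illustration of the trade-off (assuming nothing else 
still references the list):
   ```java
   import java.util.ArrayList;
   import java.util.List;

   public class ClearVsNull {
       public static void main(String[] args) {
           List<Integer> big = new ArrayList<>();
           for (int i = 0; i < 1_000_000; i++) {
               big.add(i);
           }

           // Option 1: O(n) walk that nulls every slot but keeps the backing array.
           big.clear();

           // Option 2: O(1) drop of the reference; the whole list (and its
           // backing array) becomes unreachable in one step.
           big = null;
       }
   }
   ```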




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] DuongPTIT edited a comment on pull request #10750: KAFKA-8120 Getting NegativeArraySizeException when using Kafka Connect to send data to Kafka

2021-05-25 Thread GitBox


DuongPTIT edited a comment on pull request #10750:
URL: https://github.com/apache/kafka/pull/10750#issuecomment-847728426


   > Please confirm if the issue only happen in V2.5 or newer release. If also 
happen in trunk, please make the merge target as `trunk` (and fix based on the 
`trunk` branch), and if only in V2.5, please raise question in dev email group, 
to ask if there's any new release plan for V2.5. Thank you.
   
   I've just reproduced this. I found that the issue happens in V2.7 and 
older releases, but it works fine in V2.8 and trunk.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12824) Remove Deprecated method KStream#branch

2021-05-25 Thread Josep Prat (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12824?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17350942#comment-17350942
 ] 

Josep Prat commented on KAFKA-12824:


On the parent issue there is this sentence:

> Each subtask will be focusing on a specific API, so it's easy to discuss if 
> it should be removed by 4.0.0 or maybe even at a later point.

 

And now I have added an extra comment on the parent issue as well, stating that 
when the time for 4.0.0 comes, we might need to re-evaluate whether enough time 
has passed for each API.

> Remove Deprecated method KStream#branch
> ---
>
> Key: KAFKA-12824
> URL: https://issues.apache.org/jira/browse/KAFKA-12824
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Josep Prat
>Priority: Blocker
> Fix For: 4.0.0
>
>
> The method branch in both Java and Scala KStream class was deprecated in 
> version 2.8:
>  * org.apache.kafka.streams.scala.kstream.KStream#branch
>  * 
> org.apache.kafka.streams.kstream.KStream#branch(org.apache.kafka.streams.kstream.Predicate  super K,? super V>...)
>  * 
> org.apache.kafka.streams.kstream.KStream#branch(org.apache.kafka.streams.kstream.Named,
>  org.apache.kafka.streams.kstream.Predicate...) 
>  
> See KAFKA-5488 and KIP-418
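For reference, a sketch of the `split()`/`Branched` API that KIP-418 introduced
as the replacement for `branch()` (topic names are made up):

```java
import java.util.Map;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.Branched;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Named;

public class SplitExample {
    public static void main(String[] args) {
        StreamsBuilder builder = new StreamsBuilder();
        KStream<String, String> stream = builder.stream("input");

        // split() returns a named map of branches instead of an array
        Map<String, KStream<String, String>> branches = stream
            .split(Named.as("split-"))
            .branch((key, value) -> value.startsWith("a"), Branched.as("a"))
            .defaultBranch(Branched.as("rest"));

        // branch names are prefixed with the Named prefix
        branches.get("split-a").to("topic-a");
        branches.get("split-rest").to("topic-rest");
    }
}
```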



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12824) Remove Deprecated method KStream#branch

2021-05-25 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12824?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17350941#comment-17350941
 ] 

Bruno Cadonna commented on KAFKA-12824:
---

[~mjsax] Do we know for which date AK 4.0.0 is planned? I could not find 
anything on the wiki. Since we do not know the release date, we do not know if 
the deprecation period will be enough or not. I would add a note to the 
description of this ticket that says that it is not clear whether the 
deprecation period of the subtasks is long enough for 4.0.0 and that they 
should be re-evaluated once it is clear when 4.0.0 will be released.

> Remove Deprecated method KStream#branch
> ---
>
> Key: KAFKA-12824
> URL: https://issues.apache.org/jira/browse/KAFKA-12824
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: Josep Prat
>Priority: Blocker
> Fix For: 4.0.0
>
>
> The method branch in both Java and Scala KStream class was deprecated in 
> version 2.8:
>  * org.apache.kafka.streams.scala.kstream.KStream#branch
>  * 
> org.apache.kafka.streams.kstream.KStream#branch(org.apache.kafka.streams.kstream.Predicate  super K,? super V>...)
>  * 
> org.apache.kafka.streams.kstream.KStream#branch(org.apache.kafka.streams.kstream.Named,
>  org.apache.kafka.streams.kstream.Predicate...) 
>  
> See KAFKA-5488 and KIP-418



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] DuongPTIT commented on pull request #10670: KAFKA-10273 Connect Converters should produce actionable error messages

2021-05-25 Thread GitBox


DuongPTIT commented on pull request #10670:
URL: https://github.com/apache/kafka/pull/10670#issuecomment-847730974


   hi @showuon, please take a look. Thank you so much!


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] DuongPTIT removed a comment on pull request #10670: KAFKA-10273 Connect Converters should produce actionable error messages

2021-05-25 Thread GitBox


DuongPTIT removed a comment on pull request #10670:
URL: https://github.com/apache/kafka/pull/10670#issuecomment-840464740






-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org








[jira] [Assigned] (KAFKA-12461) Extend LogManager to cover the metadata topic

2021-05-25 Thread loboxu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12461?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

loboxu reassigned KAFKA-12461:
--

Assignee: (was: loboxu)

> Extend LogManager to cover the metadata topic
> -
>
> Key: KAFKA-12461
> URL: https://issues.apache.org/jira/browse/KAFKA-12461
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jason Gustafson
>Priority: Major
>
> The `@metadata` topic is not managed by `LogManager` since it uses a new 
> snapshot-based retention policy. This means that it is not covered by the 
> recovery and high watermark checkpoints. It would be useful to fix this. We 
> can either extend `LogManager` so that it is aware of the snapshotting 
> semantics implemented by the `@metadata` topic, or we can create something 
> like a `RaftLogManager`.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-10900) Add metrics enumerated in KIP-630

2021-05-25 Thread loboxu (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10900?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

loboxu reassigned KAFKA-10900:
--

Assignee: loboxu

> Add metrics enumerated in KIP-630
> -
>
> Key: KAFKA-10900
> URL: https://issues.apache.org/jira/browse/KAFKA-10900
> Project: Kafka
>  Issue Type: Sub-task
>  Components: replication
>Reporter: Jose Armando Garcia Sancio
>Assignee: loboxu
>Priority: Major
>
> KIP-630 enumerates a few metrics. Makes sure that those metrics are 
> implemented.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] kowshik closed pull request #9764: MINOR: Eliminate KafkaScheduler#scheduleOnce in favor of KafkaScheduler#schedule

2021-05-25 Thread GitBox


kowshik closed pull request #9764:
URL: https://github.com/apache/kafka/pull/9764


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12846) why need this logic in Consumer‘s Fetch logic it should remove?

2021-05-25 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12846?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17350904#comment-17350904
 ] 

Luke Chen commented on KAFKA-12846:
---

[~ws], I believe the comments in the else block answer your question. I'm not 
sure what your expected result is. Are you trying to get rid of the else block? 
If so, you may have to prove the comment wrong so that we can remove it. What 
do you think?

> why need this logic in Consumer‘s Fetch logic   it should remove?
> -
>
> Key: KAFKA-12846
> URL: https://issues.apache.org/jira/browse/KAFKA-12846
> Project: Kafka
>  Issue Type: Wish
>Reporter: yws
>Priority: Trivial
> Fix For: 2.3.0
>
>
> package: org.apache.kafka.clients.consumer.internals
> class: Fetcher
> else {
> // this case shouldn't usually happen because we 
> only send one fetch at a time per partition,
> // but it might conceivably happen in some rare 
> cases (such as partition leader changes).
> // we have to copy to a new list because the old 
> one may be immutable
> List> newRecords = new 
> ArrayList<>(records.size() + currentRecords.size());
> newRecords.addAll(currentRecords);
> newRecords.addAll(records);
> fetched.put(partition, newRecords);
> }
> recordsRemaining -= records.size();
> }
> I just cannot think of the case that it will goes to the else logic,  who can 
> illustrate it? it's useless logic in my opinion, looking forward to reply!



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12846) why need this logic in Consumer‘s Fetch logic it should remove?

2021-05-25 Thread yws (Jira)
yws created KAFKA-12846:
---

 Summary: why need this logic in Consumer‘s Fetch logic   it should 
remove?
 Key: KAFKA-12846
 URL: https://issues.apache.org/jira/browse/KAFKA-12846
 Project: Kafka
  Issue Type: Wish
Reporter: yws
 Fix For: 2.3.0


package: org.apache.kafka.clients.consumer.internals
class: Fetcher

else {
    // this case shouldn't usually happen because we only send one fetch at a time per partition,
    // but it might conceivably happen in some rare cases (such as partition leader changes).
    // we have to copy to a new list because the old one may be immutable
    List<ConsumerRecord<K, V>> newRecords = new ArrayList<>(records.size() + currentRecords.size());
    newRecords.addAll(currentRecords);
    newRecords.addAll(records);
    fetched.put(partition, newRecords);
}
recordsRemaining -= records.size();
}


I just cannot think of a case where it would go into the else logic. Who can 
illustrate one? It's useless logic in my opinion; looking forward to a reply!
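A toy sketch of the scenario the comment describes, with plain strings standing
in for `ConsumerRecord`s and an int for the partition: when a second batch
arrives for a partition that already has records, the two lists must be merged,
copying first because the existing list may be immutable.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MergeFetched {
    public static void main(String[] args) {
        Map<Integer, List<String>> fetched = new HashMap<>();
        addRecords(fetched, 0, List.of("a", "b"));
        addRecords(fetched, 0, List.of("c"));  // rare second batch for partition 0
        System.out.println(fetched.get(0));    // [a, b, c]
    }

    static void addRecords(Map<Integer, List<String>> fetched, int partition, List<String> records) {
        List<String> current = fetched.get(partition);
        if (current == null) {
            fetched.put(partition, records);
        } else {
            // the existing list may be immutable, so copy before merging
            List<String> merged = new ArrayList<>(current.size() + records.size());
            merged.addAll(current);
            merged.addAll(records);
            fetched.put(partition, merged);
        }
    }
}
```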



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] showuon commented on pull request #10750: KAFKA-8120 Getting NegativeArraySizeException when using Kafka Connect to send data to Kafka

2021-05-25 Thread GitBox


showuon commented on pull request #10750:
URL: https://github.com/apache/kafka/pull/10750#issuecomment-847654763


   @DuongPTIT , thanks for the PR. One important question before reviewing the 
code: could you confirm this issue only happens in V2.5, and not V2.8? I'm not 
sure we will have another minor release for V2.5 (I don't think it's very 
likely because it's 4 releases old). That means even if your fix merges into 
V2.5, you might not be able to upgrade Kafka to get it. 
   
   Usually we merge to trunk first and then backport to older releases. So, if 
you can confirm this issue happens on V2.8 or the `trunk` branch, it will 
definitely get fixed in the next release! Thank you.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] socutes commented on a change in pull request #10749: KAFKA-12773: Use UncheckedIOException when wrapping IOException

2021-05-25 Thread GitBox


socutes commented on a change in pull request #10749:
URL: https://github.com/apache/kafka/pull/10749#discussion_r638556812



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -372,27 +371,23 @@ private void maybeFireLeaderChange() {
 
 @Override
 public void initialize() {
-try {
-quorum.initialize(new OffsetAndEpoch(log.endOffset().offset, 
log.lastFetchedEpoch()));
+quorum.initialize(new OffsetAndEpoch(log.endOffset().offset, 
log.lastFetchedEpoch()));
 
-long currentTimeMs = time.milliseconds();
-if (quorum.isLeader()) {
-throw new IllegalStateException("Voter cannot initialize as a 
Leader");
-} else if (quorum.isCandidate()) {
-onBecomeCandidate(currentTimeMs);
-} else if (quorum.isFollower()) {
-onBecomeFollower(currentTimeMs);
-}
+long currentTimeMs = time.milliseconds();
+if (quorum.isLeader()) {
+throw new IllegalStateException("Voter cannot initialize as a 
Leader");
+} else if (quorum.isCandidate()) {
+onBecomeCandidate(currentTimeMs);
+} else if (quorum.isFollower()) {
+onBecomeFollower(currentTimeMs);
+}
 
-// When there is only a single voter, become candidate immediately
-if (quorum.isVoter()
+// When there is only a single voter, become candidate immediately
+if (quorum.isVoter()
 && quorum.remoteVoters().isEmpty()
 && !quorum.isCandidate()) {

Review comment:
   Thank you very much for your precious time. I understand it and I will 
fix it.

##
File path: raft/src/main/java/org/apache/kafka/raft/QuorumState.java
##
@@ -116,7 +117,7 @@ public void initialize(OffsetAndEpoch logEndOffsetAndEpoch) 
throws IOException,
 if (election == null) {
 election = ElectionState.withUnknownLeader(0, voters);
 }
-} catch (final IOException e) {
+} catch (final Exception e) {

Review comment:
   Yes, I seem to have




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] DuongPTIT commented on pull request #10750: KAFKA-8120 Getting NegativeArraySizeException when using Kafka Connect to send data to Kafka

2021-05-25 Thread GitBox


DuongPTIT commented on pull request #10750:
URL: https://github.com/apache/kafka/pull/10750#issuecomment-847644863


   hi @showuon, can you please review this for me?
   Many thanks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #10749: KAFKA-12773: Use UncheckedIOException when wrapping IOException

2021-05-25 Thread GitBox


showuon commented on a change in pull request #10749:
URL: https://github.com/apache/kafka/pull/10749#discussion_r638544356



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -372,27 +371,23 @@ private void maybeFireLeaderChange() {
 
 @Override
 public void initialize() {
-try {
-quorum.initialize(new OffsetAndEpoch(log.endOffset().offset, 
log.lastFetchedEpoch()));
+quorum.initialize(new OffsetAndEpoch(log.endOffset().offset, 
log.lastFetchedEpoch()));
 
-long currentTimeMs = time.milliseconds();
-if (quorum.isLeader()) {
-throw new IllegalStateException("Voter cannot initialize as a 
Leader");
-} else if (quorum.isCandidate()) {
-onBecomeCandidate(currentTimeMs);
-} else if (quorum.isFollower()) {
-onBecomeFollower(currentTimeMs);
-}
+long currentTimeMs = time.milliseconds();
+if (quorum.isLeader()) {
+throw new IllegalStateException("Voter cannot initialize as a 
Leader");
+} else if (quorum.isCandidate()) {
+onBecomeCandidate(currentTimeMs);
+} else if (quorum.isFollower()) {
+onBecomeFollower(currentTimeMs);
+}
 
-// When there is only a single voter, become candidate immediately
-if (quorum.isVoter()
+// When there is only a single voter, become candidate immediately
+if (quorum.isVoter()
 && quorum.remoteVoters().isEmpty()
 && !quorum.isCandidate()) {

Review comment:
   No, as you can see in the change, you moved this line `if (quorum.isVoter()` 
left (by 4 spaces, I guess), so the following 2 lines should also move left.

##
File path: raft/src/main/java/org/apache/kafka/raft/QuorumState.java
##
@@ -116,7 +117,7 @@ public void initialize(OffsetAndEpoch logEndOffsetAndEpoch) 
throws IOException,
 if (election == null) {
 election = ElectionState.withUnknownLeader(0, voters);
 }
-} catch (final IOException e) {
+} catch (final Exception e) {

Review comment:
   Yes, but I think `Exception` covers too many "unexpected" exceptions. As 
you can see in the catch block, we are handling the IOException case, not 
other exceptions. I think we can catch `UncheckedIOException` directly here.
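   A minimal sketch of that pattern (hypothetical class, not the Raft code): 
wrap the checked `IOException` at the throw site and catch the specific 
unchecked type where the failure is actually handled.
   ```java
   import java.io.IOException;
   import java.io.UncheckedIOException;

   public class UncheckedIoExample {
       static void writeState() {
           // wrap the checked exception at the throw site (simulated failure)
           throw new UncheckedIOException(new IOException("disk full"));
       }

       public static void main(String[] args) {
           try {
               writeState();
           } catch (UncheckedIOException e) {
               // catch the specific unchecked type, not a blanket Exception
               System.err.println("I/O problem: " + e.getCause().getMessage());
           }
       }
   }
   ```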




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



