[GitHub] [kafka] showuon commented on a change in pull request #8675: KAFKA-10004: Fix the default broker configs cannot be displayed when using kafka-configs.sh --describe

2020-05-16 Thread GitBox


showuon commented on a change in pull request #8675:
URL: https://github.com/apache/kafka/pull/8675#discussion_r426131494



##
File path: core/src/main/scala/kafka/admin/ConfigCommand.scala
##
@@ -424,7 +425,7 @@ object ConfigCommand extends Config {
 case ConfigType.Topic =>
   adminClient.listTopics(new 
ListTopicsOptions().listInternal(true)).names().get().asScala.toSeq
 case ConfigType.Broker | BrokerLoggerConfigType =>
-  adminClient.describeCluster(new 
DescribeClusterOptions()).nodes().get().asScala.map(_.idString).toSeq :+ 
ConfigEntityName.Default
+  adminClient.describeCluster(new 
DescribeClusterOptions()).nodes().get().asScala.map(_.idString).toSeq :+ 
BrokerDefaultEntityName

Review comment:
   Thanks @bdbyrne . Good suggestion. I've updated it in this commit 
https://github.com/apache/kafka/pull/8675/commits/83cc2328099ef1424c3cea492156bd4a82f3fe65.
 Thank you.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #8622: MINOR: Update stream documentation

2020-05-16 Thread GitBox


showuon commented on pull request #8622:
URL: https://github.com/apache/kafka/pull/8622#issuecomment-629607982


   Thanks @abbccdda , I'll wait for @ableegoldman 's confirmation to see if I 
have to revert this version back to `2.3`. Thank you very much.







[jira] [Created] (KAFKA-10009) Add method for getting last record offset in kafka partition

2020-05-16 Thread Yuriy Badalyantc (Jira)
Yuriy Badalyantc created KAFKA-10009:


 Summary: Add method for getting last record offset in kafka 
partition
 Key: KAFKA-10009
 URL: https://issues.apache.org/jira/browse/KAFKA-10009
 Project: Kafka
  Issue Type: New Feature
  Components: clients, consumer
Reporter: Yuriy Badalyantc


As far as I understand, there is currently no reliable way to get the offset of 
the last record in a partition using the Java client. The consumer has an 
{{endOffsets}} method, and usually {{endOffsets - 1}} works fine. But with a 
transactional producer, the topic may contain offsets without a record, and 
{{endOffsets - 1}} can point to such an offset.

This feature would help when a consumer application wants to consume a whole 
topic: the beginning and last-record offsets give lower and upper bounds for 
consuming. Of course, it is doable with the current consumer implementation, 
but I need to check {{position}} after each poll.

Also, I believe this feature may help with monitoring and operations.
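The pitfall described above (a transactional producer leaving offsets that hold no record) can be illustrated with a small toy model in plain Java. This is not the real client API; the partition layout and helper method are invented for illustration:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Optional;

// Toy model of a partition written by a transactional producer: each slot is
// one offset, and Optional.empty() stands for a control record (commit marker)
// that occupies an offset but is never delivered to the consumer.
public class LastRecordOffset {

    // Scan backwards for the newest offset that actually holds a record.
    static long lastRecordOffset(List<Optional<String>> partition) {
        for (int offset = partition.size() - 1; offset >= 0; offset--) {
            if (partition.get(offset).isPresent()) {
                return offset;
            }
        }
        return -1L; // the partition holds no data records at all
    }

    public static void main(String[] args) {
        List<Optional<String>> partition = new ArrayList<>();
        partition.add(Optional.of("a"));   // offset 0: data record
        partition.add(Optional.of("b"));   // offset 1: data record
        partition.add(Optional.empty());   // offset 2: commit marker, no record

        long endOffset = partition.size(); // what endOffsets would report: 3
        System.out.println(endOffset - 1);               // prints 2: the marker
        System.out.println(lastRecordOffset(partition)); // prints 1: last real record
    }
}
```

In the toy model, {{endOffsets - 1}} lands on the marker at offset 2 while the last real record sits at offset 1, which is exactly the gap the issue asks the client to close.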



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10009) Add method for getting last record offset in kafka partition

2020-05-16 Thread Yuriy Badalyantc (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10009?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Yuriy Badalyantc updated KAFKA-10009:
-
Priority: Minor  (was: Major)

> Add method for getting last record offset in kafka partition
> 
>
> Key: KAFKA-10009
> URL: https://issues.apache.org/jira/browse/KAFKA-10009
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients, consumer
>Reporter: Yuriy Badalyantc
>Priority: Minor
>





[jira] [Commented] (KAFKA-8815) Kafka broker blocked on I/O primitive

2020-05-16 Thread Alexandre Dupriez (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17108917#comment-17108917
 ] 

Alexandre Dupriez commented on KAFKA-8815:
--

Hi [~william_reynolds]. In this case, the problem was an I/O stall: due to 
stuck progress in the block I/O layer, the kernel froze the file system and 
prevented any progress on `read`/`write`.
After some time, a kernel panic led to a hard failure of every read/write.

What was your environment (kernel and block device)?

> Kafka broker blocked on I/O primitive
> -
>
> Key: KAFKA-8815
> URL: https://issues.apache.org/jira/browse/KAFKA-8815
> Project: Kafka
>  Issue Type: Bug
>  Components: core, log
>Affects Versions: 1.1.1
>Reporter: Alexandre Dupriez
>Priority: Major
>
> This JIRA is for tracking a problem we ran into on a production cluster.
> *Scenario*
> Cluster of 15 brokers with an average ingress throughput of ~4 MB/s and egress 
> of ~4 MB/s per broker.
> Brokers are running on OpenJDK 8 and are configured with a heap size of 1 GB.
> There are around 1,000 partition replicas per broker. Load is evenly 
> balanced. Each broker instance is under fair CPU load, but not overloaded 
> (50-60%). G1 is used for garbage collection and doesn't exhibit any pressure, 
> with mostly short young GCs observed and a heap-after-GC usage of 70%.
> Replication factor is 3.
> *Symptom*
> One broker in the cluster suddenly became "unresponsive". Requests from other 
> brokers, Zookeeper and producers/consumers were failing with timeouts. The 
> Kafka process, however, was still alive and doing some background work 
> (truncating logs and rolling segments). This lasted for hours. At some point, 
> several thread dumps were taken at a few minutes' interval. Most of the threads 
> were "blocked". Deadlock was ruled out. The most suspicious stack is the 
> following:
> {code:java}
> Thread 7801: (state = BLOCKED)
>  - sun.nio.ch.FileChannelImpl.write(java.nio.ByteBuffer) @bci=25, line=202 
> (Compiled frame)
>  - 
> org.apache.kafka.common.record.MemoryRecords.writeFullyTo(java.nio.channels.GatheringByteChannel)
>  @bci=24, line=93 (Compiled frame)
>  - 
> org.apache.kafka.common.record.FileRecords.append(org.apache.kafka.common.record.MemoryRecords)
>  @bci=5, line=152 (Compiled frame)
>  - kafka.log.LogSegment.append(long, long, long, long, 
> org.apache.kafka.common.record.MemoryRecords) @bci=82, line=136 (Compiled 
> frame)
>  - kafka.log.Log.$anonfun$append$2(kafka.log.Log, 
> org.apache.kafka.common.record.MemoryRecords, boolean, boolean, int, 
> java.lang.Object) @bci=1080, line=757 (Compiled frame)
>  - kafka.log.Log$$Lambda$614.apply() @bci=24 (Compiled frame)
>  - kafka.log.Log.maybeHandleIOException(scala.Function0, scala.Function0) 
> @bci=1, line=1696 (Compiled frame)
>  - kafka.log.Log.append(org.apache.kafka.common.record.MemoryRecords, 
> boolean, boolean, int) @bci=29, line=642 (Compiled frame)
>  - kafka.log.Log.appendAsLeader(org.apache.kafka.common.record.MemoryRecords, 
> int, boolean) @bci=5, line=612 (Compiled frame)
>  - 
> kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(kafka.cluster.Partition,
>  org.apache.kafka.common.record.MemoryRecords, boolean, int) @bci=148, 
> line=609 (Compiled frame)
>  - kafka.cluster.Partition$$Lambda$837.apply() @bci=16 (Compiled frame)
>  - kafka.utils.CoreUtils$.inLock(java.util.concurrent.locks.Lock, 
> scala.Function0) @bci=7, line=250 (Compiled frame)
>  - 
> kafka.utils.CoreUtils$.inReadLock(java.util.concurrent.locks.ReadWriteLock, 
> scala.Function0) @bci=8, line=256 (Compiled frame)
>  - 
> kafka.cluster.Partition.appendRecordsToLeader(org.apache.kafka.common.record.MemoryRecords,
>  boolean, int) @bci=16, line=597 (Compiled frame)
>  - 
> kafka.server.ReplicaManager.$anonfun$appendToLocalLog$2(kafka.server.ReplicaManager,
>  boolean, boolean, short, scala.Tuple2) @bci=295, line=739 (Compiled frame)
>  - kafka.server.ReplicaManager$$Lambda$836.apply(java.lang.Object) @bci=20 
> (Compiled frame)
>  - scala.collection.TraversableLike.$anonfun$map$1(scala.Function1, 
> scala.collection.mutable.Builder, java.lang.Object) @bci=3, line=234 
> (Compiled frame)
>  - scala.collection.TraversableLike$$Lambda$14.apply(java.lang.Object) @bci=9 
> (Compiled frame)
>  - scala.collection.mutable.HashMap.$anonfun$foreach$1(scala.Function1, 
> scala.collection.mutable.DefaultEntry) @bci=16, line=138 (Compiled frame)
>  - scala.collection.mutable.HashMap$$Lambda$31.apply(java.lang.Object) @bci=8 
> (Compiled frame)
>  - scala.collection.mutable.HashTable.foreachEntry(scala.Function1) @bci=39, 
> line=236 (Compiled frame)
>  - 
> scala.collection.mutable.HashTable.foreachEntry$(scala.collection.mutable.HashTable,
>  scala.Function1) @bci=2, line=229 (Compiled frame)
>  - scala.collection.

[jira] [Comment Edited] (KAFKA-8815) Kafka broker blocked on I/O primitive

2020-05-16 Thread Alexandre Dupriez (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8815?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17108917#comment-17108917
 ] 

Alexandre Dupriez edited comment on KAFKA-8815 at 5/16/20, 8:27 AM:


Hi [~william_reynolds]. In this case, the problem was an I/O stall: due to 
stuck progress in the block I/O layer, the kernel froze the file system and 
prevented any progress on `read`/`write`.
After some time, a kernel panic led to a hard failure of every read/write.

What was your environment (kernel and block device)?

Note: I am closing this as not a Kafka problem, but please reach out on the 
mailing list if you would like more information.


was (Author: adupriez):
Hi [~william_reynolds]. In this case, the problem was an I/O stall. Due to 
stuck progress on the I/O block layer, the kernel freezes the file system and 
prevents any progress on `read`/`write`.
After some time, a kernel panic leads to a hard failure of every read/write.

What was your environment (kernel and block device)?

> Kafka broker blocked on I/O primitive
> -
>
> Key: KAFKA-8815
> URL: https://issues.apache.org/jira/browse/KAFKA-8815
> Project: Kafka
>  Issue Type: Bug
>  Components: core, log
>Affects Versions: 1.1.1
>Reporter: Alexandre Dupriez
>Priority: Major
>

[jira] [Resolved] (KAFKA-8815) Kafka broker blocked on I/O primitive

2020-05-16 Thread Alexandre Dupriez (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8815?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexandre Dupriez resolved KAFKA-8815.
--
Resolution: Not A Problem

System failure. Not related to Kafka.

> Kafka broker blocked on I/O primitive
> -
>
> Key: KAFKA-8815
> URL: https://issues.apache.org/jira/browse/KAFKA-8815
> Project: Kafka
>  Issue Type: Bug
>  Components: core, log
>Affects Versions: 1.1.1
>Reporter: Alexandre Dupriez
>Priority: Major
>

[jira] [Comment Edited] (KAFKA-9564) Integration Tests for Tiered Storage

2020-05-16 Thread Alexandre Dupriez (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17108919#comment-17108919
 ] 

Alexandre Dupriez edited comment on KAFKA-9564 at 5/16/20, 8:29 AM:


2020 May 16th - integration tests for basic scenarios have been written for 
KIP-405. More tests to come to cover additional scenarios.


was (Author: adupriez):
2020 May 16^th^ - integration tests for basic scenarios have been written for 
KIP-405. More tests to come to cover additional scenarios.

> Integration Tests for Tiered Storage
> 
>
> Key: KAFKA-9564
> URL: https://issues.apache.org/jira/browse/KAFKA-9564
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Alexandre Dupriez
>Assignee: Alexandre Dupriez
>Priority: Major
>






[jira] [Commented] (KAFKA-9564) Integration Tests for Tiered Storage

2020-05-16 Thread Alexandre Dupriez (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9564?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17108919#comment-17108919
 ] 

Alexandre Dupriez commented on KAFKA-9564:
--

2020 May 16^th^ - integration tests for basic scenarios have been written for 
KIP-405. More tests to come to cover additional scenarios.

> Integration Tests for Tiered Storage
> 
>
> Key: KAFKA-9564
> URL: https://issues.apache.org/jira/browse/KAFKA-9564
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Alexandre Dupriez
>Assignee: Alexandre Dupriez
>Priority: Major
>






[GitHub] [kafka] Hangleton commented on pull request #8569: KIP-551: Expose disk read and write metrics

2020-05-16 Thread GitBox


Hangleton commented on pull request #8569:
URL: https://github.com/apache/kafka/pull/8569#issuecomment-629612091


   @mumrah 
   
   I think it would be preferable to avoid using JNI libraries: they can 
add maintenance overhead and require additional configuration for tests, and 
they also carry risks inherent to the safety of the native implementation 
provided by the library.







[GitHub] [kafka] chia7712 commented on a change in pull request #8673: KAFKA-9992: EmbeddedKafkaCluster.deleteTopicAndWait not working with kafka_2.13

2020-05-16 Thread GitBox


chia7712 commented on a change in pull request #8673:
URL: https://github.com/apache/kafka/pull/8673#discussion_r426134960



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
##
@@ -297,6 +295,15 @@ public void deleteAllTopicsAndWait(final long timeoutMs) 
throws InterruptedExcep
 }
 }
 
+public Set<String> getAllTopics() {
+scala.collection.Iterator<String> topicsIterator = 
brokers[0].kafkaServer().zkClient().getAllTopicsInCluster().iterator();

Review comment:
   > getAllTopicsInCluster()
   
   ```getAllTopicsInCluster(false)```

##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/utils/EmbeddedKafkaCluster.java
##
@@ -297,6 +295,15 @@ public void deleteAllTopicsAndWait(final long timeoutMs) 
throws InterruptedExcep
 }
 }
 
+public Set<String> getAllTopics() {

Review comment:
   ```getAllTopicsInCluster``` seems to duplicate this new method. 
Could we remove the former?









[jira] [Commented] (KAFKA-10009) Add method for getting last record offset in kafka partition

2020-05-16 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10009?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17109009#comment-17109009
 ] 

Chia-Ping Tsai commented on KAFKA-10009:


If you want to seek to the end offset in the case of transactions, you can set 
isolation.level to READ_COMMITTED and then call Consumer.seekToEnd or 
Consumer.endOffsets. Both will return the last stable offset.

> Add method for getting last record offset in kafka partition
> 
>
> Key: KAFKA-10009
> URL: https://issues.apache.org/jira/browse/KAFKA-10009
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients, consumer
>Reporter: Yuriy Badalyantc
>Priority: Minor
>





[jira] [Commented] (KAFKA-10009) Add method for getting last record offset in kafka partition

2020-05-16 Thread Yuriy Badalyantc (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10009?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17109017#comment-17109017
 ] 

Yuriy Badalyantc commented on KAFKA-10009:
--

Even with the read_committed isolation level, {{endOffsets - 1}} will not give 
you the last record offset if the producer is transactional: it will point to 
an offset without a record. I specifically tested this behavior (on 2.4.0). 
Also, the isolation level only affects pending transactions.

> Add method for getting last record offset in kafka partition
> 
>
> Key: KAFKA-10009
> URL: https://issues.apache.org/jira/browse/KAFKA-10009
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients, consumer
>Reporter: Yuriy Badalyantc
>Priority: Minor
>





[jira] [Commented] (KAFKA-8898) if there is no message for poll, kafka consumer also apply memory

2020-05-16 Thread Alexandre Dupriez (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8898?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17109029#comment-17109029
 ] 

Alexandre Dupriez commented on KAFKA-8898:
--

Could you please provide some data on the memory consumption of the objects 
highlighted? Do you have a heap dump available?

> if there is no message for poll, kafka consumer also apply memory
> -
>
> Key: KAFKA-8898
> URL: https://issues.apache.org/jira/browse/KAFKA-8898
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Affects Versions: 2.1.1
>Reporter: linking12
>Priority: Critical
>  Labels: performance
> Attachments: image-2019-10-08-12-07-37-328.png
>
>
> When polling returns no records, the consumer still allocates about 1,000 
> bytes of memory: {{fetched = new HashMap<>()}} allocates on the heap even 
> though there is no message.
> I think {{fetched = new HashMap<>()}} should only happen when records exist.
>  
> ```
> public Map<TopicPartition, List<ConsumerRecord<K, V>>> fetchedRecords() {
>     Map<TopicPartition, List<ConsumerRecord<K, V>>> fetched = new HashMap<>();
>     int recordsRemaining = maxPollRecords;
> ```
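The allocation the report objects to, and one possible lazy alternative, can be sketched in plain Java. The types below are simplified stand-ins (a String key in place of TopicPartition, a single hypothetical partition), not the real Fetcher internals:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Sketch of the allocation pattern in question, plus a lazy variant.
// Names and types are illustrative; the real method lives in the client's Fetcher.
public class LazyFetch {

    // Eager: allocates a fresh HashMap on every call, even when there is
    // nothing to return (the behavior the issue reports).
    static Map<String, List<String>> fetchedRecordsEager(List<String> completedFetches) {
        Map<String, List<String>> fetched = new HashMap<>();
        if (!completedFetches.isEmpty()) {
            fetched.put("partition-0", completedFetches);
        }
        return fetched;
    }

    // Lazy: allocate only when there is at least one record; otherwise return
    // the shared immutable empty map, so an idle poll loop creates no garbage here.
    static Map<String, List<String>> fetchedRecordsLazy(List<String> completedFetches) {
        if (completedFetches.isEmpty()) {
            return Collections.emptyMap();
        }
        Map<String, List<String>> fetched = new HashMap<>();
        fetched.put("partition-0", completedFetches);
        return fetched;
    }

    public static void main(String[] args) {
        System.out.println(fetchedRecordsEager(List.of()).isEmpty()); // prints true, but allocated a HashMap
        System.out.println(fetchedRecordsLazy(List.of()).isEmpty());  // prints true, no allocation
        System.out.println(fetchedRecordsLazy(List.of("r1")));
    }
}
```

The lazy variant returns the shared immutable empty map on an idle poll, so a busy polling loop creates no per-call garbage for this map.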





[jira] [Commented] (KAFKA-9794) JMX metrics produce higher memory and CPU consumption in Kafka docker

2020-05-16 Thread Alexandre Dupriez (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17109030#comment-17109030
 ] 

Alexandre Dupriez commented on KAFKA-9794:
--

Would you have some data to share? CPU profiles, heap dump, etc.?

> JMX metrics produce higher memory and CPU consumption in Kafka docker
> -
>
> Key: KAFKA-9794
> URL: https://issues.apache.org/jira/browse/KAFKA-9794
> Project: Kafka
>  Issue Type: Bug
>  Components: metrics
>Affects Versions: 2.2.0
> Environment: kafka version: kafka_2.12-2.2.0.tgz using docker
>Reporter: Hariprasad
>Priority: Major
>  Labels: performance
>
> We have implemented the Kafka metrics using this GitHub project: 
> [https://github.com/oded-dd/prometheus-jmx-kafka]
> We have enabled the metrics using KAFKA_OPTS: export 
> KAFKA_OPTS='-javaagent:/etc/kafka/jmx_prometheus_javaagent-0.3.1.jar=9097:/etc/kafka/kafka-jmx-metrics.yaml'
> After starting Kafka with this, we have seen high CPU usage and a memory 
> spike in the Docker container. CPU usage sometimes goes as high as 300%; 
> we are running with 4 CPUs.
> Kindly suggest a solution for this.





[jira] [Comment Edited] (KAFKA-9794) JMX metrics produce higher memory and CPU consumption in Kafka docker

2020-05-16 Thread Alexandre Dupriez (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9794?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17109030#comment-17109030
 ] 

Alexandre Dupriez edited comment on KAFKA-9794 at 5/16/20, 12:13 PM:
-

Would you have some data to share? CPU profile, heap dump, etc.?


was (Author: adupriez):
Would you have some data to share? CPU profiles, heap dump, etc. ?

> JMX metrics produce higher memory and CPU consumption in Kafka docker
> -
>
> Key: KAFKA-9794
> URL: https://issues.apache.org/jira/browse/KAFKA-9794
> Project: Kafka
>  Issue Type: Bug
>  Components: metrics
>Affects Versions: 2.2.0
> Environment: kafka version: kafka_2.12-2.2.0.tgz using docker
>Reporter: Hariprasad
>Priority: Major
>  Labels: performance
>





[jira] [Resolved] (KAFKA-8916) Unreliable kafka-reassign-partitions.sh affecting performance

2020-05-16 Thread Alexandre Dupriez (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8916?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexandre Dupriez resolved KAFKA-8916.
--
Resolution: Invalid

Closing this as there is no bug and no development is required.

Please reach out to the user mailing list for this type of question: 
us...@kafka.apache.org. More information on engaging the Kafka community is 
available at https://kafka.apache.org/contact.html.


> Unreliable kafka-reassign-partitions.sh affecting performance
> -
>
> Key: KAFKA-8916
> URL: https://issues.apache.org/jira/browse/KAFKA-8916
> Project: Kafka
>  Issue Type: Task
>  Components: admin, config
>Affects Versions: 2.1.1
> Environment: CentOS 7
>Reporter: VinayKumar
>Priority: Major
>  Labels: performance
>
> Currently I have a 3-node Kafka cluster, and I want to add 2 more nodes to make 
> it a 5-node cluster.
> After adding the nodes to the cluster, I need all the topic partitions to be 
> evenly distributed across all 5 nodes.
> In the past, when I ran kafka-reassign-partitions.sh & 
> kafka-preferred-replica-election.sh, they ran for a very long time, hung, and 
> made the cluster unstable, so I'm afraid to use this method.
> Can you please suggest the best and most foolproof way to distribute partitions 
> among all the cluster nodes?



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
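For reference, the instability described above is usually avoided by reassigning with a replication throttle so catch-up traffic cannot starve normal produce/fetch traffic. A sketch follows, assuming a ZooKeeper-administered 2.x cluster as in the report; the topic name, ZooKeeper address, broker ids, file names, and throttle value are all illustrative:

```shell
# Sketch only: hosts, topics, and broker ids below are hypothetical.
# 1. Generate a candidate assignment spanning all five brokers; save the
#    "Proposed partition reassignment" JSON it prints as reassignment.json.
cat > topics.json <<'EOF'
{"topics": [{"topic": "my-topic"}], "version": 1}
EOF
bin/kafka-reassign-partitions.sh --zookeeper zk1:2181 \
  --topics-to-move-json-file topics.json \
  --broker-list "0,1,2,3,4" --generate

# 2. Execute it with a replication throttle (bytes/sec per broker).
bin/kafka-reassign-partitions.sh --zookeeper zk1:2181 \
  --reassignment-json-file reassignment.json \
  --throttle 50000000 --execute

# 3. Poll until complete; --verify also removes the throttle when done.
bin/kafka-reassign-partitions.sh --zookeeper zk1:2181 \
  --reassignment-json-file reassignment.json --verify
```

Moving a small batch of partitions per run, rather than everything at once, keeps each reassignment short and easy to verify.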


[jira] [Resolved] (KAFKA-6959) Any impact we foresee if we upgrade Linux version or move to VM instead of physical Linux server

2020-05-16 Thread Alexandre Dupriez (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6959?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Alexandre Dupriez resolved KAFKA-6959.
--
Resolution: Fixed

> Any impact we foresee if we upgrade Linux version or move to VM instead of 
> physical Linux server
> 
>
> Key: KAFKA-6959
> URL: https://issues.apache.org/jira/browse/KAFKA-6959
> Project: Kafka
>  Issue Type: Task
>  Components: admin
>Affects Versions: 0.11.0.2
> Environment: Prod
>Reporter: Gene Yi
>Priority: Trivial
>  Labels: patch, performance, security
>
> As we know, there are the recent Linux Meltdown and Spectre issues. All the 
> Linux servers need the patch deployed, and the OS version needs to be at 
> least 6.9. We want to know the impact on Kafka: is there any side effect if 
> we directly upgrade the OS to 7.0? Also, is there any limitation if we deploy 
> Kafka to a VM instead of physical servers?
> The Kafka version we currently use is 0.11.0.2.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10009) Add method for getting last record offset in kafka partition

2020-05-16 Thread Chia-Ping Tsai (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10009?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17109045#comment-17109045
 ] 

Chia-Ping Tsai commented on KAFKA-10009:


So you want to get the end offset associated with a true record, and the LSO, 
which may be the smallest offset of an open transaction, is not what you expect.

 

> Add method for getting last record offset in kafka partition
> 
>
> Key: KAFKA-10009
> URL: https://issues.apache.org/jira/browse/KAFKA-10009
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients, consumer
>Reporter: Yuriy Badalyantc
>Priority: Minor
>
> As far as I understand, at the moment there is no reliable way of getting the 
> offset of the last record in a partition using the Java client. There is an 
> {{endOffsets}} method on the consumer, and usually {{endOffsets - 1}} 
> works fine. But in the case of a transactional producer, the topic may contain 
> offsets without a record, and {{endOffsets - 1}} will point to an offset 
> without a record.
> This feature will help when a consumer application wants to consume the whole 
> topic. Checking the offsets of the first and last records gives lower and 
> upper bounds for consuming. Of course, it is doable with the current consumer 
> implementation, but I need to check {{position}} after each 
> poll.
> Also, I believe that this feature may help with monitoring and operations.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
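The {{endOffsets - 1}} pitfall above can be illustrated without a broker. The sketch below is a simulation of a partition log, not real client code: transaction control markers (commit/abort) occupy an offset but carry no consumer-visible record, so the last occupied offset and the last record offset can differ. Class and method names here are invented for illustration:

```java
import java.util.List;
import java.util.OptionalLong;

public class LastRecordOffset {
    // A position in the partition is either a real record or a transaction
    // control marker (commit/abort), which consumes an offset but is never
    // returned to the consumer.
    record Entry(long offset, boolean isControlMarker) {}

    // The naive guess, endOffset - 1: just the last occupied offset,
    // which may land on a control marker.
    static long naiveLast(List<Entry> log) {
        return log.get(log.size() - 1).offset();
    }

    // The highest offset belonging to a true record, mirroring the
    // "check position after each poll" workaround from the issue.
    static OptionalLong lastRecordOffset(List<Entry> log) {
        return log.stream()
                  .filter(e -> !e.isControlMarker())
                  .mapToLong(Entry::offset)
                  .max();
    }

    public static void main(String[] args) {
        // Offsets 0..2 are records; offset 3 is a commit marker left by a
        // transactional producer.
        List<Entry> log = List.of(
            new Entry(0, false), new Entry(1, false),
            new Entry(2, false), new Entry(3, true));
        System.out.println("endOffset - 1 = " + naiveLast(log));                    // 3
        System.out.println("last record   = " + lastRecordOffset(log).getAsLong()); // 2
    }
}
```

The gap between the two values is exactly what the requested API would have to resolve server-side.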


[jira] [Commented] (KAFKA-10009) Add method for getting last record offset in kafka partition

2020-05-16 Thread Yuriy Badalyantc (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10009?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17109054#comment-17109054
 ] 

Yuriy Badalyantc commented on KAFKA-10009:
--

Exactly. I want the offset of the last true record in a partition.

> Add method for getting last record offset in kafka partition
> 
>
> Key: KAFKA-10009
> URL: https://issues.apache.org/jira/browse/KAFKA-10009
> Project: Kafka
>  Issue Type: New Feature
>  Components: clients, consumer
>Reporter: Yuriy Badalyantc
>Priority: Minor
>
> As far as I understand, at the moment there is no reliable way of getting the 
> offset of the last record in a partition using the Java client. There is an 
> {{endOffsets}} method on the consumer, and usually {{endOffsets - 1}} 
> works fine. But in the case of a transactional producer, the topic may contain 
> offsets without a record, and {{endOffsets - 1}} will point to an offset 
> without a record.
> This feature will help when a consumer application wants to consume the whole 
> topic. Checking the offsets of the first and last records gives lower and 
> upper bounds for consuming. Of course, it is doable with the current consumer 
> implementation, but I need to check {{position}} after each 
> poll.
> Also, I believe that this feature may help with monitoring and operations.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-1602) Use single error code for offset commit response

2020-05-16 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-1602?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-1602.
--
Resolution: Not A Problem

> Use single error code for offset commit response
> 
>
> Key: KAFKA-1602
> URL: https://issues.apache.org/jira/browse/KAFKA-1602
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: newbie++
>
> As discussed with Joel offline, today the offset commit response contains an 
> error code for each partition, but the error code will almost always be the 
> same across all partitions. Offset commits for a given consumer instance in a 
> consumer group all go to the same partition of the offsets topic. So there 
> are only two known cases when we may want it to be different: (i) some 
> partitions may have very large metadata leading to a metadata-size-too-large 
> error (ii) if we don't use compression (which will be the case until we have 
> KAFKA-1374) there could be partial appends. Given that: item (ii) is a corner 
> case and if we use compression (which we will turn on by default once 
> KAFKA-1374 is done) item (i) will be eliminated; and that item (i) will be 
> easier to handle by just failing the whole request and sending back a single 
> error code (of metadata too large); we can just have a single error-code in 
> the response.
> Note that this will be a protocol change and hence we need to bump up the 
> response version number.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-3220) Failure in kafka.server.ClientQuotaManagerTest.testQuotaViolation

2020-05-16 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3220?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-3220.
--
Resolution: Fixed

Did not see this recently.

> Failure in kafka.server.ClientQuotaManagerTest.testQuotaViolation
> -
>
> Key: KAFKA-3220
> URL: https://issues.apache.org/jira/browse/KAFKA-3220
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: transient-unit-test-failure
>
> {code}
> java.lang.AssertionError: expected:<0> but was:<1>
>   at org.junit.Assert.fail(Assert.java:88)
>   at org.junit.Assert.failNotEquals(Assert.java:834)
>   at org.junit.Assert.assertEquals(Assert.java:645)
>   at org.junit.Assert.assertEquals(Assert.java:631)
>   at 
> kafka.server.ClientQuotaManagerTest.testQuotaViolation(ClientQuotaManagerTest.scala:110)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.runTestClass(JUnitTestClassExecuter.java:105)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.execute(JUnitTestClassExecuter.java:56)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassProcessor.processTestClass(JUnitTestClassProcessor.java:64)
>   at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:50)
>   at sun.reflect.GeneratedMethodAccessor44.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>   at 
> org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>   at 
> org.gradle.messaging.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
>   at 
> org.gradle.messaging.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
>   at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
>   at 
> org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:106)
>   at sun.reflect.GeneratedMethodAccessor43.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>   at 
> org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>   at 
> org.gradle.messaging.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:360)
>   at 
> org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:54)
>   at 
> org.gradle.internal.concurrent.StoppableExecutorImpl$1.run(StoppableExecutorImpl.java:40)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:744)
> {code}
> Example: 
> https://builds.apache.org/job/kafka-trunk-git-pr-jdk

[jira] [Resolved] (KAFKA-4143) Transient failure in kafka.server.SaslSslReplicaFetchTest.testReplicaFetcherThread

2020-05-16 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4143?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-4143.
--
Resolution: Fixed

Did not see this recently.

> Transient failure in 
> kafka.server.SaslSslReplicaFetchTest.testReplicaFetcherThread
> --
>
> Key: KAFKA-4143
> URL: https://issues.apache.org/jira/browse/KAFKA-4143
> Project: Kafka
>  Issue Type: Sub-task
>  Components: unit tests
>Reporter: Guozhang Wang
>Priority: Major
>
> Example: 
> https://builds.apache.org/job/kafka-trunk-git-pr-jdk7/5471/testReport/junit/kafka.server/SaslSslReplicaFetchTest/testReplicaFetcherThread/
> {code}
> java.lang.AssertionError: Partition [foo,0] metadata not propagated after 
> 15000 ms
>   at org.junit.Assert.fail(Assert.java:88)
>   at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:752)
>   at 
> kafka.utils.TestUtils$.waitUntilMetadataIsPropagated(TestUtils.scala:794)
>   at 
> kafka.utils.TestUtils$$anonfun$createTopic$1.apply(TestUtils.scala:228)
>   at 
> kafka.utils.TestUtils$$anonfun$createTopic$1.apply(TestUtils.scala:227)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at 
> scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
>   at scala.collection.immutable.Range.foreach(Range.scala:141)
>   at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
>   at scala.collection.AbstractTraversable.map(Traversable.scala:105)
>   at kafka.utils.TestUtils$.createTopic(TestUtils.scala:227)
>   at 
> kafka.server.BaseReplicaFetchTest$$anonfun$testReplicaFetcherThread$2.apply(BaseReplicaFetchTest.scala:62)
>   at 
> kafka.server.BaseReplicaFetchTest$$anonfun$testReplicaFetcherThread$2.apply(BaseReplicaFetchTest.scala:61)
>   at scala.collection.immutable.List.foreach(List.scala:318)
>   at 
> kafka.server.BaseReplicaFetchTest.testReplicaFetcherThread(BaseReplicaFetchTest.scala:61)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.runTestClass(JUnitTestClassExecuter.java:114)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.execute(JUnitTestClassExecuter.java:57)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassProcessor.processTestClass(JUnitTestClassProcessor.java:66)
>   at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>   at 
> org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
>   at 
> org.gradle.internal.dis

[jira] [Resolved] (KAFKA-3738) Add load system tests for Streams

2020-05-16 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3738?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-3738.
--
Resolution: Fixed

We have soak tests and perf benchmarks today.

> Add load system tests for Streams
> -
>
> Key: KAFKA-3738
> URL: https://issues.apache.org/jira/browse/KAFKA-3738
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: test
>
> Today in system tests we have mainly two types of tests: 1) correctness tests 
> for delivery guarantees with some "chaos monkey" mechanism; 2) performance 
> tests for evaluating single-node efficiency.
> We want to consider adding another type of system test, "load tests", for 
> Streams, in which we have a large-scale setup of a Streams app, let it run 
> under heavy load for some time, and measure CPU / memory / disk / etc. to 
> check whether it behaves properly under load.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-3261) Consolidate class kafka.cluster.BrokerEndPoint and kafka.cluster.EndPoint

2020-05-16 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3261?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang reassigned KAFKA-3261:


Assignee: (was: chen zhu)

> Consolidate class kafka.cluster.BrokerEndPoint and kafka.cluster.EndPoint
> -
>
> Key: KAFKA-3261
> URL: https://issues.apache.org/jira/browse/KAFKA-3261
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: newbie
>
> These two classes are serving similar purposes and can be consolidated. Also 
> as [~sasakitoa] suggested we can remove their "uriParseExp" variables but use 
> (a possibly modified)
> {code}
> private static final Pattern HOST_PORT_PATTERN = 
> Pattern.compile(".*?\\[?([0-9a-zA-Z\\-.:]*)\\]?:([0-9]+)");
> {code}
> in org.apache.kafka.common.utils.Utils instead.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
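The behaviour of the HOST_PORT_PATTERN regex quoted above can be checked in isolation. A small self-contained sketch (the wrapper class and `parse` helper are invented for illustration; only the pattern itself comes from the issue):

```java
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class HostPortParse {
    // Pattern from the issue: captures the host (optionally bracketed
    // IPv6) in group 1 and the port in group 2.
    private static final Pattern HOST_PORT_PATTERN =
        Pattern.compile(".*?\\[?([0-9a-zA-Z\\-.:]*)\\]?:([0-9]+)");

    // Returns {host, port} on a full match, or null otherwise.
    static String[] parse(String s) {
        Matcher m = HOST_PORT_PATTERN.matcher(s);
        return m.matches() ? new String[]{m.group(1), m.group(2)} : null;
    }

    public static void main(String[] args) {
        String[] plain = parse("localhost:9092");
        System.out.println(plain[0] + " / " + plain[1]);  // localhost / 9092

        // Brackets around an IPv6 literal are stripped by the optional
        // \[ ... \] around group 1.
        String[] v6 = parse("[::1]:9092");
        System.out.println(v6[0] + " / " + v6[1]);        // ::1 / 9092
    }
}
```

Note that because `:` is inside the host character class, the lazy `.*?` prefix and backtracking are what force the final colon to separate host from port.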


[jira] [Updated] (KAFKA-3261) Consolidate class kafka.cluster.BrokerEndPoint and kafka.cluster.EndPoint

2020-05-16 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3261?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-3261:
-
Labels: newbie  (was: )

> Consolidate class kafka.cluster.BrokerEndPoint and kafka.cluster.EndPoint
> -
>
> Key: KAFKA-3261
> URL: https://issues.apache.org/jira/browse/KAFKA-3261
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Assignee: chen zhu
>Priority: Major
>  Labels: newbie
>
> These two classes are serving similar purposes and can be consolidated. Also 
> as [~sasakitoa] suggested we can remove their "uriParseExp" variables but use 
> (a possibly modified)
> {code}
> private static final Pattern HOST_PORT_PATTERN = 
> Pattern.compile(".*?\\[?([0-9a-zA-Z\\-.:]*)\\]?:([0-9]+)");
> {code}
> in org.apache.kafka.common.utils.Utils instead.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-3168) Failure in kafka.integration.PrimitiveApiTest.testPipelinedProduceRequests

2020-05-16 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3168?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-3168.
--
Resolution: Fixed

> Failure in kafka.integration.PrimitiveApiTest.testPipelinedProduceRequests
> --
>
> Key: KAFKA-3168
> URL: https://issues.apache.org/jira/browse/KAFKA-3168
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: transient-unit-test-failure
>
> {code}
> java.lang.AssertionError: Published messages should be in the log
>   at org.junit.Assert.fail(Assert.java:88)
>   at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:747)
>   at 
> kafka.integration.PrimitiveApiTest.testPipelinedProduceRequests(PrimitiveApiTest.scala:245)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.runTestClass(JUnitTestClassExecuter.java:105)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.execute(JUnitTestClassExecuter.java:56)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassProcessor.processTestClass(JUnitTestClassProcessor.java:64)
>   at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:50)
>   at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>   at 
> org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>   at 
> org.gradle.messaging.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
>   at 
> org.gradle.messaging.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
>   at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
>   at 
> org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:106)
>   at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>   at 
> org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>   at 
> org.gradle.messaging.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:360)
>   at 
> org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:54)
>   at 
> org.gradle.internal.concurrent.StoppableExecutorImpl$1.run(StoppableExecutorImpl.java:40)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:744)
> {code}
> Example: 
> https://builds.apache.org/job/kafka-trunk-git-pr-jdk7/2153/t

[jira] [Assigned] (KAFKA-3168) Failure in kafka.integration.PrimitiveApiTest.testPipelinedProduceRequests

2020-05-16 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3168?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang reassigned KAFKA-3168:


Assignee: (was: Parag Shah)

> Failure in kafka.integration.PrimitiveApiTest.testPipelinedProduceRequests
> --
>
> Key: KAFKA-3168
> URL: https://issues.apache.org/jira/browse/KAFKA-3168
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: transient-unit-test-failure
>
> {code}
> java.lang.AssertionError: Published messages should be in the log
>   at org.junit.Assert.fail(Assert.java:88)
>   at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:747)
>   at 
> kafka.integration.PrimitiveApiTest.testPipelinedProduceRequests(PrimitiveApiTest.scala:245)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.runTestClass(JUnitTestClassExecuter.java:105)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.execute(JUnitTestClassExecuter.java:56)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassProcessor.processTestClass(JUnitTestClassProcessor.java:64)
>   at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:50)
>   at sun.reflect.GeneratedMethodAccessor12.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>   at 
> org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>   at 
> org.gradle.messaging.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
>   at 
> org.gradle.messaging.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
>   at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
>   at 
> org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:106)
>   at sun.reflect.GeneratedMethodAccessor11.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>   at 
> org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>   at 
> org.gradle.messaging.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:360)
>   at 
> org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:54)
>   at 
> org.gradle.internal.concurrent.StoppableExecutorImpl$1.run(StoppableExecutorImpl.java:40)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
>   at java.lang.Thread.run(Thread.java:744)
> {code}
> Example: 
> https://builds.apache.org/job/kafka-trunk

[jira] [Resolved] (KAFKA-1617) Move Metadata Cache to TopicManager and handling of Offset Request to LogManager

2020-05-16 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-1617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-1617.
--
Resolution: Won't Fix

> Move Metadata Cache to TopicManager and handling of Offset Request to 
> LogManager
> 
>
> Key: KAFKA-1617
> URL: https://issues.apache.org/jira/browse/KAFKA-1617
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Priority: Major
>
> This is a follow-up of KAFKA-1583. In order to make Kafka APIs a pure 
> stateless layer that just forwards different requests to the corresponding 
> managers, there are still two tasks left:
> 1. Move the metadata cache at KafkaApis to a separate manager, maybe called 
> TopicManager, which will be responsible for a) handle topic metadata request, 
> b) handle topic metadata update request by talking to replica manager if 
> necessary.
> 2. Move the handling logic of offset request to the LogManager, which should 
> contain all the information necessary to handle this request.
> Finally, the KafkaApis class should be stateless, meaning no inner variables 
> and no start()/shutdown() functions needed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-3103) Transient Failure in kafka.integration.PlaintextTopicMetadataTest.testIsrAfterBrokerShutDownAndJoinsBack

2020-05-16 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-3103?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-3103.
--
Resolution: Fixed

> Transient Failure in 
> kafka.integration.PlaintextTopicMetadataTest.testIsrAfterBrokerShutDownAndJoinsBack
> 
>
> Key: KAFKA-3103
> URL: https://issues.apache.org/jira/browse/KAFKA-3103
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Guozhang Wang
>Assignee: Denise Fernandez
>Priority: Major
>
> Saw this case once:
> {code}
> java.lang.AssertionError: Topic metadata is not correctly updated for broker 
> kafka.server.KafkaServer@15414e6f.
> Expected ISR: List(BrokerEndPoint(0,localhost,41407), 
> BrokerEndPoint(1,localhost,56370))
> Actual ISR  : 
>   at org.junit.Assert.fail(Assert.java:88)
>   at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:747)
>   at 
> kafka.integration.BaseTopicMetadataTest$$anonfun$checkIsr$1.apply(BaseTopicMetadataTest.scala:193)
>   at 
> kafka.integration.BaseTopicMetadataTest$$anonfun$checkIsr$1.apply(BaseTopicMetadataTest.scala:191)
>   at scala.collection.immutable.List.foreach(List.scala:318)
>   at 
> kafka.integration.BaseTopicMetadataTest.checkIsr(BaseTopicMetadataTest.scala:191)
>   at 
> kafka.integration.BaseTopicMetadataTest.testIsrAfterBrokerShutDownAndJoinsBack(BaseTopicMetadataTest.scala:236)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.runTestClass(JUnitTestClassExecuter.java:105)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.execute(JUnitTestClassExecuter.java:56)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassProcessor.processTestClass(JUnitTestClassProcessor.java:64)
>   at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:50)
>   at sun.reflect.GeneratedMethodAccessor7.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>   at 
> org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>   at 
> org.gradle.messaging.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
>   at 
> org.gradle.messaging.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
>   at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
>   at 
> org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:106)
>   at sun.reflect.GeneratedMethodAccessor6.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:606)
>   at 
> org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>   at 
> org.gradle.messaging.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)

[jira] [Resolved] (KAFKA-1985) Document on possible error codes for each response type

2020-05-16 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-1985?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-1985.
--
Resolution: Fixed

> Document on possible error codes for each response type
> ---
>
> Key: KAFKA-1985
> URL: https://issues.apache.org/jira/browse/KAFKA-1985
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Reporter: Guozhang Wang
>Priority: Major
>
> When coding the clients logic we tend to forget one or more possible error 
> codes that need special handling because they are not summarized and documented 
> anywhere. It would be better to at least add comments in
> {code}
> org.apache.kafka.common.requests.XXResponse
> {code}
> about all possible error codes so that people can check and handle them 
> appropriately.





[jira] [Updated] (KAFKA-2045) Memory Management on the consumer

2020-05-16 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-2045?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-2045:
-
Labels: newbie++  (was: )

> Memory Management on the consumer
> -
>
> Key: KAFKA-2045
> URL: https://issues.apache.org/jira/browse/KAFKA-2045
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: newbie++
>
> We need to add memory management to the new consumer like we did in the 
> new producer. This would probably include:
> 1. byte buffer re-usage for fetch response partition data.
> 2. byte buffer re-usage for on-the-fly de-compression.
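The two points above amount to pooling buffers instead of allocating per request. A minimal sketch of such a free-list pool (fixed buffer size, illustrative class name; not Kafka's actual implementation):

```java
import java.nio.ByteBuffer;
import java.util.ArrayDeque;

public class SimpleBufferPool {
    private final ArrayDeque<ByteBuffer> free = new ArrayDeque<>();
    private final int bufferSize;

    public SimpleBufferPool(final int bufferSize) {
        this.bufferSize = bufferSize;
    }

    // Hand out a pooled buffer if one is free, otherwise allocate a new one.
    public synchronized ByteBuffer acquire() {
        final ByteBuffer buffer = free.poll();
        return buffer != null ? buffer : ByteBuffer.allocate(bufferSize);
    }

    // Return a buffer so the next fetch response or decompression pass
    // can reuse it instead of triggering a fresh allocation.
    public synchronized void release(final ByteBuffer buffer) {
        buffer.clear(); // reset position and limit for the next user
        free.push(buffer);
    }
}
```

A consumer-side pool along these lines would let fetch-response partition data and on-the-fly decompression reuse the same buffers across fetches.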





[jira] [Resolved] (KAFKA-5719) Create a quickstart archetype project for Kafka Streams

2020-05-16 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5719?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-5719.
--
Resolution: Fixed

> Create a quickstart archetype project for Kafka Streams
> ---
>
> Key: KAFKA-5719
> URL: https://issues.apache.org/jira/browse/KAFKA-5719
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>
> People have been suggesting to add some tutorial-like web doc sections on how 
> to implement a stream processing application using the Kafka Streams library. 
> Plus, the problem with the current o.a.k.streams.examples package is that it is 
> part of the o.a.k package, which conflicts with the common development cycle experience. So 
> I'd like to propose adding a maven archetype project for Kafka Streams along 
> with an additional tutorial / quickstart web doc sections. Here is a 
> step-by-step plan for the project:
> 1. Add the archetype project and include in the release (both in dist.apache 
> and repository.apache maven repos); also add a {{Write your own Streams 
> application}} section on web docs along with the existing {{Play with a demo 
> Streams application}}.
> 2. Modify {{Play with a demo Streams application}} to be based on the 
> archetype project as well.
> 3. Migrate all examples code from {{o.a.k.streams.example}} to the archetype 
> project and remove this package.
> * 4. Moving forward, we can add more complicated examples in the archetype 
> project such as integration with Connect, interactive queries, etc; and 
> augment the demo and tutorial web doc sections accordingly.





[jira] [Resolved] (KAFKA-7042) Fall back to the old behavior when the broker is too old to recognize LIST_OFFSET versions

2020-05-16 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7042?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-7042.
--
Resolution: Not A Problem

> Fall back to the old behavior when the broker is too old to recognize 
> LIST_OFFSET versions
> --
>
> Key: KAFKA-7042
> URL: https://issues.apache.org/jira/browse/KAFKA-7042
> Project: Kafka
>  Issue Type: Improvement
>  Components: consumer
>Reporter: Guozhang Wang
>Priority: Major
>
> When READ_COMMITTED is turned on (since 0.11.0), LIST_OFFSET requires 
> min_version to be 2 on the consumer client side. On the other hand, if broker 
> is no newer than 0.10.2 it can only recognize the version of LIST_OFFSET up 
> to 1. In this case the consumer talking to such an old broker will throw an 
> exception right away.
> What we can improve, though, is that when the consumer realizes the broker does 
> not recognize LIST_OFFSET of at least version 2, it can fall back to the old behavior 
> of READ_UNCOMMITTED since the data on that broker should not have any txn 
> markers anyways. By doing this we would lift the hard restriction that 
> consumers with READ_COMMITTED cannot work with an older version of broker 
> (remember we are trying to achieve broker compatibility since 0.10.0).
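The proposed fallback rule can be sketched as follows (illustrative class and method names, not Kafka client code; v2 is the first LIST_OFFSET version that carries the isolation level, as described above):

```java
public class ListOffsetFallback {
    enum IsolationLevel { READ_UNCOMMITTED, READ_COMMITTED }

    // If the broker cannot speak LIST_OFFSET v2+, downgrade READ_COMMITTED:
    // a broker that old cannot contain transaction markers, so the weaker
    // isolation level returns the same data anyway.
    static IsolationLevel effectiveIsolation(final short brokerMaxListOffsetVersion,
                                             final IsolationLevel requested) {
        if (requested == IsolationLevel.READ_COMMITTED && brokerMaxListOffsetVersion < 2) {
            return IsolationLevel.READ_UNCOMMITTED;
        }
        return requested;
    }

    public static void main(final String[] args) {
        // Old broker (<= 0.10.2): fall back instead of throwing.
        System.out.println(effectiveIsolation((short) 1, IsolationLevel.READ_COMMITTED));
        // Broker that understands v2: honor the requested isolation level.
        System.out.println(effectiveIsolation((short) 2, IsolationLevel.READ_COMMITTED));
    }
}
```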





[jira] [Assigned] (KAFKA-5209) Transient failure: kafka.server.MetadataRequestTest.testControllerId

2020-05-16 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5209?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang reassigned KAFKA-5209:


Assignee: (was: Umesh Chaudhary)

> Transient failure: kafka.server.MetadataRequestTest.testControllerId
> 
>
> Key: KAFKA-5209
> URL: https://issues.apache.org/jira/browse/KAFKA-5209
> Project: Kafka
>  Issue Type: Sub-task
>  Components: unit tests
>Reporter: Guozhang Wang
>Priority: Major
>
> {code}
> Stacktrace
> java.lang.NullPointerException
>   at 
> kafka.server.MetadataRequestTest.testControllerId(MetadataRequestTest.scala:57)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at 
> org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
>   at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
>   at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.runTestClass(JUnitTestClassExecuter.java:114)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.execute(JUnitTestClassExecuter.java:57)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassProcessor.processTestClass(JUnitTestClassProcessor.java:66)
>   at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>   at sun.reflect.GeneratedMethodAccessor50.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>   at 
> org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
>   at 
> org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
>   at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
>   at 
> org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:109)
>   at sun.reflect.GeneratedMethodAccessor49.invoke(Unknown Source)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>   at 
> org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:147)
>   at 
> org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:129)
>   at 
> org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:404)
>   at 
> org.gradle.internal.concurrent.ExecutorPolicy$CatchAndRecordFailures.onExecute(ExecutorPolicy.java:63)
>   at 
> org.gradle.internal.concurrent.StoppableExecutorImpl$1.run(StoppableExecutorImpl.java:46)
>   at 
> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
>   at 
> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecu

[jira] [Resolved] (KAFKA-8197) Flaky Test kafka.server.DynamicBrokerConfigTest > testPasswordConfigEncoderSecretChange

2020-05-16 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8197?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-8197.
--
Resolution: Fixed

> Flaky Test kafka.server.DynamicBrokerConfigTest > 
> testPasswordConfigEncoderSecretChange
> ---
>
> Key: KAFKA-8197
> URL: https://issues.apache.org/jira/browse/KAFKA-8197
> Project: Kafka
>  Issue Type: Improvement
>  Components: core, unit tests
>Affects Versions: 1.1.1
>Reporter: Guozhang Wang
>Priority: Major
>
> {code}
> 09:18:23 kafka.server.DynamicBrokerConfigTest > 
> testPasswordConfigEncoderSecretChange FAILED
> 09:18:23 org.junit.ComparisonFailure: expected:<[staticLoginModule 
> required;]> but was:<[????O?i???A?c'??Ch?|?p]>
> 09:18:23 at org.junit.Assert.assertEquals(Assert.java:115)
> 09:18:23 at org.junit.Assert.assertEquals(Assert.java:144)
> 09:18:23 at 
> kafka.server.DynamicBrokerConfigTest.testPasswordConfigEncoderSecretChange(DynamicBrokerConfigTest.scala:253)
> {code}
> https://builds.apache.org/job/kafka-pr-jdk7-scala2.11/13466/consoleFull





[jira] [Commented] (KAFKA-6579) Consolidate window store and session store unit tests into a single class

2020-05-16 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6579?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17109100#comment-17109100
 ] 

Guozhang Wang commented on KAFKA-6579:
--

[~ableegoldman] Is it resolved now?

> Consolidate window store and session store unit tests into a single class
> -
>
> Key: KAFKA-6579
> URL: https://issues.apache.org/jira/browse/KAFKA-6579
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Sophie Blee-Goldman
>Priority: Major
>  Labels: newbie, unit-test
>
> For key value store, we have a {{AbstractKeyValueStoreTest}} that is shared 
> among all its implementations; however for window and session stores, each 
> class has its own independent unit test classes that do not share the test 
> coverage. In fact, many of these test classes share the same unit test 
> functions (e.g. {{RocksDBWindowStoreTest}}, 
> {{CompositeReadOnlyWindowStoreTest}} and {{CachingWindowStoreTest}}).
> It is better to use the same pattern as for key value stores to consolidate 
> these test functions into a shared base class.





[jira] [Resolved] (KAFKA-8918) Flaky Test org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskCancellation

2020-05-16 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8918?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-8918.
--
Resolution: Fixed

> Flaky Test 
> org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskCancellation
> 
>
> Key: KAFKA-8918
> URL: https://issues.apache.org/jira/browse/KAFKA-8918
> Project: Kafka
>  Issue Type: Bug
>  Components: core, unit tests
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: flaky-test
>
> {code}
> Stacktrace
> java.lang.AssertionError: Condition not met within timeout 15000. Timed out 
> waiting for expected tasks 
> {"foo":{"id":"foo","taskState":{"state":"DONE","spec":{"class":"org.apache.kafka.trogdor.task.NoOpTaskSpec","startMs":5,"durationMs":7},"startedMs":11,"doneMs":18,"cancelled":true,"status":{"node01":"done","node02":"done"}},"workerState":{"state":"DONE","taskId":"foo","spec":{"class":"org.apache.kafka.trogdor.task.NoOpTaskSpec","startMs":5,"durationMs":7},"startedMs":11,"doneMs":18,"status":"done"}}}
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:376)
>   at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:336)
>   at 
> org.apache.kafka.trogdor.common.ExpectedTasks.waitFor(ExpectedTasks.java:144)
>   at 
> org.apache.kafka.trogdor.coordinator.CoordinatorTest.testTaskCancellation(CoordinatorTest.java:264)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:288)
>   at 
> org.junit.internal.runners.statements.FailOnTimeout$CallableStatement.call(FailOnTimeout.java:282)
>   at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
>   at java.base/java.lang.Thread.run(Thread.java:834)
> {code}





[jira] [Updated] (KAFKA-10001) Store's own restore listener should be triggered in store changelog reader

2020-05-16 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10001?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-10001:
--
Labels: streams  (was: )

> Store's own restore listener should be triggered in store changelog reader
> --
>
> Key: KAFKA-10001
> URL: https://issues.apache.org/jira/browse/KAFKA-10001
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Major
>  Labels: streams
> Fix For: 2.6.0
>
>
> Streams' state store ``register()`` function passed in `restoreCallback` can 
> potentially also be a `RestoreListener`, in which case its corresponding 
> `onRestoreStart / End / batchRestored` should be triggered.
> This is a regression in trunk -- 2.5 has this logic right.





[jira] [Resolved] (KAFKA-10001) Store's own restore listener should be triggered in store changelog reader

2020-05-16 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10001?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-10001.
---
Fix Version/s: 2.6.0
   Resolution: Fixed

> Store's own restore listener should be triggered in store changelog reader
> --
>
> Key: KAFKA-10001
> URL: https://issues.apache.org/jira/browse/KAFKA-10001
> Project: Kafka
>  Issue Type: Bug
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Major
> Fix For: 2.6.0
>
>
> Streams' state store ``register()`` function passed in `restoreCallback` can 
> potentially also be a `RestoreListener`, in which case its corresponding 
> `onRestoreStart / End / batchRestored` should be triggered.
> This is a regression in trunk -- 2.5 has this logic right.





[GitHub] [kafka] bbejeck commented on a change in pull request #8622: MINOR: Update stream documentation

2020-05-16 Thread GitBox


bbejeck commented on a change in pull request #8622:
URL: https://github.com/apache/kafka/pull/8622#discussion_r426163653



##
File path: docs/streams/upgrade-guide.html
##
@@ -35,7 +35,7 @@ Upgrade Guide and API Changes
 
 
 Upgrading from any older version to {{fullDotVersion}} is possible: 
you will need to do two rolling bounces, where during the first rolling bounce 
phase you set the config upgrade.from="older version"
-(possible values are "0.10.0" - "2.3") and during the 
second you remove it. This is required to safely upgrade to the new cooperative 
rebalancing protocol of the embedded consumer. Note that you will remain using 
the old eager
+(possible values are "0.10.0" - "2.4") and during the 
second you remove it. This is required to safely upgrade to the new cooperative 
rebalancing protocol of the embedded consumer. Note that you will remain using 
the old eager

Review comment:
   not sure about this as it describes a rolling upgrade from an older 
version, so we'll need @ableegoldman to confirm





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bbejeck commented on a change in pull request #8622: MINOR: Update stream documentation

2020-05-16 Thread GitBox


bbejeck commented on a change in pull request #8622:
URL: https://github.com/apache/kafka/pull/8622#discussion_r426163653



##
File path: docs/streams/upgrade-guide.html
##
@@ -35,7 +35,7 @@ Upgrade Guide and API Changes
 
 
 Upgrading from any older version to {{fullDotVersion}} is possible: 
you will need to do two rolling bounces, where during the first rolling bounce 
phase you set the config upgrade.from="older version"
-(possible values are "0.10.0" - "2.3") and during the 
second you remove it. This is required to safely upgrade to the new cooperative 
rebalancing protocol of the embedded consumer. Note that you will remain using 
the old eager
+(possible values are "0.10.0" - "2.4") and during the 
second you remove it. This is required to safely upgrade to the new cooperative 
rebalancing protocol of the embedded consumer. Note that you will remain using 
the old eager

Review comment:
   I think this change is ok as it describes the rolling upgrades from an 
older version. But we should ask @ableegoldman to confirm.









[GitHub] [kafka] guozhangwang removed a comment on pull request #8221: KAFKA-9561: update task input partitions after rebalance

2020-05-16 Thread GitBox


guozhangwang removed a comment on pull request #8221:
URL: https://github.com/apache/kafka/pull/8221#issuecomment-629664209


   test this







[GitHub] [kafka] guozhangwang commented on pull request #8221: KAFKA-9561: update task input partitions after rebalance

2020-05-16 Thread GitBox


guozhangwang commented on pull request #8221:
URL: https://github.com/apache/kafka/pull/8221#issuecomment-629664171











[GitHub] [kafka] guozhangwang commented on pull request #8671: KAFKA-9859 / Add topics generated by KTable FK join to internal topic matching logic

2020-05-16 Thread GitBox


guozhangwang commented on pull request #8671:
URL: https://github.com/apache/kafka/pull/8671#issuecomment-629672739


   test this please







[GitHub] [kafka] guozhangwang commented on a change in pull request #8671: KAFKA-9859 / Add topics generated by KTable FK join to internal topic matching logic

2020-05-16 Thread GitBox


guozhangwang commented on a change in pull request #8671:
URL: https://github.com/apache/kafka/pull/8671#discussion_r426170554



##
File path: core/src/main/scala/kafka/tools/StreamsResetter.java
##
@@ -673,11 +673,17 @@ private boolean isInternalTopic(final String topicName) {
 // Even is this is not expected in general, we need to exclude those 
topics here
 // and don't consider them as internal topics even if they follow the 
same naming schema.
 // Cf. https://issues.apache.org/jira/browse/KAFKA-7930
-return !isInputTopic(topicName) && !isIntermediateTopic(topicName)
-&& topicName.startsWith(options.valueOf(applicationIdOption) + "-")
-&& (topicName.endsWith("-changelog") || 
topicName.endsWith("-repartition")
-|| topicName.endsWith("-subscription-registration-topic")
-|| topicName.endsWith("-subscription-response-topic"));
+return !isInputTopic(topicName) && !isIntermediateTopic(topicName) && 
topicName.startsWith(options.valueOf(applicationIdOption) + "-")
+   && matchesInternalTopicFormat(topicName);
+}
+
+// visible for testing
+public boolean matchesInternalTopicFormat(final String topicName) {
+return topicName.endsWith("-changelog") || 
topicName.endsWith("-repartition")
+   || topicName.endsWith("-subscription-registration-topic")

Review comment:
   Not a suggestion, just a comment: now looking at this I think maybe we should 
not add the `-topic` suffix for those topics since we did not for repartition 
and changelog :)
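The suffix check under discussion can be sketched in isolation as follows (simplified from the StreamsResetter diff above; the class name and the sample topic names are illustrative):

```java
import java.util.List;

public class InternalTopicMatcher {
    // Suffixes used by Streams-managed internal topics, per the diff above.
    private static final List<String> INTERNAL_SUFFIXES = List.of(
            "-changelog",
            "-repartition",
            "-subscription-registration-topic",
            "-subscription-response-topic");

    public static boolean matchesInternalTopicFormat(final String topicName) {
        return INTERNAL_SUFFIXES.stream().anyMatch(topicName::endsWith);
    }

    public static void main(final String[] args) {
        // Hypothetical topic names, for illustration only.
        System.out.println(matchesInternalTopicFormat("app-store-changelog"));                 // true
        System.out.println(matchesInternalTopicFormat("app-fk-subscription-response-topic"));  // true
        System.out.println(matchesInternalTopicFormat("app-input"));                           // false
    }
}
```

Note this only covers the suffix match; the full check in the diff also requires the application-id prefix and excludes input and intermediate topics.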









[GitHub] [kafka] guozhangwang commented on pull request #8659: KAFKA-9617 Replica Fetcher can mark partition as failed when max.mess…

2020-05-16 Thread GitBox


guozhangwang commented on pull request #8659:
URL: https://github.com/apache/kafka/pull/8659#issuecomment-629677117


   Thanks @chia7712 







[GitHub] [kafka] guozhangwang merged pull request #8659: KAFKA-9617 Replica Fetcher can mark partition as failed when max.mess…

2020-05-16 Thread GitBox


guozhangwang merged pull request #8659:
URL: https://github.com/apache/kafka/pull/8659


   







[jira] [Resolved] (KAFKA-9617) Replica Fetcher can mark partition as failed when max.message.bytes is changed

2020-05-16 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9617?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang resolved KAFKA-9617.
--
Fix Version/s: 2.6.0
   Resolution: Fixed

> Replica Fetcher can mark partition as failed when max.message.bytes is changed
> --
>
> Key: KAFKA-9617
> URL: https://issues.apache.org/jira/browse/KAFKA-9617
> Project: Kafka
>  Issue Type: Bug
>Reporter: Stanislav Kozlovski
>Assignee: Chia-Ping Tsai
>Priority: Major
>  Labels: newbie
> Fix For: 2.6.0
>
>
> There exists a race condition when changing the dynamic max.message.bytes 
> config for a topic. A follower replica can replicate a message that is over 
> that size after it processes the config change. When this happens, the 
> replica fetcher catches the unexpected exception, marks the partition as 
> failed and stops replicating it.
> {code:java}
> 06:38:46.596  Processing override for entityPath: topics/partition-1 with 
> config: Map(max.message.bytes -> 512)
> 06:38:46.597   [ReplicaFetcher replicaId=1, leaderId=3, fetcherId=0] 
> Unexpected error occurred while processing data for partition partition-1 at 
> offset 20964
> org.apache.kafka.common.errors.RecordTooLargeException: The record batch size 
> in the append to partition-1 is 3349 bytes which exceeds the maximum 
> configured value of 512.
> {code}
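The race above boils down to the follower validating a fetched batch against its *current* `max.message.bytes`, which may already be lower than the limit the leader applied when it accepted the batch. An illustrative sketch (not the actual replica fetcher code):

```java
public class FollowerAppendCheck {
    // The follower checks the batch against whatever limit it holds *now*,
    // not the limit in force when the leader accepted the batch.
    static void validateBatchSize(final int batchSizeBytes, final int maxMessageBytes) {
        if (batchSizeBytes > maxMessageBytes) {
            // In the report above, this exception caused the partition to be
            // marked as failed; the fix is to not treat it as fatal here.
            throw new IllegalStateException("The record batch size " + batchSizeBytes
                    + " bytes exceeds the maximum configured value of " + maxMessageBytes + ".");
        }
    }

    public static void main(final String[] args) {
        validateBatchSize(3349, 1048588); // passes under the old limit
        try {
            validateBatchSize(3349, 512); // after the dynamic change: fails
        } catch (IllegalStateException e) {
            System.out.println(e.getMessage());
        }
    }
}
```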





[jira] [Resolved] (KAFKA-9955) Exceptions thrown from SinkTask::close shadow other exceptions

2020-05-16 Thread Konstantine Karantasis (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9955?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Konstantine Karantasis resolved KAFKA-9955.
---
Resolution: Fixed

> Exceptions thrown from SinkTask::close shadow other exceptions
> --
>
> Key: KAFKA-9955
> URL: https://issues.apache.org/jira/browse/KAFKA-9955
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 0.10.0.0
>Reporter: Greg Harris
>Assignee: Greg Harris
>Priority: Minor
> Fix For: 2.3.2, 2.6.0, 2.4.2, 2.5.1
>
>
> If an exception is thrown from SinkTask::close, the exception will shadow any 
> other previous exception, because SinkTask::close is called from within a 
> finally block.
> Steps to reproduce:
>  # Throw an exception from SinkTask::start or SinkTask::put 
>  # Throw an exception from SinkTask::close
> Expected behavior:
>  * All exceptions are visible in separate log messages
>  * The error from SinkTask::start or SinkTask::put is logged as the exception 
> that caused the task to stop.
> Actual behavior:
>  * The exception from SinkTask::close is logged as the exception that caused 
> the task to stop.
>  * The exceptions from either SinkTask::start or SinkTask::put are swallowed 
> and don't appear in the logs at all.
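The shadowing described above is the standard Java finally-block pitfall. A minimal sketch of both the bug and one possible fix that keeps both exceptions visible via `addSuppressed` (illustrative class and messages, not the actual Connect code):

```java
public class ShadowingDemo {
    // Bug: the exception thrown from the finally block replaces the
    // original failure, which is then lost.
    static void task() {
        try {
            throw new RuntimeException("failure in put()");   // original error
        } finally {
            throw new RuntimeException("failure in close()"); // shadows the error above
        }
    }

    // Fix: catch the close() failure and attach it to the primary
    // exception instead of letting it propagate on its own.
    static void taskFixed() {
        RuntimeException primary = null;
        try {
            throw new RuntimeException("failure in put()");
        } catch (RuntimeException e) {
            primary = e;
            throw e;
        } finally {
            try {
                throw new RuntimeException("failure in close()");
            } catch (RuntimeException closeError) {
                if (primary != null) {
                    primary.addSuppressed(closeError); // both errors stay visible
                } else {
                    throw closeError;
                }
            }
        }
    }

    public static void main(final String[] args) {
        try { task(); } catch (RuntimeException e) {
            System.out.println(e.getMessage()); // prints "failure in close()"
        }
        try { taskFixed(); } catch (RuntimeException e) {
            System.out.println(e.getMessage());                        // prints "failure in put()"
            System.out.println(e.getSuppressed()[0].getMessage());     // prints "failure in close()"
        }
    }
}
```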





[GitHub] [kafka] abbccdda commented on pull request #8221: KAFKA-9561: update task input partitions after rebalance

2020-05-16 Thread GitBox


abbccdda commented on pull request #8221:
URL: https://github.com/apache/kafka/pull/8221#issuecomment-629694343


   Double checked the failures are due to known flaky test 
org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]







[GitHub] [kafka] bdbyrne commented on pull request #8658: KAFKA-9980: Fix client quotas default entity name handling in broker.

2020-05-16 Thread GitBox


bdbyrne commented on pull request #8658:
URL: https://github.com/apache/kafka/pull/8658#issuecomment-629713796


   I've applied @d8tltanc's updates to `quota_test.py` and verified their 
success:
   
   ```
   docker exec ducker01 bash -c "cd /opt/kafka-dev && ducktape --cluster-file 
/opt/kafka-dev/tests/docker/build/cluster.json  
./tests/kafkatest/tests/client/quota_test.py"
   [INFO:2020-05-16 14:54:39,531]: starting test run with session id 
2020-05-16--002...
   [INFO:2020-05-16 14:54:39,531]: running 9 tests...
   [INFO:2020-05-16 14:54:39,531]: Triggering test 1 of 9...
   [INFO:2020-05-16 14:54:39,536]: RunnerClient: Loading test {'directory': 
'/opt/kafka-dev/tests/kafkatest/tests/client', 'file_name': 'quota_test.py', 
'method_name': 'test_quota', 'cls_name': 'QuotaTest', 'injected_args': 
{'consumer_num': 2, 'quota_type': 'client-id'}}
   [INFO:2020-05-16 14:54:39,539]: RunnerClient: 
kafkatest.tests.client.quota_test.QuotaTest.test_quota.consumer_num=2.quota_type=client-id:
 Setting up...
   /usr/local/lib/python2.7/dist-packages/paramiko/kex_ecdh_nist.py:39: 
CryptographyDeprecationWarning: encode_point has been deprecated on 
EllipticCurvePublicNumbers and will be removed in a future version. Please use 
EllipticCurvePublicKey.public_bytes to obtain both compressed and uncompressed 
point encoding.
 m.add_string(self.Q_C.public_numbers().encode_point())
   /usr/local/lib/python2.7/dist-packages/paramiko/kex_ecdh_nist.py:94: 
CryptographyDeprecationWarning: Support for unsafe construction of public 
numbers from encoded data will be removed in a future version. Please use 
EllipticCurvePublicKey.from_encoded_point
 self.curve, Q_S_bytes
   /usr/local/lib/python2.7/dist-packages/paramiko/kex_ecdh_nist.py:109: 
CryptographyDeprecationWarning: encode_point has been deprecated on 
EllipticCurvePublicNumbers and will be removed in a future version. Please use 
EllipticCurvePublicKey.public_bytes to obtain both compressed and uncompressed 
point encoding.
 hm.add_string(self.Q_C.public_numbers().encode_point())
   [INFO:2020-05-16 14:54:41,606]: RunnerClient: 
kafkatest.tests.client.quota_test.QuotaTest.test_quota.consumer_num=2.quota_type=client-id:
 Running...
   [INFO:2020-05-16 14:57:18,680]: RunnerClient: 
kafkatest.tests.client.quota_test.QuotaTest.test_quota.consumer_num=2.quota_type=client-id:
 PASS
   [INFO:2020-05-16 14:57:18,681]: RunnerClient: 
kafkatest.tests.client.quota_test.QuotaTest.test_quota.consumer_num=2.quota_type=client-id:
 Tearing down...
   [WARNING - 2020-05-16 14:57:18,981 - service_registry - stop_all - 
lineno:55]: Error stopping service 
: 1
   [INFO:2020-05-16 14:57:25,809]: RunnerClient: 
kafkatest.tests.client.quota_test.QuotaTest.test_quota.consumer_num=2.quota_type=client-id:
 Summary: 
   [INFO:2020-05-16 14:57:25,810]: RunnerClient: 
kafkatest.tests.client.quota_test.QuotaTest.test_quota.consumer_num=2.quota_type=client-id:
 Data: None
   [INFO:2020-05-16 14:57:25,866]: 
~
   [INFO:2020-05-16 14:57:25,867]: Triggering test 2 of 9...
   [INFO:2020-05-16 14:57:25,871]: RunnerClient: Loading test {'directory': 
'/opt/kafka-dev/tests/kafkatest/tests/client', 'file_name': 'quota_test.py', 
'method_name': 'test_quota', 'cls_name': 'QuotaTest', 'injected_args': 
{'old_broker_throttling_behavior': True, 'quota_type': 'client-id'}}
   [INFO:2020-05-16 14:57:25,873]: RunnerClient: 
kafkatest.tests.client.quota_test.QuotaTest.test_quota.old_broker_throttling_behavior=True.quota_type=client-id:
 Setting up...
   /usr/local/lib/python2.7/dist-packages/paramiko/kex_ecdh_nist.py:39: 
CryptographyDeprecationWarning: encode_point has been deprecated on 
EllipticCurvePublicNumbers and will be removed in a future version. Please use 
EllipticCurvePublicKey.public_bytes to obtain both compressed and uncompressed 
point encoding.
 m.add_string(self.Q_C.public_numbers().encode_point())
   /usr/local/lib/python2.7/dist-packages/paramiko/kex_ecdh_nist.py:94: 
CryptographyDeprecationWarning: Support for unsafe construction of public 
numbers from encoded data will be removed in a future version. Please use 
EllipticCurvePublicKey.from_encoded_point
 self.curve, Q_S_bytes
   /usr/local/lib/python2.7/dist-packages/paramiko/kex_ecdh_nist.py:109: 
CryptographyDeprecationWarning: encode_point has been deprecated on 
EllipticCurvePublicNumbers and will be removed in a future version. Please use 
EllipticCurvePublicKey.public_bytes to obtain both compressed and uncompressed 
point encoding.
 hm.add_string(self.Q_C.public_numbers().encode_point())
   [INFO:2020-05-16 14:57:27,557]: RunnerClient: 
kafkatest.tests.client.quota_test.QuotaTest.test_quota.old_broker_throttling_behavior=True.quota_type=client-id:
 Running...
   [INFO:2020-05-16 14:59:58,427]: RunnerClient: 
kafkatest.tests.client.quota_test.QuotaTest.test_quota.old_broker_throttling_behavior=True.quota_type=client-id:
 PASS
   [INFO:2020-05-16 14:59:58,42

[GitHub] [kafka] bdbyrne commented on pull request #8658: KAFKA-9980: Fix client quotas default entity name handling in broker.

2020-05-16 Thread GitBox


bdbyrne commented on pull request #8658:
URL: https://github.com/apache/kafka/pull/8658#issuecomment-629714866


   @cmccabe this is ready for review. Thanks!







[jira] [Assigned] (KAFKA-10000) Atomic commit of source connector records and offsets

2020-05-16 Thread Chris Egerton (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10000?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chris Egerton reassigned KAFKA-10000:
-

Assignee: Chris Egerton

> Atomic commit of source connector records and offsets
> -
>
> Key: KAFKA-10000
> URL: https://issues.apache.org/jira/browse/KAFKA-10000
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect
>Reporter: Chris Egerton
>Assignee: Chris Egerton
>Priority: Major
>  Labels: needs-kip
>
> It'd be nice to be able to configure source connectors such that their 
> offsets are committed if and only if all records up to that point have been 
> ack'd by the producer. This would go a long way towards EOS for source 
> connectors.
>  
> This differs from https://issues.apache.org/jira/browse/KAFKA-6079, which is 
> marked as {{WONTFIX}} since it only concerns enabling the idempotent producer 
> for source connectors and is not concerned with source connector offsets.
> This also differs from https://issues.apache.org/jira/browse/KAFKA-6080, 
> which had a lot of discussion around allowing connector-defined transaction 
> boundaries. The suggestion in this ticket is to only use source connector 
> offset commits as the transaction boundaries for connectors; allowing 
> connector-specified transaction boundaries can be addressed separately.
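
Independent of the transactional design this ticket proposes, the "commit an offset only once all records up to it have been ack'd" invariant can be pictured as a commit watermark over producer acks. A toy, self-contained sketch — `AckTracker` is illustrative only, not a Connect class:

```java
// Toy commit watermark: the committable offset advances only over a
// contiguous prefix of acknowledged records.
import java.util.TreeSet;

public class Main {

    public static final class AckTracker {
        private final TreeSet<Long> acked = new TreeSet<>();
        private long committed = -1; // highest offset safe to commit

        public void ack(long offset) {
            acked.add(offset);
            // advance the watermark while the next offset has been acknowledged
            while (acked.contains(committed + 1)) {
                acked.remove(committed + 1);
                committed++;
            }
        }

        public long committableOffset() {
            return committed;
        }
    }

    public static void main(String[] args) {
        AckTracker t = new AckTracker();
        t.ack(0);
        t.ack(2);                                  // offset 1 still in flight
        System.out.println(t.committableOffset()); // 0
        t.ack(1);
        System.out.println(t.committableOffset()); // 2
    }
}
```

With such a watermark, an out-of-order ack (offset 2 before 1) never lets the commit run ahead of unacknowledged records.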





[GitHub] [kafka] ijuma opened a new pull request #8678: Update Gradle to 6.4.1

2020-05-16 Thread GitBox


ijuma opened a new pull request #8678:
URL: https://github.com/apache/kafka/pull/8678


   This fixes critical bugs in Gradle 6.4:
   
   * Regression: Different daemons are used between IDE and CLI builds for the 
same project
   * Regression: Main-Class attribute always added to jar manifest when using 
application plugin
   * Fix potential NPE if code is executed concurrently
   
   More details: https://github.com/gradle/gradle/releases/tag/v6.4.1
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] ijuma commented on pull request #8517: MINOR: use monotonic clock for replica fetcher DelayedItem

2020-05-16 Thread GitBox


ijuma commented on pull request #8517:
URL: https://github.com/apache/kafka/pull/8517#issuecomment-629731366


   ok to test







[GitHub] [kafka] mjsax opened a new pull request #8679: KAFKA-10003: Mark KStream.through() as deprecated

2020-05-16 Thread GitBox


mjsax opened a new pull request #8679:
URL: https://github.com/apache/kafka/pull/8679


- part of KIP-221







[GitHub] [kafka] mjsax commented on pull request #8679: KAFKA-10003: Mark KStream.through() as deprecated

2020-05-16 Thread GitBox


mjsax commented on pull request #8679:
URL: https://github.com/apache/kafka/pull/8679#issuecomment-629734751


   Call for review @lkokhreidze @vvcephei 
   
   Also updates the Scala API...







[GitHub] [kafka] mjsax commented on a change in pull request #8679: KAFKA-10003: Mark KStream.through() as deprecated

2020-05-16 Thread GitBox


mjsax commented on a change in pull request #8679:
URL: https://github.com/apache/kafka/pull/8679#discussion_r426211270



##
File path: docs/streams/developer-guide/dsl-api.html
##
@@ -3679,58 +3670,6 @@ KTable-KTable 
Foreign-Key
 // Write the stream to the output topic, using explicit key 
and value serdes,
 // (thus overriding the defaults in the config 
properties).
 stream.to("my-stream-output-topic", 
Produced.with(Serdes.String(), Serdes.Long());
-
-
-Causes data re-partitioning if any of the 
following conditions is true:
-
-If the output topic has a different number of 
partitions than the stream/table.
-If the KStream was marked for re-partitioning.
-If you provide a custom StreamPartitioner to explicitly 
control how to distribute the output records
-across the partitions of the output topic.
-If the key of an output record is null.
-
-
-
-Through

Review comment:
   The diff is weird because the part above repeats below. The actual 
deletions start here.









[GitHub] [kafka] mjsax commented on a change in pull request #8679: KAFKA-10003: Mark KStream.through() as deprecated

2020-05-16 Thread GitBox


mjsax commented on a change in pull request #8679:
URL: https://github.com/apache/kafka/pull/8679#discussion_r426211408



##
File path: streams/src/main/java/org/apache/kafka/streams/KafkaStreams.java
##
@@ -1082,7 +1081,7 @@ public void cleanUp() {
  * This will use the default Kafka Streams partitioner to locate the 
partition.
  * If a {@link StreamPartitioner custom partitioner} has been
  * {@link ProducerConfig#PARTITIONER_CLASS_CONFIG configured} via {@link 
StreamsConfig} or
- * {@link KStream#through(String, Produced)}, or if the original {@link 
KTable}'s input
+ * {@link KStream#repartition(Repartitioned)}, or if the original {@link 
KTable}'s input

Review comment:
   Not sure if this update is necessary. This method is deprecated itself.









[GitHub] [kafka] mjsax commented on a change in pull request #8679: KAFKA-10003: Mark KStream.through() as deprecated

2020-05-16 Thread GitBox


mjsax commented on a change in pull request #8679:
URL: https://github.com/apache/kafka/pull/8679#discussion_r426211425



##
File path: streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
##
@@ -846,16 +847,13 @@
  * from the auto-generated topic using default serializers, deserializers, 
and producer's {@link DefaultPartitioner}.
  * The number of partitions is determined based on the upstream topics 
partition numbers.
  * 
- * This operation is similar to {@link #through(String)}, however, Kafka 
Streams manages the used topic automatically.

Review comment:
   Not 100% sure if we should remove this now, or when we remove 
`through()`?









[GitHub] [kafka] mjsax commented on a change in pull request #8679: KAFKA-10003: Mark KStream.through() as deprecated

2020-05-16 Thread GitBox


mjsax commented on a change in pull request #8679:
URL: https://github.com/apache/kafka/pull/8679#discussion_r426211457



##
File path: streams/src/main/java/org/apache/kafka/streams/kstream/KStream.java
##
@@ -925,9 +920,8 @@ void to(final TopicNameExtractor topicExtractor,
  * Convert this stream to a {@link KTable}.
  * 
  * If a key changing operator was used before this operation (e.g., {@link 
#selectKey(KeyValueMapper)},
- * {@link #map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)}, or
- * {@link #transform(TransformerSupplier, String...)}), and no data 
redistribution happened afterwards (e.g., via
- * {@link #through(String)}) an internal repartitioning topic will be 
created in Kafka.
+ * {@link #map(KeyValueMapper)}, {@link #flatMap(KeyValueMapper)} or
+ * {@link #transform(TransformerSupplier, String...)}) an internal 
repartitioning topic will be created in Kafka.

Review comment:
   Just simplifying this one.









[GitHub] [kafka] mjsax commented on a change in pull request #8679: KAFKA-10003: Mark KStream.through() as deprecated

2020-05-16 Thread GitBox


mjsax commented on a change in pull request #8679:
URL: https://github.com/apache/kafka/pull/8679#discussion_r426211526



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/RepartitionedInternal.java
##
@@ -21,33 +21,33 @@
 import org.apache.kafka.streams.processor.StreamPartitioner;
 import org.apache.kafka.streams.processor.internals.InternalTopicProperties;
 
-class RepartitionedInternal extends Repartitioned {
+public class RepartitionedInternal extends Repartitioned {

Review comment:
   Must be public to be visible in Scala









[GitHub] [kafka] mjsax commented on a change in pull request #8679: KAFKA-10003: Mark KStream.through() as deprecated

2020-05-16 Thread GitBox


mjsax commented on a change in pull request #8679:
URL: https://github.com/apache/kafka/pull/8679#discussion_r426211555



##
File path: 
streams/src/test/java/org/apache/kafka/streams/StreamsBuilderTest.java
##
@@ -285,7 +286,28 @@ public void shouldProcessViaThroughTopic() {
 assertEquals(Collections.singletonList(new KeyValueTimestamp<>("A", 
"aa", 0)), sourceProcessorSupplier.theCapturedProcessor().processed);
 assertEquals(Collections.singletonList(new KeyValueTimestamp<>("A", 
"aa", 0)), throughProcessorSupplier.theCapturedProcessor().processed);
 }
-
+
+@Test
+public void shouldProcessViaRepartitionTopic() {

Review comment:
   Replicated the test for `through()` for `repartition()`.









[GitHub] [kafka] mjsax commented on a change in pull request #8679: KAFKA-10003: Mark KStream.through() as deprecated

2020-05-16 Thread GitBox


mjsax commented on a change in pull request #8679:
URL: https://github.com/apache/kafka/pull/8679#discussion_r426211624



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/AbstractResetIntegrationTest.java
##
@@ -484,8 +493,14 @@ private Topology 
setupTopologyWithIntermediateUserTopic(final String outputTopic
 .toStream()
 .to(OUTPUT_TOPIC, Produced.with(Serdes.Long(), Serdes.Long()));
 
-input.through(INTERMEDIATE_USER_TOPIC)
-.groupByKey()
+final KStream stream;
+if (useRepartitioned) {
+stream = input.repartition();
+} else {
+input.to(INTERMEDIATE_USER_TOPIC);
+stream = builder.stream(INTERMEDIATE_USER_TOPIC);

Review comment:
   We still need to test this, because topics using this pattern are still 
considered _intermediate_ topics, and the `--intermediate-topics` flag in 
`StreamsResetter` is still useful and unchanged.









[GitHub] [kafka] mjsax commented on a change in pull request #8679: KAFKA-10003: Mark KStream.through() as deprecated

2020-05-16 Thread GitBox


mjsax commented on a change in pull request #8679:
URL: https://github.com/apache/kafka/pull/8679#discussion_r426211685



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/EosIntegrationTest.java
##
@@ -213,7 +213,8 @@ private void runSimpleCopyTest(final int numberOfRestarts,
 final KStream input = builder.stream(inputTopic);
 KStream output = input;
 if (throughTopic != null) {
-output = input.through(throughTopic);
+input.to(throughTopic);
+output = builder.stream(throughTopic);

Review comment:
   Using `to()` and `stream()` is "simpler" as we clean up topics in-between 
(and thus avoid internal topics).
   
   We could of course also use `repartition()`.









[GitHub] [kafka] mjsax commented on a change in pull request #8679: KAFKA-10003: Mark KStream.through() as deprecated

2020-05-16 Thread GitBox


mjsax commented on a change in pull request #8679:
URL: https://github.com/apache/kafka/pull/8679#discussion_r426211688



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/ResetIntegrationWithSslTest.java
##
@@ -42,7 +42,7 @@
 
 private static final String TEST_ID = "reset-with-ssl-integration-test";
 
-private static Map sslConfig;
+private static final Map SSL_CONFIG;

Review comment:
   side cleanup

##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
##
@@ -588,6 +592,14 @@ public void shouldNotAllowNullTopicOnTo() {
 assertThat(exception.getMessage(), equalTo("topic can't be null"));
 }
 
+@Test
+public void shouldNotAllowNullRepartitionedOnRepartition() {

Review comment:
   replicating test









[GitHub] [kafka] mjsax commented on a change in pull request #8679: KAFKA-10003: Mark KStream.through() as deprecated

2020-05-16 Thread GitBox


mjsax commented on a change in pull request #8679:
URL: https://github.com/apache/kafka/pull/8679#discussion_r426211787



##
File path: 
streams/src/test/java/org/apache/kafka/streams/tests/EosTestClient.java
##
@@ -132,7 +132,8 @@ private KafkaStreams createKafkaStreams(final Properties 
props) {
 .to("sum", Produced.with(stringSerde, longSerde));
 
 if (withRepartitioning) {
-final KStream repartitionedData = 
data.through("repartition");
+data.to("repartition");
+final KStream repartitionedData = 
builder.stream("repartition");

Review comment:
   As above. Avoid internal topics.









[GitHub] [kafka] mjsax commented on a change in pull request #8679: KAFKA-10003: Mark KStream.through() as deprecated

2020-05-16 Thread GitBox


mjsax commented on a change in pull request #8679:
URL: https://github.com/apache/kafka/pull/8679#discussion_r426211719



##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
##
@@ -1393,6 +1409,11 @@ public void shouldPreserveSerdesForOperators() {
 assertEquals(((AbstractStream) stream1.through("topic-3", 
Produced.with(mySerde, mySerde))).keySerde(), mySerde);
 assertEquals(((AbstractStream) stream1.through("topic-3", 
Produced.with(mySerde, mySerde))).valueSerde(), mySerde);
 
+assertEquals(((AbstractStream) stream1.repartition()).keySerde(), 
consumedInternal.keySerde());
+assertEquals(((AbstractStream) stream1.repartition()).valueSerde(), 
consumedInternal.valueSerde());
+assertEquals(((AbstractStream) 
stream1.repartition(Repartitioned.with(mySerde, mySerde))).keySerde(), mySerde);
+assertEquals(((AbstractStream) 
stream1.repartition(Repartitioned.with(mySerde, mySerde))).valueSerde(), 
mySerde);

Review comment:
   replicating test cases

##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
##
@@ -1452,6 +1474,24 @@ public void 
shouldUseRecordMetadataTimestampExtractorWithThrough() {
 
assertNull(processorTopology.source("topic-1").getTimestampExtractor());
 }
 
+@Test
+public void shouldUseRecordMetadataTimestampExtractorWithRepartition() {

Review comment:
   replicating test

##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamImplTest.java
##
@@ -1467,6 +1507,21 @@ public void shouldSendDataThroughTopicUsingProduced() {
 assertThat(processorSupplier.theCapturedProcessor().processed, 
equalTo(Collections.singletonList(new KeyValueTimestamp<>("a", "b", 0;
 }
 
+@Test
+public void shouldSendDataThroughRepartitionTopicUsingRepartitioned() {

Review comment:
   replicating test









[GitHub] [kafka] mjsax commented on a change in pull request #8679: KAFKA-10003: Mark KStream.through() as deprecated

2020-05-16 Thread GitBox


mjsax commented on a change in pull request #8679:
URL: https://github.com/apache/kafka/pull/8679#discussion_r426211984



##
File path: 
streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/KStream.scala
##
@@ -218,7 +218,7 @@ class KStream[K, V](val inner: KStreamJ[K, V]) {
* import Serdes._
*
* //..
-   * val clicksPerRegion: KTable[String, Long] = //..
+   * val clicksPerRegion: KStream[String, Long] = //..

Review comment:
   There is no `KTable#through()` method.









[GitHub] [kafka] mjsax commented on a change in pull request #8679: KAFKA-10003: Mark KStream.through() as deprecated

2020-05-16 Thread GitBox


mjsax commented on a change in pull request #8679:
URL: https://github.com/apache/kafka/pull/8679#discussion_r426212066



##
File path: 
streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/ProducedTest.scala
##
@@ -37,15 +37,15 @@ class ProducedTest extends FlatSpec with Matchers {
 internalProduced.valueSerde.getClass shouldBe Serdes.Long.getClass
   }
 
-  "Create a Produced with timestampExtractor and resetPolicy" should "create a 
Consumed with Serdes, timestampExtractor and resetPolicy" in {
+  "Create a Produced with streamPartitioner" should "create a Produced with 
Serdes and streamPartitioner" in {

Review comment:
   Side cleanup (was originally copied from `ConsumedTest` but not updated 
correctly)









[GitHub] [kafka] mjsax commented on a change in pull request #8679: KAFKA-10003: Mark KStream.through() as deprecated

2020-05-16 Thread GitBox


mjsax commented on a change in pull request #8679:
URL: https://github.com/apache/kafka/pull/8679#discussion_r426214228



##
File path: 
streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/kstream/Repartitioned.scala
##
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.scala.kstream
+
+import org.apache.kafka.common.serialization.Serde
+import org.apache.kafka.streams.kstream.{Repartitioned => RepartitionedJ}
+import org.apache.kafka.streams.processor.StreamPartitioner
+
+object Repartitioned {
+
+  /**
+   * Create a Repartitioned instance with provided keySerde and valueSerde.
+   *
+   * @tparam K key type
+   * @tparam V value type
+   * @param keySerde    Serde to use for serializing the key
+   * @param valueSerde  Serde to use for serializing the value
+   * @return A new [[Repartitioned]] instance configured with keySerde and 
valueSerde
+   * @see KStream#repartition(Repartitioned)
+   */
+  def `with`[K, V](implicit keySerde: Serde[K], valueSerde: Serde[V]): 
RepartitionedJ[K, V] =

Review comment:
   I just named all methods `with`, in alignment with the other Scala helper 
classes.
   
   Also noticed that all the helper classes only have static methods... Is that by 
design? Seems we are missing something here? If there is more than one optional 
parameter, it seems we should have non-static methods to allow method chaining. 
(Could be fixed in a follow-up PR.)
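
A minimal sketch of the method-chaining shape raised in the comment above — a static factory as the entry point plus non-static `withX` methods returning new instances, so several optional parameters compose without one static method per combination. `Partitioned` and its method names here are hypothetical illustrations, not Kafka Streams API:

```java
// Hypothetical config holder: static factory entry point + chainable
// non-static methods for optional parameters.
public class Main {

    public static final class Partitioned {
        public final String name;
        public final Integer numberOfPartitions;

        private Partitioned(String name, Integer numberOfPartitions) {
            this.name = name;
            this.numberOfPartitions = numberOfPartitions;
        }

        // static entry point, as in the existing helper objects
        public static Partitioned as(String name) {
            return new Partitioned(name, null);
        }

        // non-static method enabling Partitioned.as("x").withNumberOfPartitions(3)
        public Partitioned withNumberOfPartitions(int n) {
            return new Partitioned(name, n);
        }
    }

    public static void main(String[] args) {
        Partitioned p = Partitioned.as("repartition-topic").withNumberOfPartitions(3);
        System.out.println(p.name + ":" + p.numberOfPartitions); // repartition-topic:3
    }
}
```

Each `withX` call returns a new immutable instance, so chained and unchained uses behave identically.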









[GitHub] [kafka] ijuma commented on pull request #8678: Update Gradle to 6.4.1

2020-05-16 Thread GitBox


ijuma commented on pull request #8678:
URL: https://github.com/apache/kafka/pull/8678#issuecomment-629745862


   Unrelated Streams failures like:
   
   > 
org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[false]
   







[GitHub] [kafka] ijuma commented on pull request #8517: MINOR: use monotonic clock for replica fetcher DelayedItem

2020-05-16 Thread GitBox


ijuma commented on pull request #8517:
URL: https://github.com/apache/kafka/pull/8517#issuecomment-629745896


   retest this please


