[GitHub] [kafka] LMnet commented on a change in pull request #8955: KAFKA-10020: Create a new version of a scala Serdes without name clash (KIP-616)

2020-08-30 Thread GitBox


LMnet commented on a change in pull request #8955:
URL: https://github.com/apache/kafka/pull/8955#discussion_r479877121



##
File path: 
streams/streams-scala/src/main/scala/org/apache/kafka/streams/scala/serialization/Serdes.scala
##
@@ -0,0 +1,93 @@
+/*
+ * Copyright (C) 2018 Lightbend Inc. 
+ * Copyright (C) 2017-2018 Alexis Seigneurin.
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.scala.serialization
+
+import java.nio.ByteBuffer
+import java.util
+import java.util.UUID
+
+import org.apache.kafka.common.serialization.{Deserializer, Serde, Serializer, Serdes => JSerdes}
+import org.apache.kafka.streams.kstream.WindowedSerdes
+
+object Serdes extends LowPrioritySerdes {
+  implicit def stringSerde: Serde[String] = JSerdes.String()
+  implicit def longSerde: Serde[Long] = JSerdes.Long().asInstanceOf[Serde[Long]]
+  implicit def javaLongSerde: Serde[java.lang.Long] = JSerdes.Long()
+  implicit def byteArraySerde: Serde[Array[Byte]] = JSerdes.ByteArray()
+  implicit def bytesSerde: Serde[org.apache.kafka.common.utils.Bytes] = JSerdes.Bytes()
+  implicit def byteBufferSerde: Serde[ByteBuffer] = JSerdes.ByteBuffer()
+  implicit def shortSerde: Serde[Short] = JSerdes.Short().asInstanceOf[Serde[Short]]
+  implicit def javaShortSerde: Serde[java.lang.Short] = JSerdes.Short()
+  implicit def floatSerde: Serde[Float] = JSerdes.Float().asInstanceOf[Serde[Float]]
+  implicit def javaFloatSerde: Serde[java.lang.Float] = JSerdes.Float()
+  implicit def doubleSerde: Serde[Double] = JSerdes.Double().asInstanceOf[Serde[Double]]
+  implicit def javaDoubleSerde: Serde[java.lang.Double] = JSerdes.Double()
+  implicit def intSerde: Serde[Int] = JSerdes.Integer().asInstanceOf[Serde[Int]]
+  implicit def javaIntegerSerde: Serde[java.lang.Integer] = JSerdes.Integer()
+  implicit def uuidSerde: Serde[UUID] = JSerdes.UUID()
+
+  implicit def timeWindowedSerde[T](implicit tSerde: Serde[T]): WindowedSerdes.TimeWindowedSerde[T] =
+    new WindowedSerdes.TimeWindowedSerde[T](tSerde)

Review comment:
   Ok, I agree that the third option is a bit unnecessary. We already have a constructor for that.
   
   I removed `WindowedSerdes` entirely and mentioned them in the deprecation message.
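   
   For context, a minimal sketch of how the relocated object is intended to be used: importing the implicits from the new `org.apache.kafka.streams.scala.serialization` package avoids the clash with the Java `org.apache.kafka.common.serialization.Serdes` class (topic names below are made up).

```scala
import org.apache.kafka.streams.scala.ImplicitConversions._
import org.apache.kafka.streams.scala.StreamsBuilder
// New package introduced by this PR; no name clash with the Java Serdes class.
import org.apache.kafka.streams.scala.serialization.Serdes._

object NewSerdesUsageSketch extends App {
  val builder = new StreamsBuilder()

  // stringSerde and longSerde are resolved implicitly for Consumed/Grouped/Materialized/Produced.
  builder
    .stream[String, String]("input-topic")   // hypothetical topic name
    .groupBy((_, word) => word)
    .count()
    .toStream
    .to("output-topic")                      // hypothetical topic name

  println(builder.build().describe())
}
```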





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10448) Preserve Source Partition in Kafka Streams from context

2020-08-30 Thread satya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10448?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

satya updated KAFKA-10448:
--
Description: Currently, Kafka Streams sink nodes use the default partitioner or allow a 
custom partitioner that has to depend on the key/value. I am looking for an enhancement 
of the sink node to ensure the source partition is preserved instead of deriving the 
partition again from the key/value. One of our use cases has producers with custom 
partitioners that we don't have access to, as it is a third-party application. Simply 
preserving the partition through context.partition() would be helpful.  (was: Currently Kafka 
streams Sink Nodes use default partitioner or has the provision of using a 
custom partitioner which has to be dependent on key/value. I am looking for an 
enhancement of Sink Node to ensure source partition is preserved instead of 
deriving the partition again using key/value. One of our use case has producers 
which have customer partitioners that we dont have access to as it is a 
third-party application. By simply preserving the partition through 
context.partition() would be helpful.)

> Preserve Source Partition in Kafka Streams from context
> ---
>
> Key: KAFKA-10448
> URL: https://issues.apache.org/jira/browse/KAFKA-10448
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.5.0
>Reporter: satya
>Priority: Critical
>
> Currently, Kafka Streams sink nodes use the default partitioner or allow a 
> custom partitioner that has to depend on the key/value. I am looking for an 
> enhancement of the sink node to ensure the source partition is preserved 
> instead of deriving the partition again from the key/value. One of our use 
> cases has producers with custom partitioners that we don't have access to, as 
> it is a third-party application. Simply preserving the partition through 
> context.partition() would be helpful.
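
For reference, the key/value-based hook that exists today is `StreamPartitioner`; a rough Scala sketch of wiring one into a sink is below (the partitioning logic, stream and topic names are placeholders). The request above is precisely to avoid having to re-derive the partition this way and to reuse context.partition() instead.

```scala
import org.apache.kafka.streams.processor.StreamPartitioner

object KeyBasedPartitionerSketch {
  // Hypothetical partitioner: today the partition can only be derived from the key/value,
  // not read back from the record's original source partition.
  val byKeyHash: StreamPartitioner[String, String] = new StreamPartitioner[String, String] {
    override def partition(topic: String, key: String, value: String, numPartitions: Int): Integer =
      Integer.valueOf(Math.floorMod(key.hashCode, numPartitions))
  }

  // Wiring it into a sink (stream and topic are placeholders):
  //   events.to("output-topic", Produced.`with`(Serdes.String(), Serdes.String(), byKeyHash))
}
```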



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10449) Connect-distributed sample configuration file does not have instructions for listeners

2020-08-30 Thread tinawenqiao (Jira)
tinawenqiao created KAFKA-10449:
---

 Summary: Connect-distributed sample configuration file does not 
have instructions for listeners
 Key: KAFKA-10449
 URL: https://issues.apache.org/jira/browse/KAFKA-10449
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Affects Versions: 2.6.0
Reporter: tinawenqiao


In WorkerConfig.java we found that REST_HOST_NAME_CONFIG (rest.host.name) and 
REST_PORT_CONFIG (rest.port) were deprecated, and some new configuration 
parameters were introduced, such as LISTENERS_CONFIG (listeners), 
REST_ADVERTISED_LISTENER_CONFIG (rest.advertised.listener), and 
ADMIN_LISTENERS_CONFIG (admin.listeners), but they are not listed in the sample 
configuration file.
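
For illustration, the kind of commented entries the sample file could gain (hosts and ports below are placeholders, not suggested defaults):

```properties
# REST API endpoints for the Connect worker (replaces the deprecated rest.host.name/rest.port).
# Format: comma-separated listeners, e.g. HTTP://myhost:8083 or HTTPS://myhost:8443
#listeners=HTTP://:8083

# Listener (http or https) advertised to other workers for cross-worker requests.
#rest.advertised.listener=http

# Optional separate listeners for admin endpoints; if unset, the regular listeners are used.
#admin.listeners=HTTP://:8084
```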



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10448) Preserve Source Partition in Kafka Streams from context

2020-08-30 Thread satya (Jira)
satya created KAFKA-10448:
-

 Summary: Preserve Source Partition in Kafka Streams from context
 Key: KAFKA-10448
 URL: https://issues.apache.org/jira/browse/KAFKA-10448
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Affects Versions: 2.5.0
Reporter: satya


Currently Kafka streams Sink Nodes use default partitioner or has the provision 
of using a custom partitioner which has to be dependent on key/value. I am 
looking for an enhancement of Sink Node to ensure source partition is preserved 
instead of deriving the partition again using key/value. One of our use case 
has producers which have customer partitioners that we dont have access to as 
it is a third-party application. By simply preserving the partition through 
context.partition() would be helpful.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] soarez commented on pull request #9000: KAFKA-10036 Improve handling and documentation of Suppliers

2020-08-30 Thread GitBox


soarez commented on pull request #9000:
URL: https://github.com/apache/kafka/pull/9000#issuecomment-683488002


   Squashed and rebased as there were conflicts. Please take another look 
@mjsax .



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] soarez commented on a change in pull request #9000: KAFKA-10036 Improve handling and documentation of Suppliers

2020-08-30 Thread GitBox


soarez commented on a change in pull request #9000:
URL: https://github.com/apache/kafka/pull/9000#discussion_r479832011



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamUtil.java
##
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.kstream.internals;
+
+import org.apache.kafka.streams.kstream.TransformerSupplier;
+import org.apache.kafka.streams.kstream.ValueTransformerSupplier;
+import org.apache.kafka.streams.kstream.ValueTransformerWithKeySupplier;
+
+/**
+ * Shared functions to handle verifications of a valid {@link 
org.apache.kafka.streams.kstream.KStream}.
+ */
+final class KStreamUtil {
+
+private KStreamUtil() {}
+
+/**
+ * @throws IllegalArgumentException if the same transformer instance is 
obtained each time
+ */
+static void checkSupplier(final TransformerSupplier supplier) {

Review comment:
   Good idea. `supplier.getClass().getName()` doesn't really work, but we can get 
the name by looking through `supplier.getClass().getInterfaces()`.
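   
   A rough Scala sketch of that idea, using `java.util.function.Supplier` as a stand-in for the transformer supplier interfaces: a lambda's `getClass().getName()` yields a synthetic name, while `getInterfaces()` recovers a readable one for the error message.

```scala
import java.util.function.Supplier

object SupplierNameSketch extends App {
  val supplier: Supplier[String] = () => "hello"

  // Synthetic class name, e.g. SupplierNameSketch$$anon$1 or ...$$Lambda$..., not very readable.
  println(supplier.getClass.getName)

  // The implemented interface gives a readable name to report instead.
  val interfaceName = supplier.getClass.getInterfaces
    .map(_.getSimpleName)
    .headOption
    .getOrElse(supplier.getClass.getSimpleName)
  println(interfaceName) // likely "Supplier"
}
```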





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-08-30 Thread GitBox


ijuma commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r479829275



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -68,6 +68,13 @@ object LogAppendInfo {
   offsetsMonotonic = false, -1L, recordErrors, errorMessage)
 }
 
+sealed trait LeaderHWChange
+object LeaderHWChange {
+  case object Incremental extends LeaderHWChange

Review comment:
   Another nit: `LeaderHwChange` adheres to the coding convention better.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma opened a new pull request #9231: KAFKA-10447: Migrate tools module to JUnit 5

2020-08-30 Thread GitBox


ijuma opened a new pull request #9231:
URL: https://github.com/apache/kafka/pull/9231


   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10447) Migrate tools module to JUnit 5

2020-08-30 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10447?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-10447:

Summary: Migrate tools module to JUnit 5  (was: Migrate tools module from 
JUnit 4 to JUnit 5)

> Migrate tools module to JUnit 5
> ---
>
> Key: KAFKA-10447
> URL: https://issues.apache.org/jira/browse/KAFKA-10447
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Ismael Juma
>Assignee: Ismael Juma
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10447) Migrate tools module from JUnit 4 to JUnit 5

2020-08-30 Thread Ismael Juma (Jira)
Ismael Juma created KAFKA-10447:
---

 Summary: Migrate tools module from JUnit 4 to JUnit 5
 Key: KAFKA-10447
 URL: https://issues.apache.org/jira/browse/KAFKA-10447
 Project: Kafka
  Issue Type: Sub-task
Reporter: Ismael Juma
Assignee: Ismael Juma






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10417) suppress() with cogroup() throws ClassCastException

2020-08-30 Thread Rens Groothuijsen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17187322#comment-17187322
 ] 

Rens Groothuijsen commented on KAFKA-10417:
---

A cogrouped stream creates a KTable with processor supplier type PassThrough, 
which implements ProcessorSupplier rather than KTableProcessorSupplier. This 
then causes a problem 
[here|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L842].
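
A stripped-down Scala illustration of the failure mode, with local stand-ins for the internal interfaces: the suppress path assumes every upstream processor supplier is a `KTableProcessorSupplier`, so a pass-through supplier fails the cast.

```scala
object CogroupSuppressCastSketch extends App {
  // Local stand-ins for the internal interfaces, just to show the failing cast.
  trait ProcessorSupplier
  trait KTableProcessorSupplier extends ProcessorSupplier
  class PassThrough extends ProcessorSupplier // cogroup() installs a supplier like this

  val supplier: ProcessorSupplier = new PassThrough

  try {
    // KTableImpl#suppress effectively performs this unchecked cast:
    supplier.asInstanceOf[KTableProcessorSupplier]
  } catch {
    case e: ClassCastException => println(s"same failure as in the report: $e")
  }
}
```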

> suppress() with cogroup() throws ClassCastException
> ---
>
> Key: KAFKA-10417
> URL: https://issues.apache.org/jira/browse/KAFKA-10417
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Wardha Perinkada Kattu
>Priority: Blocker
>  Labels: kafka-streams
> Fix For: 2.7.0
>
>
> Streams operation - `cogroup()`, `aggregate()` followed by `suppress()` 
> throws `ClassCastException`
> Works fine without the `suppress()`
> Code block tested -
> {code:java}
> val stream1 = requestStreams.merge(successStreams).merge(errorStreams)
> .groupByKey(Grouped.with(Serdes.String(), 
> serdesConfig.notificationSerde()))
> val streams2 = confirmationStreams
> .groupByKey(Grouped.with(Serdes.String(), 
> serdesConfig.confirmationsSerde()))
> val cogrouped = 
> stream1.cogroup(notificationAggregator).cogroup(streams2, 
> confirmationsAggregator)
> 
> .windowedBy(TimeWindows.of(Duration.ofMinutes(notificationStreamsConfig.joinWindowMinutes.toLong())).grace(Duration.ofMinutes(notificationStreamsConfig.graceDurationMinutes.toLong(
> .aggregate({ null }, Materialized.`as`<String, NotificationMetric, WindowStore<Bytes, ByteArray>>("time-windowed-aggregated-stream-store")
> 
> .withValueSerde(serdesConfig.notificationMetricSerde()))
>  .suppress(Suppressed.untilWindowCloses(unbounded()))
> .toStream()
> {code}
> Exception thrown is:
> {code:java}
> Caused by: java.lang.ClassCastException: class 
> org.apache.kafka.streams.kstream.internals.PassThrough cannot be cast to 
> class org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier 
> (org.apache.kafka.streams.kstream.internals.PassThrough and 
> org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier are in 
> unnamed module of loader 'app')
> {code}
> [https://stackoverflow.com/questions/63459685/kgroupedstream-with-cogroup-aggregate-suppress]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] junrao commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-08-30 Thread GitBox


junrao commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r479802860



##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -85,6 +92,9 @@ object LogAppendInfo {
  * @param validBytes The number of valid bytes
  * @param offsetsMonotonic Are the offsets in this message set monotonically 
increasing
  * @param lastOffsetOfFirstBatch The last offset of the first batch
+ * @param leaderHWChange Incremental if the high watermark need to be 
increased after appending record.

Review comment:
   need => needs

##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -68,6 +68,13 @@ object LogAppendInfo {
   offsetsMonotonic = false, -1L, recordErrors, errorMessage)
 }
 
+sealed trait LeaderHWChange
+object LeaderHWChange {
+  case object Incremental extends LeaderHWChange

Review comment:
   Probably Increased is clearer than Incremental.

##
File path: core/src/main/scala/kafka/log/Log.scala
##
@@ -85,6 +92,9 @@ object LogAppendInfo {
  * @param validBytes The number of valid bytes
  * @param offsetsMonotonic Are the offsets in this message set monotonically 
increasing
  * @param lastOffsetOfFirstBatch The last offset of the first batch
+ * @param leaderHWChange Incremental if the high watermark need to be 
increased after appending record.
+ *   Same if high watermark is not changed. None is the 
default value and it means append is failed

Review comment:
   is failed => failed
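   
   Taking these together with the earlier naming comment (`LeaderHwChange` rather than `LeaderHWChange`), the suggested shape of the trait would be roughly the following; this is a sketch of the review suggestions, not the final code:

```scala
sealed trait LeaderHwChange
object LeaderHwChange {
  case object Increased extends LeaderHwChange // the high watermark needs to be increased after the append
  case object Same extends LeaderHwChange      // the high watermark is unchanged
  case object None extends LeaderHwChange      // default; the append failed
}
```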





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] omkreddy commented on pull request #9062: KAFKA-8098: fix the flaky test by disabling the auto commit to avoid member rejoining

2020-08-30 Thread GitBox


omkreddy commented on pull request #9062:
URL: https://github.com/apache/kafka/pull/9062#issuecomment-683453793


   ok to test



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rayokota commented on pull request #5068: KAFKA-6886 Externalize secrets from Connect configs

2020-08-30 Thread GitBox


rayokota commented on pull request #5068:
URL: https://github.com/apache/kafka/pull/5068#issuecomment-683443295


   @aakashgupta96 yes it was merged



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 opened a new pull request #9230: KAFKA-10446 add lz4 and zstd to compression type of benchmark_test.py

2020-08-30 Thread GitBox


chia7712 opened a new pull request #9230:
URL: https://github.com/apache/kafka/pull/9230


   issue: https://issues.apache.org/jira/browse/KAFKA-10446
   
   Both "lz4" and "zstd" are popular and important compression types supported by 
Kafka. They are worth benchmarking.
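   
   As background, the compression type exercised by the benchmark is the producer-side `compression.type` setting; a minimal sketch of a producer configured with it (broker address and topic are placeholders):

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerConfig, ProducerRecord}
import org.apache.kafka.common.serialization.StringSerializer

object ZstdProducerSketch extends App {
  val props = new Properties()
  props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092")  // placeholder broker
  props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "zstd")             // or "lz4", "snappy", "gzip", "none"
  props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)
  props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, classOf[StringSerializer].getName)

  val producer = new KafkaProducer[String, String](props)
  producer.send(new ProducerRecord("benchmark-topic", "key", "value"))  // placeholder topic
  producer.close()
}
```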
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-10446) add "lz4" and "zstd" to compression type of benchmark_test.py

2020-08-30 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10446?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai updated KAFKA-10446:
---
Labels: benchmark benchmarks  (was: )

> add "lz4" and "zstd" to compression type of benchmark_test.py
> -
>
> Key: KAFKA-10446
> URL: https://issues.apache.org/jira/browse/KAFKA-10446
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Chia-Ping Tsai
>Priority: Minor
>  Labels: benchmark, benchmarks
>
> Both "lz4" and "zstd" are popular and important compression types supported by 
> Kafka. They are worth benchmarking.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-10446) add "lz4" and "zstd" to compression type of benchmark_test.py

2020-08-30 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-10446:
--

 Summary: add "lz4" and "zstd" to compression type of 
benchmark_test.py
 Key: KAFKA-10446
 URL: https://issues.apache.org/jira/browse/KAFKA-10446
 Project: Kafka
  Issue Type: Improvement
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


Both "lz4" and "zstd" are popular and important compression types supported by 
Kafka. They are worth benchmarking.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] chia7712 commented on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-08-30 Thread GitBox


chia7712 commented on pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#issuecomment-683434648


   > Could you do some perf tests so that we know the performance of fetch 
requests doesn't change noticeably?
   
   the result of ```Benchmark.test_producer_and_consumer``` is attached below. 
It seems the patch gets better throughput :)
   
   I will run more performance tests tomorrow.
   
   **BEFORE**
   ```
   test_id:
kafkatest.benchmarks.core.benchmark_test.Benchmark.test_producer_and_consumer.interbroker_security_protocol=PLAINTEXT.tls_version=TLSv1.2.security_protocol=SSL.compression_type=none
   status: PASS
   run time:   1 minute 8.041 seconds
   {"consumer": {"records_per_sec": 570060.4264, "mb_per_sec": 54.3652}, 
"producer": {"records_per_sec": 611583.389395, "mb_per_sec": 58.33}}
   

   test_id:
kafkatest.benchmarks.core.benchmark_test.Benchmark.test_producer_and_consumer.interbroker_security_protocol=PLAINTEXT.tls_version=TLSv1.2.security_protocol=SSL.compression_type=snappy
   status: PASS
   run time:   58.549 seconds
   {"consumer": {"records_per_sec": 1331203.4079, "mb_per_sec": 126.9535}, 
"producer": {"records_per_sec": 1257387.149503, "mb_per_sec": 119.91}}
   

   test_id:
kafkatest.benchmarks.core.benchmark_test.Benchmark.test_producer_and_consumer.interbroker_security_protocol=PLAINTEXT.tls_version=TLSv1.3.security_protocol=SSL.compression_type=none
   status: PASS
   run time:   1 minute 7.757 seconds
   {"consumer": {"records_per_sec": 577133.9528, "mb_per_sec": 55.0398}, 
"producer": {"records_per_sec": 582106.059724, "mb_per_sec": 55.51}}
   

   test_id:
kafkatest.benchmarks.core.benchmark_test.Benchmark.test_producer_and_consumer.interbroker_security_protocol=PLAINTEXT.tls_version=TLSv1.3.security_protocol=SSL.compression_type=snappy
   status: PASS
   run time:   59.287 seconds
   {"consumer": {"records_per_sec": 1421868.335, "mb_per_sec": 135.5999}, 
"producer": {"records_per_sec": 1332267.519318, "mb_per_sec": 127.05}}
   

   test_id:
kafkatest.benchmarks.core.benchmark_test.Benchmark.test_producer_and_consumer.security_protocol=PLAINTEXT.compression_type=none
   status: PASS
   run time:   1 minute 1.819 seconds
   {"consumer": {"records_per_sec": 987361.7694, "mb_per_sec": 94.1622}, 
"producer": {"records_per_sec": 942862.530643, "mb_per_sec": 89.92}}
   

   test_id:
kafkatest.benchmarks.core.benchmark_test.Benchmark.test_producer_and_consumer.security_protocol=PLAINTEXT.compression_type=snappy
   status: PASS
   run time:   55.877 seconds
   {"consumer": {"records_per_sec": 1479508.8031, "mb_per_sec": 141.097}, 
"producer": {"records_per_sec": 1339046.598822, "mb_per_sec": 127.7}}
   

   ```
   
   
   **AFTER**
   ```
   test_id:
kafkatest.benchmarks.core.benchmark_test.Benchmark.test_producer_and_consumer.interbroker_security_protocol=PLAINTEXT.tls_version=TLSv1.2.security_protocol=SSL.compression_type=none
   status: PASS
   run time:   1 minute 9.619 seconds
   {"consumer": {"records_per_sec": 615422.4875, "mb_per_sec": 58.6913}, 
"producer": {"records_per_sec": 640779.187492, "mb_per_sec": 61.11}}
   

   test_id:
kafkatest.benchmarks.core.benchmark_test.Benchmark.test_producer_and_consumer.interbroker_security_protocol=PLAINTEXT.tls_version=TLSv1.2.security_protocol=SSL.compression_type=snappy
   status: PASS
   run time:   57.667 seconds
   {"consumer": {"records_per_sec": 1444043.3213, "mb_per_sec": 137.7147}, 
"producer": {"records_per_sec": 1398014.818957, "mb_per_sec": 133.33}}
   

   test_id:
kafkatest.benchmarks.core.benchmark_test.Benchmark.test_producer_and_consumer.interbroker_security_protocol=PLAINTEXT.tls_version=TLSv1.3.security_protocol=SSL.compression_type=none
   status: PASS
   run time:   1 minute 6.802 seconds
   {"consumer": {"records_per_sec": 619770.6848, "mb_per_sec": 59.1059}, 
"producer": {"records_per_sec": 646203.55412, "mb_per_sec": 61.63}}
   

   test_id:
kafkatest.benchmarks.core.benchmark_test.Benchmark.test_producer_and_consumer.interbroker_security_protocol=PLAINTEXT.tls_version=TLSv1.3.security_protocol=SSL.compression_type=snappy
   status: PASS
   run time:   1 minute 0.790 seconds
   {"consumer": {"records_per_sec": 

[GitHub] [kafka] aakashgupta96 commented on pull request #5068: KAFKA-6886 Externalize secrets from Connect configs

2020-08-30 Thread GitBox


aakashgupta96 commented on pull request #5068:
URL: https://github.com/apache/kafka/pull/5068#issuecomment-683433765


   Hi @rayokota,
   Was this feature merged? I was also planning to submit a KIP around the same 
idea, then I saw your proposal. 
   Can you please confirm whether it is merged or whether you are still working on this?
   
   If not, I can plan to submit a proposal for it.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #9223: KAFKA-10438 Lazy initialization of record header to reduce memory usa…

2020-08-30 Thread GitBox


chia7712 commented on pull request #9223:
URL: https://github.com/apache/kafka/pull/9223#issuecomment-683425417


   > Any thoughts on test for this?
   
   The JMH is attached.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] soarez commented on pull request #9064: KAFKA-10205: Documentation and handling of non deterministic Topologies

2020-08-30 Thread GitBox


soarez commented on pull request #9064:
URL: https://github.com/apache/kafka/pull/9064#issuecomment-683421515


   @mjsax rebased and fix the error, now `./gradlew  :streams:unitTest` seems 
to run without errors over here.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] soarez edited a comment on pull request #9064: KAFKA-10205: Documentation and handling of non deterministic Topologies

2020-08-30 Thread GitBox


soarez edited a comment on pull request #9064:
URL: https://github.com/apache/kafka/pull/9064#issuecomment-683421515


   @mjsax rebased and fixed the error, now `./gradlew  :streams:unitTest` seems 
to run without errors over here.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #9223: KAFKA-10438 Lazy initialization of record header to reduce memory usa…

2020-08-30 Thread GitBox


chia7712 commented on a change in pull request #9223:
URL: https://github.com/apache/kafka/pull/9223#discussion_r479750019



##
File path: 
clients/src/test/java/org/apache/kafka/common/record/DefaultRecordTest.java
##
@@ -270,7 +270,7 @@ public void testInvalidNumHeadersPartial() throws 
IOException {
 DefaultRecord.readPartiallyFrom(inputStream, skipArray, 0L, 0L, 
RecordBatch.NO_SEQUENCE, null);
 }
 
-@Test(expected = StringIndexOutOfBoundsException.class)
+@Test(expected = InvalidRecordException.class)

Review comment:
   The exception is caused by ```headerKeyBuffer.limit(headerKeySize);```: the 
new limit is larger than the capacity.
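   
   A tiny Scala sketch of the underlying behaviour: `ByteBuffer.limit` rejects a limit beyond the buffer's capacity, which is presumably the condition the parsing code now surfaces as the `InvalidRecordException` expected by the updated test.

```scala
import java.nio.ByteBuffer

object BufferLimitSketch extends App {
  val headerKeyBuffer = ByteBuffer.allocate(16)
  try {
    headerKeyBuffer.limit(1024) // new limit larger than the capacity
  } catch {
    case e: IllegalArgumentException =>
      println(s"rejected oversized limit: $e") // the record-parsing code wraps this case
  }
}
```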





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-08-30 Thread GitBox


chia7712 commented on pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#issuecomment-683400677


   > Could you do some perf tests so that we know the performance of fetch 
requests doesn't change noticeably?
   
   @junrao Are there any suggested official benchmark tools?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #9229: MINOR: Reduce allocations in requests via buffer caching

2020-08-30 Thread GitBox


chia7712 commented on pull request #9229:
URL: https://github.com/apache/kafka/pull/9229#issuecomment-683400436


   @ijuma I have closed #9220 and assign 
https://issues.apache.org/jira/browse/KAFKA-10433 to you.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 closed pull request #9220: KAFKA-10433 Reuse the ByteBuffer in validating compressed records

2020-08-30 Thread GitBox


chia7712 closed pull request #9220:
URL: https://github.com/apache/kafka/pull/9220


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #9220: KAFKA-10433 Reuse the ByteBuffer in validating compressed records

2020-08-30 Thread GitBox


chia7712 commented on pull request #9220:
URL: https://github.com/apache/kafka/pull/9220#issuecomment-683400353


   close as there is a better approach (#9229)



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-10433) Reuse the ByteBuffer in validating compressed records

2020-08-30 Thread Chia-Ping Tsai (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10433?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Chia-Ping Tsai reassigned KAFKA-10433:
--

Assignee: Ismael Juma  (was: Chia-Ping Tsai)

> Reuse the ByteBuffer in validating compressed records 
> --
>
> Key: KAFKA-10433
> URL: https://issues.apache.org/jira/browse/KAFKA-10433
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Chia-Ping Tsai
>Assignee: Ismael Juma
>Priority: Major
>
> {code:java}
> for (batch <- batches) {
>   validateBatch(topicPartition, firstBatch, batch, origin, toMagic, 
> brokerTopicStats)
>   uncompressedSizeInBytes += 
> AbstractRecords.recordBatchHeaderSizeInBytes(toMagic, batch.compressionType())
>   val recordsIterator = if (inPlaceAssignment && firstBatch.magic >= 
> RecordBatch.MAGIC_VALUE_V2)
> batch.skipKeyValueIterator(BufferSupplier.NO_CACHING)
>   else
> batch.streamingIterator(BufferSupplier.NO_CACHING)
> {code}
> It is a hot method, so reusing the ByteBuffer can reduce a lot of memory usage 
> if the compression type supports BufferSupplier.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] chia7712 commented on a change in pull request #8657: KAFKA-8334 Make sure the thread which tries to complete delayed reque…

2020-08-30 Thread GitBox


chia7712 commented on a change in pull request #8657:
URL: https://github.com/apache/kafka/pull/8657#discussion_r479737358



##
File path: core/src/main/scala/kafka/server/ActionQueue.scala
##
@@ -0,0 +1,46 @@
+/**
+  * Licensed to the Apache Software Foundation (ASF) under one or more
+  * contributor license agreements.  See the NOTICE file distributed with
+  * this work for additional information regarding copyright ownership.
+  * The ASF licenses this file to You under the Apache License, Version 2.0
+  * (the "License"); you may not use this file except in compliance with
+  * the License.  You may obtain a copy of the License at
+  *
+  *http://www.apache.org/licenses/LICENSE-2.0
+  *
+  * Unless required by applicable law or agreed to in writing, software
+  * distributed under the License is distributed on an "AS IS" BASIS,
+  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  * See the License for the specific language governing permissions and
+  * limitations under the License.
+  */
+
+package kafka.server
+
+import java.util.concurrent.LinkedBlockingDeque
+
+/**
+ * This queue is used to collect actions which need to be executed later. One 
use case is that ReplicaManager#appendRecords
+ * produces record changes so we need to check and complete delayed requests. 
In order to avoid conflicting locking,
+ * we add those actions to this queue and then complete them at the end of 
KafkaApis.handle() or DelayedJoin.onExpiration.
+ */
+class ActionQueue {
+  private val queue = new LinkedBlockingDeque[() => Unit]()

Review comment:
   Sure. I will file a PR to follow the pattern in #9229.
   
   In order to simplify the code base, I will revert the action queue to "per server" :)
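   
   To make the deferred-execution idea concrete, a minimal Scala sketch of such a queue (the method names are illustrative, not necessarily those in the PR):

```scala
import java.util.concurrent.LinkedBlockingDeque

// Collects actions (e.g. delayed-request completions) produced while locks are held,
// so they can be run later at a safe point instead of under conflicting locks.
class ActionQueueSketch {
  private val queue = new LinkedBlockingDeque[() => Unit]()

  def add(action: () => Unit): Unit = queue.add(action)

  // Called at a safe point, e.g. at the end of request handling.
  def tryCompleteActions(): Unit = {
    var action = queue.poll()
    while (action != null) {
      try action()
      catch { case _: Throwable => () } // a real implementation would log here
      action = queue.poll()
    }
  }
}
```

   Draining with `poll()` until the deque is empty keeps completion outside whatever locks produced the actions, which is exactly the conflicting-locking concern described in the scaladoc above.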





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dielhennr edited a comment on pull request #9101: KAFKA-10325: KIP-649 implementation

2020-08-30 Thread GitBox


dielhennr edited a comment on pull request #9101:
URL: https://github.com/apache/kafka/pull/9101#issuecomment-683076305


   Hey @jsancio , 
   
   I added some work in progress to this branch including new APIs for this 
feature. Fitting user and client-id into the `DescribeConfigs` API was awkward, 
so I thought that the next best step would be to create a specialized set of 
APIs similar to  
[KIP-546](https://cwiki.apache.org/confluence/display/KAFKA/KIP-546%3A+Add+Client+Quota+APIs+to+the+Admin+Client).
 These changes allow for a more expressive and well-defined interface. I'm 
wondering if I should create a new KIP and branch so that the old 
implementation can be referenced without digging into commit or page history. 
Should I just update the current kip? 
   
   I am also working on having the clients register the configs that they 
support with the brokers. I tried tying the registration to connectionId in the 
hopes that this would give a unique identifier to each running application. 
However, this will not work since the connectionId can change while a client is 
active. Similarly, tying registration to ip:port will not work because a client 
talks to different brokers on different ports. Would it be safe to assume that 
clients with the same ip address are all the same version? Do you have any 
suggestions for what identifier config registration should be tied to if this 
assumption cannot be made?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dielhennr edited a comment on pull request #9101: KAFKA-10325: KIP-649 implementation

2020-08-30 Thread GitBox


dielhennr edited a comment on pull request #9101:
URL: https://github.com/apache/kafka/pull/9101#issuecomment-683076305


   Hey @jsancio , 
   
   I added some work in progress to this branch including new APIs for this 
feature. Fitting user and client-id into the `DescribeConfigs` API was awkward, 
so I thought that the next best step would be to create a specialized set of 
APIs similar to  
[KIP-546](https://cwiki.apache.org/confluence/display/KAFKA/KIP-546%3A+Add+Client+Quota+APIs+to+the+Admin+Client).
 These changes allow for a more expressive and well-defined interface. I'm 
wondering if I should create a new KIP and branch so that the old 
implementation can be referenced without digging into commit or page history. 
Should I just update the current kip? 
   
   I am also working on having the clients register the configs that they 
support with the brokers. I tried tying the registration to connectionId in the 
hopes that this would give a unique identifier to each running application. 
However, this will not work since it can change while a client is active. 
Similarly, tying registration to ip:port will not work because a client talks 
to different brokers on different ports. Would it be safe to assume that 
clients with the same ip address are all the same version? Do you have any 
suggestions for what identifier config registration should be tied to if this 
assumption cannot be made?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dielhennr edited a comment on pull request #9101: KAFKA-10325: KIP-649 implementation

2020-08-30 Thread GitBox


dielhennr edited a comment on pull request #9101:
URL: https://github.com/apache/kafka/pull/9101#issuecomment-683076305


   Hey @jsancio , 
   
   I added some work in progress to this branch including new APIs for this 
feature. Fitting user and client-id into the `DescribeConfigs` API was awkward, 
so I thought that the next best step would be to create a specialized set of 
APIs similar to  
[KIP-546](https://cwiki.apache.org/confluence/display/KAFKA/KIP-546%3A+Add+Client+Quota+APIs+to+the+Admin+Client).
 These changes allow for a more expressive and well-defined interface. I'm 
wondering if I should create a new KIP and branch so that the old 
implementation can be referenced without digging into commit or page history. 
Do you have a preference? 
   
   I am also working on having the clients register the configs that they 
support with the brokers. I tried tying the registration to connectionId in the 
hopes that this would give a unique identifier to each running application. 
However, this will not work since it can change while a client is active. 
Similarly, tying registration to ip:port will not work because a client talks 
to different brokers on different ports. Would it be safe to assume that 
clients with the same ip address are all the same version? Do you have any 
suggestions for what identifier config registration should be tied to if this 
assumption cannot be made?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dielhennr edited a comment on pull request #9101: KAFKA-10325: KIP-649 implementation

2020-08-30 Thread GitBox


dielhennr edited a comment on pull request #9101:
URL: https://github.com/apache/kafka/pull/9101#issuecomment-683076305


   Hey @jsancio , 
   
   I added some work in progress to this branch including new APIs for this 
feature and functionality using them. Fitting user and client-id into the 
`DescribeConfigs` API was awkward, so I thought that the next best step would 
be to create a specialized set of APIs similar to  
[KIP-546](https://cwiki.apache.org/confluence/display/KAFKA/KIP-546%3A+Add+Client+Quota+APIs+to+the+Admin+Client).
 These changes allow for a more expressive and well-defined interface for 
describing and altering client configs. I'm wondering if I should create a new 
KIP and branch so that the old implementation can be referenced without digging 
into commit or page history. Do you have a preference? 
   
   I am also working on having the clients register the configs that they 
support with the brokers. I tried tying the registration to connectionId in the 
hopes that this would give a unique identifier to each running application. 
However, this will not work since it can change while a client is active. 
Similarly, tying registration to ip:port will not work because a client talks 
to different brokers on different ports. Would it be safe to assume that 
clients with the same ip address are all the same version? Do you have any 
suggestions for what identifier config registration should be tied to if this 
assumption cannot be made?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org