[jira] [Updated] (KAFKA-8915) Unable to modify partition

2019-09-17 Thread Lee Dongjin (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8915?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lee Dongjin updated KAFKA-8915:
---
Summary: Unable to modify partition  (was: 无法修改partition, i.e. "unable to modify partition" in Chinese)

> Unable to modify partition
> --
>
> Key: KAFKA-8915
> URL: https://issues.apache.org/jira/browse/KAFKA-8915
> Project: Kafka
>  Issue Type: Bug
>Reporter: lingyi.zhong
>Priority: Major
>
> [root@work1 kafka]# bin/kafka-topics.sh --create --zookeeper 10.20.30.77:2181 --replication-factor 1 --partitions 1 --topic test_topic3
> WARNING: Due to limitations in metric names, topics with a period ('.') or underscore ('_') could collide. To avoid issues it is best to use either, but not both.
> Created topic "test_topic3".
> [root@work1 kafka]# bin/kafka-topics.sh --alter --zookeeper 10.20.30.78:2181/chroot --partition 2 --topic test_topic3
> Exception in thread "main" joptsimple.UnrecognizedOptionException: partition is not a recognized option
> at joptsimple.OptionException.unrecognizedOption(OptionException.java:108)
> at joptsimple.OptionParser.handleLongOptionToken(OptionParser.java:510)
> at joptsimple.OptionParserState$2.handleArgument(OptionParserState.java:56)
> at joptsimple.OptionParser.parse(OptionParser.java:396)
> at kafka.admin.TopicCommand$TopicCommandOptions.<init>(TopicCommand.scala:358)
> at kafka.admin.TopicCommand$.main(TopicCommand.scala:44)
> at kafka.admin.TopicCommand.main(TopicCommand.scala)
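The failure is a misspelled option: TopicCommand expects --partitions (plural), not --partition. Assuming the same ZooKeeper connection string, the intended invocation would be:

bin/kafka-topics.sh --alter --zookeeper 10.20.30.78:2181/chroot --partitions 2 --topic test_topic3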



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-8910) Incorrect javadoc at KafkaProducer.InterceptorCallback#onCompletion

2019-09-16 Thread Lee Dongjin (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16930619#comment-16930619
 ] 

Lee Dongjin edited comment on KAFKA-8910 at 9/16/19 2:48 PM:
-

I reviewed the problem, and it seems like the title and description here are a 
little bit inaccurate; the documentation is wrong on 
KafkaProducer.InterceptorCallback, not 
org.apache.kafka.clients.producer.Callback (i.e., they are reversed).


was (Author: dongjin.lee.kr):
I reviewed the problem, and it seems like the title and description here are a 
little bit unaccurate; the documentation is wrong on 
KafkaProducer.InterceptorCallback, not 
org.apache.kafka.clients.producer.Callback. (i.e., reversed.)

> Incorrect javadoc at KafkaProducer.InterceptorCallback#onCompletion
> ---
>
> Key: KAFKA-8910
> URL: https://issues.apache.org/jira/browse/KAFKA-8910
> Project: Kafka
>  Issue Type: Bug
>Reporter: Sergey Ushakov
>Assignee: Lee Dongjin
>Priority: Major
>
> h1. Problem
> Javadoc for 
> org.apache.kafka.clients.producer.KafkaProducer.InterceptorCallback#onCompletion
>  states:
> {noformat}
> * @param metadata The metadata for the record that was sent (i.e. the 
> partition and offset). Null if an error
> *occurred. {noformat}
> In fact, metadata is never null since KAFKA-6180. See 
> org.apache.kafka.clients.producer.Callback for details.
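For context, a minimal sketch of what the corrected contract means for user code, assuming the post-KAFKA-6180 behavior (errors are reported through the exception argument while metadata stays non-null):

{code:java}
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;

public class LoggingCallback implements Callback {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception exception) {
        if (exception != null) {
            // Since KAFKA-6180, metadata is non-null even on failure; it
            // carries the topic/partition with sentinel values (offset -1).
            System.err.println("Send to " + metadata.topic() + " failed: " + exception);
        } else {
            System.out.println("Sent to " + metadata.topic() + "-"
                    + metadata.partition() + " @ offset " + metadata.offset());
        }
    }
}
{code}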



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Updated] (KAFKA-8910) Incorrect javadoc at KafkaProducer.InterceptorCallback#onCompletion

2019-09-16 Thread Lee Dongjin (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8910?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lee Dongjin updated KAFKA-8910:
---
Summary: Incorrect javadoc at 
KafkaProducer.InterceptorCallback#onCompletion  (was: Incorrect javadoc at 
org.apache.kafka.clients.producer.KafkaProducer.InterceptorCallback#onCompletion)

> Incorrect javadoc at KafkaProducer.InterceptorCallback#onCompletion
> ---
>
> Key: KAFKA-8910
> URL: https://issues.apache.org/jira/browse/KAFKA-8910
> Project: Kafka
>  Issue Type: Bug
>Reporter: Sergey Ushakov
>Assignee: Lee Dongjin
>Priority: Major
>
> h1. Problem
> Javadoc for 
> org.apache.kafka.clients.producer.KafkaProducer.InterceptorCallback#onCompletion
>  states:
> {noformat}
> * @param metadata The metadata for the record that was sent (i.e. the 
> partition and offset). Null if an error
> *occurred. {noformat}
> In fact, metadata is never null since KAFKA-6180. See 
> org.apache.kafka.clients.producer.Callback for details.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Updated] (KAFKA-8910) Incorrect javadoc at org.apache.kafka.clients.producer.KafkaProducer.InterceptorCallback#onCompletion

2019-09-16 Thread Lee Dongjin (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8910?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lee Dongjin updated KAFKA-8910:
---
Description: 
h1. Problem

Javadoc for 
org.apache.kafka.clients.producer.KafkaProducer.InterceptorCallback#onCompletion
 states:
{noformat}
* @param metadata The metadata for the record that was sent (i.e. the partition 
and offset). Null if an error
*occurred. {noformat}
In fact, metadata is never null since KAFKA-6180. See 
org.apache.kafka.clients.producer.Callback for details.

  was:
h1. Problem

Javadoc for org.apache.kafka.clients.producer.Callback states:
{noformat}
* @param metadata The metadata for the record that was sent (i.e. the partition 
and offset). Null if an error
*occurred. {noformat}
In fact, metadata is never null since KAFKA-6180. See 
org.apache.kafka.clients.producer.KafkaProducer.InterceptorCallback#onCompletion
 for details.


> Incorrect javadoc at 
> org.apache.kafka.clients.producer.KafkaProducer.InterceptorCallback#onCompletion
> -
>
> Key: KAFKA-8910
> URL: https://issues.apache.org/jira/browse/KAFKA-8910
> Project: Kafka
>  Issue Type: Bug
>Reporter: Sergey Ushakov
>Priority: Major
>
> h1. Problem
> Javadoc for 
> org.apache.kafka.clients.producer.KafkaProducer.InterceptorCallback#onCompletion
>  states:
> {noformat}
> * @param metadata The metadata for the record that was sent (i.e. the 
> partition and offset). Null if an error
> *occurred. {noformat}
> In fact, metadata is never null since KAFKA-6180. See 
> org.apache.kafka.clients.producer.Callback for details.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Assigned] (KAFKA-8910) Incorrect javadoc at org.apache.kafka.clients.producer.KafkaProducer.InterceptorCallback#onCompletion

2019-09-16 Thread Lee Dongjin (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8910?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lee Dongjin reassigned KAFKA-8910:
--

Assignee: Lee Dongjin

> Incorrect javadoc at 
> org.apache.kafka.clients.producer.KafkaProducer.InterceptorCallback#onCompletion
> -
>
> Key: KAFKA-8910
> URL: https://issues.apache.org/jira/browse/KAFKA-8910
> Project: Kafka
>  Issue Type: Bug
>Reporter: Sergey Ushakov
>Assignee: Lee Dongjin
>Priority: Major
>
> h1. Problem
> Javadoc for 
> org.apache.kafka.clients.producer.KafkaProducer.InterceptorCallback#onCompletion
>  states:
> {noformat}
> * @param metadata The metadata for the record that was sent (i.e. the 
> partition and offset). Null if an error
> *occurred. {noformat}
> In fact, metadata is never null since KAFKA-6180. See 
> org.apache.kafka.clients.producer.Callback for details.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Updated] (KAFKA-8910) Incorrect javadoc at org.apache.kafka.clients.producer.KafkaProducer.InterceptorCallback#onCompletion

2019-09-16 Thread Lee Dongjin (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8910?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lee Dongjin updated KAFKA-8910:
---
Summary: Incorrect javadoc at 
org.apache.kafka.clients.producer.KafkaProducer.InterceptorCallback#onCompletion
  (was: Incorrect javadoc at org.apache.kafka.clients.producer.Callback)

> Incorrect javadoc at 
> org.apache.kafka.clients.producer.KafkaProducer.InterceptorCallback#onCompletion
> -
>
> Key: KAFKA-8910
> URL: https://issues.apache.org/jira/browse/KAFKA-8910
> Project: Kafka
>  Issue Type: Bug
>Reporter: Sergey Ushakov
>Priority: Major
>
> h1. Problem
> Javadoc for org.apache.kafka.clients.producer.Callback states:
> {noformat}
> * @param metadata The metadata for the record that was sent (i.e. the 
> partition and offset). Null if an error
> *occurred. {noformat}
> In fact, metadata is never null since KAFKA-6180. See 
> org.apache.kafka.clients.producer.KafkaProducer.InterceptorCallback#onCompletion
>  for details.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (KAFKA-8910) Incorrect javadoc at org.apache.kafka.clients.producer.Callback

2019-09-16 Thread Lee Dongjin (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8910?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16930619#comment-16930619
 ] 

Lee Dongjin commented on KAFKA-8910:


I reviewed the problem, and it seems like the title and description here are a 
little bit inaccurate; the documentation is wrong on 
KafkaProducer.InterceptorCallback, not 
org.apache.kafka.clients.producer.Callback (i.e., they are reversed).

> Incorrect javadoc at org.apache.kafka.clients.producer.Callback
> ---
>
> Key: KAFKA-8910
> URL: https://issues.apache.org/jira/browse/KAFKA-8910
> Project: Kafka
>  Issue Type: Bug
>Reporter: Sergey Ushakov
>Priority: Major
>
> h1. Problem
> Javadoc for org.apache.kafka.clients.producer.Callback states:
> {noformat}
> * @param metadata The metadata for the record that was sent (i.e. the 
> partition and offset). Null if an error
> *occurred. {noformat}
> In fact, metadata is never null since KAFKA-6180. See 
> org.apache.kafka.clients.producer.KafkaProducer.InterceptorCallback#onCompletion
>  for details.



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (KAFKA-7632) Allow fine-grained configuration for compression

2019-08-26 Thread Lee Dongjin (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16915922#comment-16915922
 ] 

Lee Dongjin commented on KAFKA-7632:


[~ckozak] Drafts of the two interface types are already implemented, and I am 
working on patches for 2.1, 2.2, and 2.3, respectively. Are you interested in 
this feature? If so, I can provide you with a beta version of the patch. 
(discussion: 
https://lists.apache.org/thread.html/92f4eead04d07cf309e9f13b45505b391e6f02a6c7befd5f57a76a01@%3Cdev.kafka.apache.org%3E)

> Allow fine-grained configuration for compression
> 
>
> Key: KAFKA-7632
> URL: https://issues.apache.org/jira/browse/KAFKA-7632
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 2.1.0
> Environment: all
>Reporter: Dave Waters
>Assignee: Lee Dongjin
>Priority: Major
>  Labels: needs-kip
>
> The compression level for ZSTD is currently set to use the default level (3), 
> which is a conservative setting that in some use cases eliminates the value 
> that ZSTD provides with improved compression. Each use case will vary, so 
> exposing the level as a broker configuration setting will allow the user to 
> adjust the level.
> Since the same applies to the other compression codecs, we should add the 
> same functionality to them.
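A sketch of how the proposed knob might look on the producer side; the 'compression.level' key is hypothetical, since the actual config name was still under discussion in the KIP at this point:

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "zstd");
// Hypothetical config key: raise the ZSTD level above the conservative
// default (3) to trade CPU for a better compression ratio.
props.put("compression.level", "9");
KafkaProducer<String, String> producer = new KafkaProducer<>(props);
{code}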



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Commented] (KAFKA-8832) We should limit the maximum size read by a fetch request on the kafka server.

2019-08-25 Thread Lee Dongjin (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8832?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16915214#comment-16915214
 ] 

Lee Dongjin commented on KAFKA-8832:


Hi [~LordChen],

Since this issue includes public API changes, you need to file a KIP and start 
a discussion on this feature. Please refer here: 
[https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Improvement+Proposals]

> We should limit the maximum size read by a fetch request on the kafka server.
> -
>
> Key: KAFKA-8832
> URL: https://issues.apache.org/jira/browse/KAFKA-8832
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.3.0, 2.2.1
>Reporter: ChenLin
>Priority: Major
>  Labels: needs-kip
> Attachments: image-2019-08-25-15-31-56-707.png, 
> image-2019-08-25-15-42-24-379.png
>
>
> I found that Kafka does not limit, on the server side, the amount of data 
> read per fetch request. This may cause the Kafka server to report an 
> OutOfMemory error. If an unreasonable client configuration sets 
> fetch.message.max.bytes too large, such as 100M, and the server receives a 
> lot of fetch requests at a certain moment, the server reports an OutOfMemory 
> error. So I think this is a bug.
> !image-2019-08-25-15-42-24-379.png!
> !image-2019-08-25-15-31-56-707.png!
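Until such a server-side limit exists, the exposure can be reduced from the client side. A sketch using the new-consumer equivalents of fetch.message.max.bytes, which bound how much data a single fetch may return:

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ConsumerConfig.GROUP_ID_CONFIG, "fetch-bounded");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
        "org.apache.kafka.common.serialization.StringDeserializer");
// Cap the total bytes in one fetch response (default 50 MB) and the bytes
// per partition (default 1 MB). Note the broker can still exceed the cap
// for a single record batch larger than the limit.
props.put(ConsumerConfig.FETCH_MAX_BYTES_CONFIG, String.valueOf(10 * 1024 * 1024));
props.put(ConsumerConfig.MAX_PARTITION_FETCH_BYTES_CONFIG, String.valueOf(1024 * 1024));
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
{code}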



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Updated] (KAFKA-8832) We should limit the maximum size read by a fetch request on the kafka server.

2019-08-25 Thread Lee Dongjin (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8832?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lee Dongjin updated KAFKA-8832:
---
Labels: needs-kip  (was: )

> We should limit the maximum size read by a fetch request on the kafka server.
> -
>
> Key: KAFKA-8832
> URL: https://issues.apache.org/jira/browse/KAFKA-8832
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.3.0, 2.2.1
>Reporter: ChenLin
>Priority: Major
>  Labels: needs-kip
> Attachments: image-2019-08-25-15-31-56-707.png, 
> image-2019-08-25-15-42-24-379.png
>
>
> I found that Kafka does not limit, on the server side, the amount of data 
> read per fetch request. This may cause the Kafka server to report an 
> OutOfMemory error. If an unreasonable client configuration sets 
> fetch.message.max.bytes too large, such as 100M, and the server receives a 
> lot of fetch requests at a certain moment, the server reports an OutOfMemory 
> error. So I think this is a bug.
> !image-2019-08-25-15-42-24-379.png!
> !image-2019-08-25-15-31-56-707.png!



--
This message was sent by Atlassian Jira
(v8.3.2#803003)


[jira] [Assigned] (KAFKA-8800) Flaky Test SaslScramSslEndToEndAuthorizationTest#testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl

2019-08-14 Thread Lee Dongjin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8800?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lee Dongjin reassigned KAFKA-8800:
--

Assignee: Lee Dongjin

> Flaky Test 
> SaslScramSslEndToEndAuthorizationTest#testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl
> --
>
> Key: KAFKA-8800
> URL: https://issues.apache.org/jira/browse/KAFKA-8800
> Project: Kafka
>  Issue Type: Bug
>  Components: core, security, unit tests
>Affects Versions: 2.4.0
>Reporter: Matthias J. Sax
>Assignee: Lee Dongjin
>Priority: Critical
>  Labels: flaky-test
>
> [https://builds.apache.org/job/kafka-pr-jdk11-scala2.12/6956/testReport/junit/kafka.api/SaslScramSslEndToEndAuthorizationTest/testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl/]
> {quote}org.scalatest.exceptions.TestFailedException: Consumed 0 records 
> before timeout instead of the expected 1 records
> at org.scalatest.Assertions.newAssertionFailedException(Assertions.scala:530)
> at org.scalatest.Assertions.newAssertionFailedException$(Assertions.scala:529)
> at org.scalatest.Assertions$.newAssertionFailedException(Assertions.scala:1389)
> at org.scalatest.Assertions.fail(Assertions.scala:1091)
> at org.scalatest.Assertions.fail$(Assertions.scala:1087)
> at org.scalatest.Assertions$.fail(Assertions.scala:1389)
> at kafka.utils.TestUtils$.waitUntilTrue(TestUtils.scala:822)
> at kafka.utils.TestUtils$.pollRecordsUntilTrue(TestUtils.scala:781)
> at kafka.utils.TestUtils$.pollUntilAtLeastNumRecords(TestUtils.scala:1312)
> at kafka.utils.TestUtils$.consumeRecords(TestUtils.scala:1320)
> at kafka.api.EndToEndAuthorizationTest.consumeRecords(EndToEndAuthorizationTest.scala:522)
> at kafka.api.EndToEndAuthorizationTest.testNoDescribeProduceOrConsumeWithoutTopicDescribeAcl(EndToEndAuthorizationTest.scala:361){quote}



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Assigned] (KAFKA-8789) kafka-console-consumer timeout-ms setting behaves incorrectly with older client

2019-08-13 Thread Lee Dongjin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8789?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lee Dongjin reassigned KAFKA-8789:
--

Assignee: Lee Dongjin

> kafka-console-consumer timeout-ms setting behaves incorrectly with older 
> client
> ---
>
> Key: KAFKA-8789
> URL: https://issues.apache.org/jira/browse/KAFKA-8789
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 2.3.0
>Reporter: Raman Gupta
>Assignee: Lee Dongjin
>Priority: Major
>
> I have a topic with about 20,000 events in it, running on a Kafka 2.3.0 
> broker. When I run the following tools command using the older Kafka client 
> included in Confluent 5.0.3:
> bin/kafka-console-consumer \
>   --bootstrap-server $KAFKA \
>   --topic x \
>   --from-beginning --max-messages 1 \
>   --timeout-ms 15000
> I get 1 message as expected.
> However, when running the exact same command using the console consumer 
> included with Confluent 5.3.0, I get 
> org.apache.kafka.common.errors.TimeoutException, and 0 messages processed.
> NOTE: I am using the Confluent distribution of Kafka for the client side 
> tools, specifically Confluent 5.0.3 and Confluent 5.3.0. I can certainly try 
> to replicate with a vanilla Kafka if necessary.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Created] (KAFKA-8794) Provide Javadoc on DescribeLogDirsResult

2019-08-13 Thread Lee Dongjin (JIRA)
Lee Dongjin created KAFKA-8794:
--

 Summary: Provide Javadoc on DescribeLogDirsResult
 Key: KAFKA-8794
 URL: https://issues.apache.org/jira/browse/KAFKA-8794
 Project: Kafka
  Issue Type: Improvement
  Components: documentation
Reporter: Lee Dongjin
Assignee: Lee Dongjin


As of 2.3.0, the DescribeLogDirsResult returned by 
AdminClient#describeLogDirs(Collection) exposes an internal data structure, 
DescribeLogDirsResponse.LogDirInfo. Because of this, its Javadoc provides no 
documentation for it. The disparity is clear when comparing it with 
DescribeReplicaLogDirsResult, returned by 
AdminClient#describeReplicaLogDirs(Collection).

To resolve this, org.apache.kafka.clients.admin.DescribeLogDirsResult should 
provide [LogDirInfo, ReplicaInfo] as its own internal classes, like 
DescribeReplicaLogDirsResult does.
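For reference, a sketch of what consuming the current API looks like, assuming the 2.3.0 signatures (LogDirInfo exposes only public fields, with no user-facing Javadoc):

{code:java}
import java.util.Collections;
import java.util.Map;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.common.KafkaFuture;
import org.apache.kafka.common.requests.DescribeLogDirsResponse;

static void printLogDirs(AdminClient admin) throws Exception {
    // values() exposes DescribeLogDirsResponse.LogDirInfo, a type from the
    // internal 'requests' package.
    Map<Integer, KafkaFuture<Map<String, DescribeLogDirsResponse.LogDirInfo>>> futures =
            admin.describeLogDirs(Collections.singletonList(0)).values();
    for (Map.Entry<String, DescribeLogDirsResponse.LogDirInfo> entry
            : futures.get(0).get().entrySet()) {
        // LogDirInfo's public fields (error, replicaInfos) are all there is.
        System.out.println(entry.getKey() + ": error=" + entry.getValue().error
                + ", replicas=" + entry.getValue().replicaInfos.size());
    }
}
{code}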



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Assigned] (KAFKA-8403) Suppress needs a Materialized variant

2019-08-09 Thread Lee Dongjin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8403?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lee Dongjin reassigned KAFKA-8403:
--

Assignee: Lee Dongjin

> Suppress needs a Materialized variant
> -
>
> Key: KAFKA-8403
> URL: https://issues.apache.org/jira/browse/KAFKA-8403
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Assignee: Lee Dongjin
>Priority: Major
>  Labels: needs-kip
>
> The newly added KTable Suppress operator lacks a Materialized variant, which 
> would be useful if you wanted to query the results of the suppression.
> Suppression results will eventually match the upstream results, but the 
> intermediate distinction may be meaningful for some applications. For 
> example, you might want to query only the final results of a windowed 
> aggregation.
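A sketch of the gap, assuming a KStream<String, String> named 'input'; the Materialized overload shown in the trailing comment is the missing piece, not part of the current API:

{code:java}
import java.time.Duration;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Suppressed;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Windowed;

KTable<Windowed<String>, Long> finalCounts = input
        .groupByKey()
        .windowedBy(TimeWindows.of(Duration.ofMinutes(5)).grace(Duration.ofMinutes(1)))
        .count()
        // Existing API: emit only the final result per window.
        .suppress(Suppressed.untilWindowCloses(Suppressed.BufferConfig.unbounded()));

// Hypothetical variant this ticket asks for, to make the suppressed view
// queryable via interactive queries (NOT in the current API):
// .suppress(Suppressed.untilWindowCloses(...), Materialized.as("final-counts"));
{code}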



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Assigned] (KAFKA-8453) AdminClient describeTopic should handle partition level errors

2019-07-15 Thread Lee Dongjin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lee Dongjin reassigned KAFKA-8453:
--

Assignee: Lee Dongjin

> AdminClient describeTopic should handle partition level errors
> --
>
> Key: KAFKA-8453
> URL: https://issues.apache.org/jira/browse/KAFKA-8453
> Project: Kafka
>  Issue Type: Bug
>Reporter: Jason Gustafson
>Assignee: Lee Dongjin
>Priority: Major
>
> The Metadata response may contain the following partition-level error codes: 
> LEADER_NOT_AVAILABLE, REPLICA_NOT_AVAILABLE, and LISTENER_NOT_FOUND. The 
> AdminClient at the moment does not appear to be checking these error codes. 
> This seems mostly harmless in the case of LEADER_NOT_AVAILABLE and 
> REPLICA_NOT_AVAILABLE, but potentially we should raise an error in the case 
> of LISTENER_NOT_FOUND since the result would otherwise be misleading.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (KAFKA-8507) Support --bootstrap-server in all command line tools

2019-07-14 Thread Lee Dongjin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8507?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16884693#comment-16884693
 ] 

Lee Dongjin commented on KAFKA-8507:


[~hachikuji] [~ijuma] 

I researched this issue last weekend (following KAFKA-8610) and found it is a 
little bit more complicated than expected.

At present, the command-line tools are located in the kafka.tools or 
kafka.admin package of the core module, with jOptSimple as the argument parser. 
This way, the tools provide a standardized interface, like the 'version' 
parameter.

However, there are some inconsistencies:

1. The following tools are located in the org.apache.kafka.tools package of the 
tools module, which holds integration test tools:

    1. ProducerPerformance
     2. VerifiableProducer
     3. VerifiableConsumer

The above tools also use argparse4j as the parser. As a result, they do not 
provide the standardized 'version' parameter (see KAFKA-8292). They also use 
'producer-props' or 'broker-list', not 'bootstrap-server'.

2. The following tools do not use 'bootstrap-server'. An asterisk means there 
is a PR for the issue.

    1. GetOffsetShell: using 'broker-list'
     2. ReplicaVerificationTool: using 'broker-list'
     3. ConsumerPerformance: using 'broker-list'
     4. ConsoleProducer: using 'broker-list'
     5. StreamResetter: using 'bootstrap-servers' [*]
     6. VerifiableLog4jAppender: using 'broker-list'
     7. TransactionalMessageCopier: using 'broker-list'
     8. ReassignPartitionsCommand: using 'broker-list' [*]
     9. LogDirsCommand: using 'broker-list'

3. There are some other problems:

    1. ProducerPerformance provides producer-props, but ConsumerPerformance 
doesn't have consumer-props (see KAFKA-8647). Since jOptSimple does not support 
arguments like '--producer-props acks=1 bootstrap.servers=localhost:9092 
buffer.memory=67108864', producer-props should be deprecated and removed rather 
than adding consumer-props.
     2. ConfigCommand supports both ZooKeeper-based and broker-based processing 
(via the zookeeper and bootstrap-server parameters), but 'zookeeper' is a 
required parameter now; the user should have to specify only one of 'zookeeper' 
or 'bootstrap-server'.
     3. The jOptSimple API is not used properly. For example, jOptSimple 
provides a way to specify required parameters 
([ArgumentAcceptingOptionSpec#required|http://jopt-simple.github.io/jopt-simple/apidocs/joptsimple/ArgumentAcceptingOptionSpec.html#required--]),
 which throws an exception during parsing when such a parameter is missing. But 
currently, CommandLineUtils#checkRequiredArgs is used instead. (See the sketch 
right after this list.)
     4. Trivial: in Scala, it is recommended to call a no-argument method 
without parentheses. However, there are several calls like withRequiredArg() or 
withOptionalArg().
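As an illustration of point 3 above, a minimal stand-alone jOptSimple sketch (not Kafka code) showing ArgumentAcceptingOptionSpec#required in action:

{code:java}
import joptsimple.OptionParser;
import joptsimple.OptionSet;
import joptsimple.OptionSpec;

public class RequiredOptionExample {
    public static void main(String[] args) {
        OptionParser parser = new OptionParser();
        OptionSpec<String> bootstrapServer = parser
                .accepts("bootstrap-server", "The Kafka server(s) to connect to.")
                .withRequiredArg()
                .describedAs("host:port")
                .ofType(String.class)
                .required();
        // parse() throws joptsimple.OptionException when --bootstrap-server is
        // missing, so no CommandLineUtils#checkRequiredArgs-style post-check
        // is needed.
        OptionSet options = parser.parse(args);
        System.out.println("bootstrap-server = " + options.valueOf(bootstrapServer));
    }
}
{code}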

IMHO, how about this approach?

A. Create an umbrella issue covering all the command-line tool inconsistencies 
described above. (Of course, it requires a KIP.)
 B. Make the issues above subtasks of the umbrella issue.

If it is okay, I hope to take this task.

Note: You can check [the draft 
implementation|https://github.com/dongjinleekr/kafka/tree/feature/KAFKA-8292], 
which handles the package and parsing inconsistencies described in 1: 
[#1|https://github.com/dongjinleekr/kafka/blob/feature%2FKAFKA-8292/core/src/main/scala/kafka/tools/ProducerPerformance.scala]
[#2|https://github.com/dongjinleekr/kafka/blob/feature%2FKAFKA-8292/core/src/test/scala/kafka/tools/ProducerPerformanceTest.scala]

Note 2: Keeping 'ProducerConfig.BOOTSTRAP_SERVERS_CONFIG' in mind, 
'bootstrap-servers' may be better for consistency. What do you think?

> Support --bootstrap-server in all command line tools
> 
>
> Key: KAFKA-8507
> URL: https://issues.apache.org/jira/browse/KAFKA-8507
> Project: Kafka
>  Issue Type: Improvement
>  Components: tools
>Reporter: Jason Gustafson
>Priority: Major
>  Labels: needs-kip
>
> This is an unambitious initial move toward standardizing the command line 
> tools. We have favored the name {{\-\-bootstrap-server}} in all new tools 
> since it matches the config {{bootstrap.servers}}, which is used by all 
> clients. Some older commands use {{\-\-broker-list}} or 
> {{\-\-bootstrap-servers}} and maybe other exotic variations. We should 
> support {{\-\-bootstrap-server}} in all commands and deprecate the other 
> options.



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Assigned] (KAFKA-8482) alterReplicaLogDirs should be better documented

2019-07-12 Thread Lee Dongjin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8482?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lee Dongjin reassigned KAFKA-8482:
--

Assignee: Lee Dongjin

> alterReplicaLogDirs should be better documented
> ---
>
> Key: KAFKA-8482
> URL: https://issues.apache.org/jira/browse/KAFKA-8482
> Project: Kafka
>  Issue Type: Improvement
>  Components: admin
>Reporter: Colin P. McCabe
>Assignee: Lee Dongjin
>Priority: Major
>  Labels: newbie
> Fix For: 2.4.0
>
>
> alterReplicaLogDirs should be better documented.  In particular, it should 
> document what exceptions it throws in {{AdminClient.java}}



--
This message was sent by Atlassian JIRA
(v7.6.14#76016)


[jira] [Commented] (KAFKA-8610) Don't use /bin/bash in scripts

2019-07-10 Thread Lee Dongjin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8610?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16881834#comment-16881834
 ] 

Lee Dongjin commented on KAFKA-8610:


Hi [~llamahunter],

Here is the draft fix. Please test it and give some feedback.

> Don't use /bin/bash in scripts
> --
>
> Key: KAFKA-8610
> URL: https://issues.apache.org/jira/browse/KAFKA-8610
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Richard Lee
>Assignee: Lee Dongjin
>Priority: Minor
>
> On small container installations (such as alpine), /bin/bash is not 
> installed. It appears the scripts in the /bin directory would mostly work 
> with /bin/sh. Please use a simpler shell for shell scripts so that they are 
> more portable.
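For illustration, the change would amount to switching each script's shebang from #!/bin/bash to #!/bin/sh and avoiding bashisms (e.g. arrays or [[ ]] tests) in the affected scripts; this is an assumption based on the ticket description, not a summary of the actual draft fix.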



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-8610) Don't use /bin/bash in scripts

2019-07-10 Thread Lee Dongjin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8610?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lee Dongjin reassigned KAFKA-8610:
--

Assignee: Lee Dongjin

> Don't use /bin/bash in scripts
> --
>
> Key: KAFKA-8610
> URL: https://issues.apache.org/jira/browse/KAFKA-8610
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Richard Lee
>Assignee: Lee Dongjin
>Priority: Minor
>
> On small container installations (such as alpine), /bin/bash is not 
> installed. It appears the scripts in the /bin directory would mostly work 
> with /bin/sh. Please use a simpler shell for shell scripts so that they are 
> more portable.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7869) Refactor RocksDBConfigSetter API to separate DBOptions and CFOptions

2019-07-01 Thread Lee Dongjin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7869?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16876071#comment-16876071
 ] 

Lee Dongjin commented on KAFKA-7869:


[~manish.c.ghildi...@gmail.com] As you can see in the linked PR, I am already 
working on this issue. Could you give some feedback on the interface or usage 
cases? It would be a great help in elaborating the interface.

> Refactor RocksDBConfigSetter API to separate DBOptions and CFOptions
> 
>
> Key: KAFKA-7869
> URL: https://issues.apache.org/jira/browse/KAFKA-7869
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: needs-kip, newbie
>
> The current RocksDBConfigSetter has the following API:
> {code}
> void setConfig(final String storeName, final Options options, final 
> Map<String, Object> configs);
> {code}
> where `Options` contains configurations for both the db level and the cf 
> level of RocksDB.
> As we move on to have multiple CFs following KIP-258, it's better to refactor 
> it into
> {code}
> void setConfig(final String storeName, final DBOptions dbOptions, final 
> ColumnFamilyOptions cfOptions, final Map<String, Object> configs);
> {code}
> And then inside the internal implementation, if only the default CF is used, 
> we can still use the other constructor of `Options` that takes both a 
> DBOptions and CFOptions object as parameters.
> This should be started only after KIP-258 is finished.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-8611) Make topic optional when using through() operations in DSL

2019-06-28 Thread Lee Dongjin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8611?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lee Dongjin updated KAFKA-8611:
---
Labels: needs-kip  (was: )

> Make topic optional when using through() operations in DSL
> --
>
> Key: KAFKA-8611
> URL: https://issues.apache.org/jira/browse/KAFKA-8611
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Levani Kokhreidze
>Priority: Minor
>  Labels: needs-kip
>
> When using the DSL in Kafka Streams, data repartitioning happens only when a 
> key-changing operation is followed by a stateful operation. On the other 
> hand, in the DSL, stateful computation can also happen using the 
> _transform()_ operation. The problem with this approach is that, even if an 
> upstream operation was key-changing before calling _transform()_, no 
> auto-repartition is triggered. If repartitioning is required, a call to 
> _through(String)_ should be performed before _transform()_. With the current 
> implementation, the burden of creating and managing the topic falls on the 
> user and introduces extra complexity in managing the Kafka Streams 
> application.
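A sketch of the situation described above, assuming a KStream<String, String> named 'stream' and a manually created topic (the topic name is illustrative):

{code:java}
import org.apache.kafka.streams.kstream.KStream;

// Key-changing operation: by itself, it does not cause a repartition.
KStream<String, String> rekeyed = stream.selectKey((key, value) -> value);

// transform() is stateful, but unlike stateful DSL operations it does NOT
// trigger an automatic repartition, so the data must be routed through a
// user-managed topic first:
KStream<String, String> repartitioned = rekeyed.through("my-app-repartition");
// repartitioned.transform(...) now operates on correctly partitioned data.
{code}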



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-8546) Fix needed to avoid memory leak caused by JDK-6293787

2019-06-28 Thread Lee Dongjin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16874713#comment-16874713
 ] 

Lee Dongjin commented on KAFKA-8546:


[~badai] Absolutely. I agree with that approach. Right, the current 
implementation is quite crude. Let's continue the discussion on the PR.

> Fix needed to avoid memory leak caused by JDK-6293787
> -
>
> Key: KAFKA-8546
> URL: https://issues.apache.org/jira/browse/KAFKA-8546
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 2.0.1
>Reporter: Badai Aqrandista
>Assignee: Lee Dongjin
>Priority: Minor
> Attachments: KAFKA-8546.patch, Screen Shot 2019-05-30 at 1.27.25 
> pm.png
>
>
> When a heavily used broker uses gzip compression on all topics, you can 
> sometimes hit GC pauses greater than the zookeeper.session.timeout.ms of 
> 6000ms. This is caused by a memory leak due to JDK-6293787 
> ([https://bugs.java.com/bugdatabase/view_bug.do?bug_id=6293787]), which is in 
> turn caused by JDK-4797189 
> ([https://bugs.java.com/bugdatabase/view_bug.do?bug_id=4797189]).
>  
> In summary, this is what happens:
>  * The Inflater class contains a finalizer method.
>  * Whenever a class with a finalizer method is instantiated, a Finalizer 
> object is created.
>  * The GC finalizer thread is responsible for processing all Finalizer 
> objects.
>  * If the rate of Finalizer object creation exceeds the rate at which the GC 
> finalizer thread can process them, the number of Finalizer objects grows 
> continuously and eventually triggers a full GC (because they are stored in 
> the Old Gen).
>  
> The following stack trace shows what happens when a process is frozen doing 
> a full GC:
>  
> {code:java}
> kafka-request-handler-13  Runnable Thread ID: 79
>   java.util.zip.Inflater.inflateBytes(long, byte[], int, int) Inflater.java
>   java.util.zip.Inflater.inflate(byte[], int, int) Inflater.java:259
>   java.util.zip.InflaterInputStream.read(byte[], int, int) 
> InflaterInputStream.java:152
>   java.util.zip.GZIPInputStream.read(byte[], int, int) 
> GZIPInputStream.java:117
>   java.io.BufferedInputStream.fill() BufferedInputStream.java:246
>   java.io.BufferedInputStream.read() BufferedInputStream.java:265
>   java.io.DataInputStream.readByte() DataInputStream.java:265
>   org.apache.kafka.common.utils.ByteUtils.readVarint(DataInput) 
> ByteUtils.java:168
>   org.apache.kafka.common.record.DefaultRecord.readFrom(DataInput, long, 
> long, int, Long) DefaultRecord.java:292
>   org.apache.kafka.common.record.DefaultRecordBatch$1.readNext(long, long, 
> int, Long) DefaultRecordBatch.java:264
>   org.apache.kafka.common.record.DefaultRecordBatch$RecordIterator.next() 
> DefaultRecordBatch.java:563
>   org.apache.kafka.common.record.DefaultRecordBatch$RecordIterator.next() 
> DefaultRecordBatch.java:532
>   org.apache.kafka.common.record.DefaultRecordBatch.iterator() 
> DefaultRecordBatch.java:327
>   scala.collection.convert.Wrappers$JIterableWrapper.iterator() 
> Wrappers.scala:54
>   scala.collection.IterableLike$class.foreach(IterableLike, Function1) 
> IterableLike.scala:72
>  scala.collection.AbstractIterable.foreach(Function1) Iterable.scala:54
>   
> kafka.log.LogValidator$$anonfun$validateMessagesAndAssignOffsetsCompressed$1.apply(MutableRecordBatch)
>  LogValidator.scala:267
>   
> kafka.log.LogValidator$$anonfun$validateMessagesAndAssignOffsetsCompressed$1.apply(Object)
>  LogValidator.scala:259
>   scala.collection.Iterator$class.foreach(Iterator, Function1) 
> Iterator.scala:891
>   scala.collection.AbstractIterator.foreach(Function1) Iterator.scala:1334
>   scala.collection.IterableLike$class.foreach(IterableLike, Function1) 
> IterableLike.scala:72
>   scala.collection.AbstractIterable.foreach(Function1) Iterable.scala:54
>   
> kafka.log.LogValidator$.validateMessagesAndAssignOffsetsCompressed(MemoryRecords,
>  LongRef, Time, long, CompressionCodec, CompressionCodec, boolean, byte, 
> TimestampType, long, int, boolean) LogValidator.scala:259
>   kafka.log.LogValidator$.validateMessagesAndAssignOffsets(MemoryRecords, 
> LongRef, Time, long, CompressionCodec, CompressionCodec, boolean, byte, 
> TimestampType, long, int, boolean) LogValidator.scala:70
>   kafka.log.Log$$anonfun$append$2.liftedTree1$1(LogAppendInfo, ObjectRef, 
> LongRef, long) Log.scala:771
>   kafka.log.Log$$anonfun$append$2.apply() Log.scala:770
>   kafka.log.Log$$anonfun$append$2.apply() Log.scala:752
>   kafka.log.Log.maybeHandleIOException(Function0, Function0) Log.scala:1842
>   kafka.log.Log.append(MemoryRecords, boolean, boolean, int) Log.scala:752
>   kafka.log.Log.appendAsLeader(MemoryRecords, int, boolean) Log.scala:722
>   kafka.cluster.Partition$$anonfun$13.apply() Partition.scala:660
>   kafka.cluster.Partition$$anonfun$13.apply() Partition.scala:648
>   kafka.utils.CoreUtils$.inLock(Lock, Function0) 

[jira] [Assigned] (KAFKA-8546) Call System#runFinalization to avoid memory leak caused by JDK-6293787

2019-06-20 Thread Lee Dongjin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8546?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lee Dongjin reassigned KAFKA-8546:
--

Assignee: Lee Dongjin

> Call System#runFinalization to avoid memory leak caused by JDK-6293787
> --
>
> Key: KAFKA-8546
> URL: https://issues.apache.org/jira/browse/KAFKA-8546
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 2.0.1
>Reporter: Badai Aqrandista
>Assignee: Lee Dongjin
>Priority: Minor
> Attachments: KAFKA-8546.patch, Screen Shot 2019-05-30 at 1.27.25 
> pm.png
>
>
> When a heavily used broker uses gzip compression on all topics, you can 
> sometimes hit GC pauses greater than the zookeeper.session.timeout.ms of 
> 6000ms. This is caused by a memory leak due to JDK-6293787 
> ([https://bugs.java.com/bugdatabase/view_bug.do?bug_id=6293787]), which is in 
> turn caused by JDK-4797189 
> ([https://bugs.java.com/bugdatabase/view_bug.do?bug_id=4797189]).
>  
> In summary, this is what happens:
>  * The Inflater class contains a finalizer method.
>  * Whenever a class with a finalizer method is instantiated, a Finalizer 
> object is created.
>  * The GC finalizer thread is responsible for processing all Finalizer 
> objects.
>  * If the rate of Finalizer object creation exceeds the rate at which the GC 
> finalizer thread can process them, the number of Finalizer objects grows 
> continuously and eventually triggers a full GC (because they are stored in 
> the Old Gen).
>  
> The following stack trace shows what happens when a process is frozen doing 
> a full GC:
>  
> {code:java}
> kafka-request-handler-13  Runnable Thread ID: 79
>   java.util.zip.Inflater.inflateBytes(long, byte[], int, int) Inflater.java
>   java.util.zip.Inflater.inflate(byte[], int, int) Inflater.java:259
>   java.util.zip.InflaterInputStream.read(byte[], int, int) 
> InflaterInputStream.java:152
>   java.util.zip.GZIPInputStream.read(byte[], int, int) 
> GZIPInputStream.java:117
>   java.io.BufferedInputStream.fill() BufferedInputStream.java:246
>   java.io.BufferedInputStream.read() BufferedInputStream.java:265
>   java.io.DataInputStream.readByte() DataInputStream.java:265
>   org.apache.kafka.common.utils.ByteUtils.readVarint(DataInput) 
> ByteUtils.java:168
>   org.apache.kafka.common.record.DefaultRecord.readFrom(DataInput, long, 
> long, int, Long) DefaultRecord.java:292
>   org.apache.kafka.common.record.DefaultRecordBatch$1.readNext(long, long, 
> int, Long) DefaultRecordBatch.java:264
>   org.apache.kafka.common.record.DefaultRecordBatch$RecordIterator.next() 
> DefaultRecordBatch.java:563
>   org.apache.kafka.common.record.DefaultRecordBatch$RecordIterator.next() 
> DefaultRecordBatch.java:532
>   org.apache.kafka.common.record.DefaultRecordBatch.iterator() 
> DefaultRecordBatch.java:327
>   scala.collection.convert.Wrappers$JIterableWrapper.iterator() 
> Wrappers.scala:54
>   scala.collection.IterableLike$class.foreach(IterableLike, Function1) 
> IterableLike.scala:72
>  scala.collection.AbstractIterable.foreach(Function1) Iterable.scala:54
>   
> kafka.log.LogValidator$$anonfun$validateMessagesAndAssignOffsetsCompressed$1.apply(MutableRecordBatch)
>  LogValidator.scala:267
>   
> kafka.log.LogValidator$$anonfun$validateMessagesAndAssignOffsetsCompressed$1.apply(Object)
>  LogValidator.scala:259
>   scala.collection.Iterator$class.foreach(Iterator, Function1) 
> Iterator.scala:891
>   scala.collection.AbstractIterator.foreach(Function1) Iterator.scala:1334
>   scala.collection.IterableLike$class.foreach(IterableLike, Function1) 
> IterableLike.scala:72
>   scala.collection.AbstractIterable.foreach(Function1) Iterable.scala:54
>   
> kafka.log.LogValidator$.validateMessagesAndAssignOffsetsCompressed(MemoryRecords,
>  LongRef, Time, long, CompressionCodec, CompressionCodec, boolean, byte, 
> TimestampType, long, int, boolean) LogValidator.scala:259
>   kafka.log.LogValidator$.validateMessagesAndAssignOffsets(MemoryRecords, 
> LongRef, Time, long, CompressionCodec, CompressionCodec, boolean, byte, 
> TimestampType, long, int, boolean) LogValidator.scala:70
>   kafka.log.Log$$anonfun$append$2.liftedTree1$1(LogAppendInfo, ObjectRef, 
> LongRef, long) Log.scala:771
>   kafka.log.Log$$anonfun$append$2.apply() Log.scala:770
>   kafka.log.Log$$anonfun$append$2.apply() Log.scala:752
>   kafka.log.Log.maybeHandleIOException(Function0, Function0) Log.scala:1842
>   kafka.log.Log.append(MemoryRecords, boolean, boolean, int) Log.scala:752
>   kafka.log.Log.appendAsLeader(MemoryRecords, int, boolean) Log.scala:722
>   kafka.cluster.Partition$$anonfun$13.apply() Partition.scala:660
>   kafka.cluster.Partition$$anonfun$13.apply() Partition.scala:648
>   kafka.utils.CoreUtils$.inLock(Lock, Function0) CoreUtils.scala:251
>   kafka.utils.CoreUtils$.inReadLock(ReadWriteLock, Function0) 
> CoreUtils.scala:257
>   

[jira] [Commented] (KAFKA-8546) Call System#runFinalization to avoid memory leak caused by JDK-6293787

2019-06-20 Thread Lee Dongjin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16868844#comment-16868844
 ] 

Lee Dongjin commented on KAFKA-8546:


Hi [~badai],

Here is the patch: [^KAFKA-8546.patch] I confirmed that it applies cleanly onto 
2.0.1 and passes all the tests. In short, it calls {{System#runFinalization}} 
when closing GZip[Input, Output]Stream to avoid the surge of [Inflater, 
Deflater] objects. Could you test this patch on your system?
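Since the patch itself is attached rather than inlined, here is a minimal sketch of the described approach (the class name is illustrative, not the one in the patch):

{code:java}
import java.io.IOException;
import java.io.InputStream;
import java.util.zip.GZIPInputStream;

// Illustrative only: mirrors the idea described above, i.e. running pending
// finalizers when a gzip stream closes so that Finalizer objects for
// Inflater/Deflater instances cannot pile up until a full GC.
public class FinalizingGZIPInputStream extends GZIPInputStream {
    public FinalizingGZIPInputStream(InputStream in) throws IOException {
        super(in);
    }

    @Override
    public void close() throws IOException {
        super.close();
        // Drain the finalizer queue eagerly instead of waiting for a full GC.
        System.runFinalization();
    }
}
{code}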

> Call System#runFinalization to avoid memory leak caused by JDK-6293787
> --
>
> Key: KAFKA-8546
> URL: https://issues.apache.org/jira/browse/KAFKA-8546
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 2.0.1
>Reporter: Badai Aqrandista
>Priority: Minor
> Attachments: KAFKA-8546.patch, Screen Shot 2019-05-30 at 1.27.25 
> pm.png
>
>
> When a heavily used broker uses gzip compression on all topics, you can 
> sometimes hit GC pauses greater than the zookeeper.session.timeout.ms of 
> 6000ms. This is caused by a memory leak due to JDK-6293787 
> ([https://bugs.java.com/bugdatabase/view_bug.do?bug_id=6293787]), which is in 
> turn caused by JDK-4797189 
> ([https://bugs.java.com/bugdatabase/view_bug.do?bug_id=4797189]).
>  
> In summary, this is what happens:
>  * The Inflater class contains a finalizer method.
>  * Whenever a class with a finalizer method is instantiated, a Finalizer 
> object is created.
>  * The GC finalizer thread is responsible for processing all Finalizer 
> objects.
>  * If the rate of Finalizer object creation exceeds the rate at which the GC 
> finalizer thread can process them, the number of Finalizer objects grows 
> continuously and eventually triggers a full GC (because they are stored in 
> the Old Gen).
>  
> The following stack trace shows what happens when a process is frozen doing 
> a full GC:
>  
> {code:java}
> kafka-request-handler-13  Runnable Thread ID: 79
>   java.util.zip.Inflater.inflateBytes(long, byte[], int, int) Inflater.java
>   java.util.zip.Inflater.inflate(byte[], int, int) Inflater.java:259
>   java.util.zip.InflaterInputStream.read(byte[], int, int) 
> InflaterInputStream.java:152
>   java.util.zip.GZIPInputStream.read(byte[], int, int) 
> GZIPInputStream.java:117
>   java.io.BufferedInputStream.fill() BufferedInputStream.java:246
>   java.io.BufferedInputStream.read() BufferedInputStream.java:265
>   java.io.DataInputStream.readByte() DataInputStream.java:265
>   org.apache.kafka.common.utils.ByteUtils.readVarint(DataInput) 
> ByteUtils.java:168
>   org.apache.kafka.common.record.DefaultRecord.readFrom(DataInput, long, 
> long, int, Long) DefaultRecord.java:292
>   org.apache.kafka.common.record.DefaultRecordBatch$1.readNext(long, long, 
> int, Long) DefaultRecordBatch.java:264
>   org.apache.kafka.common.record.DefaultRecordBatch$RecordIterator.next() 
> DefaultRecordBatch.java:563
>   org.apache.kafka.common.record.DefaultRecordBatch$RecordIterator.next() 
> DefaultRecordBatch.java:532
>   org.apache.kafka.common.record.DefaultRecordBatch.iterator() 
> DefaultRecordBatch.java:327
>   scala.collection.convert.Wrappers$JIterableWrapper.iterator() 
> Wrappers.scala:54
>   scala.collection.IterableLike$class.foreach(IterableLike, Function1) 
> IterableLike.scala:72
>  scala.collection.AbstractIterable.foreach(Function1) Iterable.scala:54
>   
> kafka.log.LogValidator$$anonfun$validateMessagesAndAssignOffsetsCompressed$1.apply(MutableRecordBatch)
>  LogValidator.scala:267
>   
> kafka.log.LogValidator$$anonfun$validateMessagesAndAssignOffsetsCompressed$1.apply(Object)
>  LogValidator.scala:259
>   scala.collection.Iterator$class.foreach(Iterator, Function1) 
> Iterator.scala:891
>   scala.collection.AbstractIterator.foreach(Function1) Iterator.scala:1334
>   scala.collection.IterableLike$class.foreach(IterableLike, Function1) 
> IterableLike.scala:72
>   scala.collection.AbstractIterable.foreach(Function1) Iterable.scala:54
>   
> kafka.log.LogValidator$.validateMessagesAndAssignOffsetsCompressed(MemoryRecords,
>  LongRef, Time, long, CompressionCodec, CompressionCodec, boolean, byte, 
> TimestampType, long, int, boolean) LogValidator.scala:259
>   kafka.log.LogValidator$.validateMessagesAndAssignOffsets(MemoryRecords, 
> LongRef, Time, long, CompressionCodec, CompressionCodec, boolean, byte, 
> TimestampType, long, int, boolean) LogValidator.scala:70
>   kafka.log.Log$$anonfun$append$2.liftedTree1$1(LogAppendInfo, ObjectRef, 
> LongRef, long) Log.scala:771
>   kafka.log.Log$$anonfun$append$2.apply() Log.scala:770
>   kafka.log.Log$$anonfun$append$2.apply() Log.scala:752
>   kafka.log.Log.maybeHandleIOException(Function0, Function0) Log.scala:1842
>   kafka.log.Log.append(MemoryRecords, boolean, boolean, int) Log.scala:752
>   kafka.log.Log.appendAsLeader(MemoryRecords, int, boolean) Log.scala:722
>   

[jira] [Updated] (KAFKA-8546) Call System#runFinalization to avoid memory leak caused by JDK-6293787

2019-06-20 Thread Lee Dongjin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8546?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lee Dongjin updated KAFKA-8546:
---
Attachment: KAFKA-8546.patch

> Call System#runFinalization to avoid memory leak caused by JDK-6293787
> --
>
> Key: KAFKA-8546
> URL: https://issues.apache.org/jira/browse/KAFKA-8546
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 2.0.1
>Reporter: Badai Aqrandista
>Priority: Minor
> Attachments: KAFKA-8546.patch, Screen Shot 2019-05-30 at 1.27.25 
> pm.png
>
>
> When a heavily used broker uses gzip compression on all topics, you can 
> sometimes hit GC pauses greater than the zookeeper.session.timeout.ms of 
> 6000ms. This is caused by a memory leak due to JDK-6293787 
> ([https://bugs.java.com/bugdatabase/view_bug.do?bug_id=6293787]), which is in 
> turn caused by JDK-4797189 
> ([https://bugs.java.com/bugdatabase/view_bug.do?bug_id=4797189]).
>  
> In summary, this is what happens:
>  * The Inflater class contains a finalizer method.
>  * Whenever a class with a finalizer method is instantiated, a Finalizer 
> object is created.
>  * The GC finalizer thread is responsible for processing all Finalizer 
> objects.
>  * If the rate of Finalizer object creation exceeds the rate at which the GC 
> finalizer thread can process them, the number of Finalizer objects grows 
> continuously and eventually triggers a full GC (because they are stored in 
> the Old Gen).
>  
> The following stack trace shows what happens when a process is frozen doing 
> a full GC:
>  
> {code:java}
> kafka-request-handler-13  Runnable Thread ID: 79
>   java.util.zip.Inflater.inflateBytes(long, byte[], int, int) Inflater.java
>   java.util.zip.Inflater.inflate(byte[], int, int) Inflater.java:259
>   java.util.zip.InflaterInputStream.read(byte[], int, int) 
> InflaterInputStream.java:152
>   java.util.zip.GZIPInputStream.read(byte[], int, int) 
> GZIPInputStream.java:117
>   java.io.BufferedInputStream.fill() BufferedInputStream.java:246
>   java.io.BufferedInputStream.read() BufferedInputStream.java:265
>   java.io.DataInputStream.readByte() DataInputStream.java:265
>   org.apache.kafka.common.utils.ByteUtils.readVarint(DataInput) 
> ByteUtils.java:168
>   org.apache.kafka.common.record.DefaultRecord.readFrom(DataInput, long, 
> long, int, Long) DefaultRecord.java:292
>   org.apache.kafka.common.record.DefaultRecordBatch$1.readNext(long, long, 
> int, Long) DefaultRecordBatch.java:264
>   org.apache.kafka.common.record.DefaultRecordBatch$RecordIterator.next() 
> DefaultRecordBatch.java:563
>   org.apache.kafka.common.record.DefaultRecordBatch$RecordIterator.next() 
> DefaultRecordBatch.java:532
>   org.apache.kafka.common.record.DefaultRecordBatch.iterator() 
> DefaultRecordBatch.java:327
>   scala.collection.convert.Wrappers$JIterableWrapper.iterator() 
> Wrappers.scala:54
>   scala.collection.IterableLike$class.foreach(IterableLike, Function1) 
> IterableLike.scala:72
>  scala.collection.AbstractIterable.foreach(Function1) Iterable.scala:54
>   
> kafka.log.LogValidator$$anonfun$validateMessagesAndAssignOffsetsCompressed$1.apply(MutableRecordBatch)
>  LogValidator.scala:267
>   
> kafka.log.LogValidator$$anonfun$validateMessagesAndAssignOffsetsCompressed$1.apply(Object)
>  LogValidator.scala:259
>   scala.collection.Iterator$class.foreach(Iterator, Function1) 
> Iterator.scala:891
>   scala.collection.AbstractIterator.foreach(Function1) Iterator.scala:1334
>   scala.collection.IterableLike$class.foreach(IterableLike, Function1) 
> IterableLike.scala:72
>   scala.collection.AbstractIterable.foreach(Function1) Iterable.scala:54
>   
> kafka.log.LogValidator$.validateMessagesAndAssignOffsetsCompressed(MemoryRecords,
>  LongRef, Time, long, CompressionCodec, CompressionCodec, boolean, byte, 
> TimestampType, long, int, boolean) LogValidator.scala:259
>   kafka.log.LogValidator$.validateMessagesAndAssignOffsets(MemoryRecords, 
> LongRef, Time, long, CompressionCodec, CompressionCodec, boolean, byte, 
> TimestampType, long, int, boolean) LogValidator.scala:70
>   kafka.log.Log$$anonfun$append$2.liftedTree1$1(LogAppendInfo, ObjectRef, 
> LongRef, long) Log.scala:771
>   kafka.log.Log$$anonfun$append$2.apply() Log.scala:770
>   kafka.log.Log$$anonfun$append$2.apply() Log.scala:752
>   kafka.log.Log.maybeHandleIOException(Function0, Function0) Log.scala:1842
>   kafka.log.Log.append(MemoryRecords, boolean, boolean, int) Log.scala:752
>   kafka.log.Log.appendAsLeader(MemoryRecords, int, boolean) Log.scala:722
>   kafka.cluster.Partition$$anonfun$13.apply() Partition.scala:660
>   kafka.cluster.Partition$$anonfun$13.apply() Partition.scala:648
>   kafka.utils.CoreUtils$.inLock(Lock, Function0) CoreUtils.scala:251
>   kafka.utils.CoreUtils$.inReadLock(ReadWriteLock, Function0) 
> CoreUtils.scala:257
>   

[jira] [Commented] (KAFKA-8546) Call System#runFinalization to avoid memory leak caused by JDK-6293787

2019-06-17 Thread Lee Dongjin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8546?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16865734#comment-16865734
 ] 

Lee Dongjin commented on KAFKA-8546:


[~ijuma] I inspected this issue a little bit. Although [~badai] described the 
case of `GZIPInputStream` (which uses `Inflater` internally), 
`GZIPOutputStream` (which uses `Deflater` internally) also has the same 
vulnerability - `Deflater` also has a `finalize` method, like `Inflater`.

One possible strategy would be to add an internal implementation for the GZIP 
streams (like `KafkaLZ4Block[Input,Output]Stream`), which calls 
`System#runFinalization` on closing. As a note, I implemented a similar class 
when I was working on KAFKA-7632 (in that case, to support the GZIP 
compression level).

What do you think? If the committers agree, I hope to take this issue.

[~badai] Thanks Badai, your description is comprehensive and helpful.

> Call System#runFinalization to avoid memory leak caused by JDK-6293787
> --
>
> Key: KAFKA-8546
> URL: https://issues.apache.org/jira/browse/KAFKA-8546
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 2.0.1
>Reporter: Badai Aqrandista
>Priority: Minor
> Attachments: Screen Shot 2019-05-30 at 1.27.25 pm.png
>
>
> When a heavily used broker uses gzip compression on all topics, you can 
> sometimes hit GC pauses greater than the zookeeper.session.timeout.ms of 
> 6000ms. This is caused by a memory leak due to JDK-6293787 
> ([https://bugs.java.com/bugdatabase/view_bug.do?bug_id=6293787]), which is in 
> turn caused by JDK-4797189 
> ([https://bugs.java.com/bugdatabase/view_bug.do?bug_id=4797189]).
>  
> In summary, this is what happens:
>  * The Inflater class contains a finalizer method.
>  * Whenever a class with a finalizer method is instantiated, a Finalizer 
> object is created.
>  * The GC finalizer thread is responsible for processing all Finalizer 
> objects.
>  * If the rate of Finalizer object creation exceeds the rate at which the GC 
> finalizer thread can process them, the number of Finalizer objects grows 
> continuously and eventually triggers a full GC (because they are stored in 
> the Old Gen).
>  
> The following stack trace shows what happens when a process is frozen doing 
> a full GC:
>  
> {code:java}
> kafka-request-handler-13  Runnable Thread ID: 79
>   java.util.zip.Inflater.inflateBytes(long, byte[], int, int) Inflater.java
>   java.util.zip.Inflater.inflate(byte[], int, int) Inflater.java:259
>   java.util.zip.InflaterInputStream.read(byte[], int, int) 
> InflaterInputStream.java:152
>   java.util.zip.GZIPInputStream.read(byte[], int, int) 
> GZIPInputStream.java:117
>   java.io.BufferedInputStream.fill() BufferedInputStream.java:246
>   java.io.BufferedInputStream.read() BufferedInputStream.java:265
>   java.io.DataInputStream.readByte() DataInputStream.java:265
>   org.apache.kafka.common.utils.ByteUtils.readVarint(DataInput) 
> ByteUtils.java:168
>   org.apache.kafka.common.record.DefaultRecord.readFrom(DataInput, long, 
> long, int, Long) DefaultRecord.java:292
>   org.apache.kafka.common.record.DefaultRecordBatch$1.readNext(long, long, 
> int, Long) DefaultRecordBatch.java:264
>   org.apache.kafka.common.record.DefaultRecordBatch$RecordIterator.next() 
> DefaultRecordBatch.java:563
>   org.apache.kafka.common.record.DefaultRecordBatch$RecordIterator.next() 
> DefaultRecordBatch.java:532
>   org.apache.kafka.common.record.DefaultRecordBatch.iterator() 
> DefaultRecordBatch.java:327
>   scala.collection.convert.Wrappers$JIterableWrapper.iterator() 
> Wrappers.scala:54
>   scala.collection.IterableLike$class.foreach(IterableLike, Function1) 
> IterableLike.scala:72
>  scala.collection.AbstractIterable.foreach(Function1) Iterable.scala:54
>   
> kafka.log.LogValidator$$anonfun$validateMessagesAndAssignOffsetsCompressed$1.apply(MutableRecordBatch)
>  LogValidator.scala:267
>   
> kafka.log.LogValidator$$anonfun$validateMessagesAndAssignOffsetsCompressed$1.apply(Object)
>  LogValidator.scala:259
>   scala.collection.Iterator$class.foreach(Iterator, Function1) 
> Iterator.scala:891
>   scala.collection.AbstractIterator.foreach(Function1) Iterator.scala:1334
>   scala.collection.IterableLike$class.foreach(IterableLike, Function1) 
> IterableLike.scala:72
>   scala.collection.AbstractIterable.foreach(Function1) Iterable.scala:54
>   
> kafka.log.LogValidator$.validateMessagesAndAssignOffsetsCompressed(MemoryRecords,
>  LongRef, Time, long, CompressionCodec, CompressionCodec, boolean, byte, 
> TimestampType, long, int, boolean) LogValidator.scala:259
>   kafka.log.LogValidator$.validateMessagesAndAssignOffsets(MemoryRecords, 
> LongRef, Time, long, CompressionCodec, CompressionCodec, boolean, byte, 
> TimestampType, long, int, boolean) LogValidator.scala:70
>   kafka.log.Log$$anonfun$append$2.liftedTree1$1(LogAppendInfo, 

[jira] [Commented] (KAFKA-8507) Support --bootstrap-server in all command line tools

2019-06-11 Thread Lee Dongjin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8507?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16860611#comment-16860611
 ] 

Lee Dongjin commented on KAFKA-8507:


I reviewed the tools in `bin` and found that the out-of-date tools are fixed 
in the following PRs:

1. https://github.com/apache/kafka/pull/3453 (KAFKA-5532)
 * ProducerPerformance

2. https://github.com/apache/kafka/pull/2161 (KAFKA-4307)
 * VerifiableConsumer
 * VerifiableProducer

3. https://github.com/apache/kafka/pull/3605 (KAFKA-2111)
 * ConsumerGroupCommand
 * ReassignPartitionsCommand
 * ConsoleConsumer
 * StreamResetter

Are there any tools I omitted?

> Support --bootstrap-server in all command line tools
> 
>
> Key: KAFKA-8507
> URL: https://issues.apache.org/jira/browse/KAFKA-8507
> Project: Kafka
>  Issue Type: Improvement
>  Components: tools
>Reporter: Jason Gustafson
>Priority: Major
>  Labels: needs-kip
>
> This is an unambitious initial move toward standardizing the command line 
> tools. We have favored the name {{\-\-bootstrap-server}} in all new tools 
> since it matches the config {{bootstrap.servers}}, which is used by all 
> clients. Some older commands use {{\-\-broker-list}} or 
> {{\-\-bootstrap-servers}} and maybe other exotic variations. We should 
> support {{\-\-bootstrap-server}} in all commands and deprecate the other 
> options.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-8316) Remove deprecated usage of Slf4jRequestLog, SslContextFactory

2019-05-03 Thread Lee Dongjin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8316?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lee Dongjin reassigned KAFKA-8316:
--

Assignee: Lee Dongjin

> Remove deprecated usage of Slf4jRequestLog, SslContextFactory
> -
>
> Key: KAFKA-8316
> URL: https://issues.apache.org/jira/browse/KAFKA-8316
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ismael Juma
>Assignee: Lee Dongjin
>Priority: Major
>  Labels: newbie
>
> Jetty recently deprecated a few classes we use. The following commit 
> suppresses the deprecation warnings:
> https://github.com/apache/kafka/commit/e66bc6255b2ee42481b54b7fd1d256b9e4ff5741
> We should remove the suppressions and use the suggested alternatives.
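
For reference, a minimal sketch of what the suggested Jetty 9.4 alternatives
look like (the logger name is illustrative, and this is not the actual Kafka
patch): Slf4jRequestLog becomes CustomRequestLog backed by a
Slf4jRequestLogWriter, and the plain SslContextFactory becomes a role-specific
subclass.

{code:java}
import org.eclipse.jetty.server.CustomRequestLog;
import org.eclipse.jetty.server.Server;
import org.eclipse.jetty.server.Slf4jRequestLogWriter;
import org.eclipse.jetty.util.ssl.SslContextFactory;

Server server = new Server();

// Slf4jRequestLog is deprecated; use CustomRequestLog + Slf4jRequestLogWriter:
Slf4jRequestLogWriter writer = new Slf4jRequestLogWriter();
writer.setLoggerName("org.apache.kafka.http.request.log");  // illustrative name
server.setRequestLog(new CustomRequestLog(writer, CustomRequestLog.EXTENDED_NCSA_FORMAT));

// The plain SslContextFactory constructor is deprecated; use the role-specific
// subclass instead:
SslContextFactory serverSsl = new SslContextFactory.Server();
{code}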



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-8308) Update jetty for security vulnerability CVE-2019-10241

2019-05-03 Thread Lee Dongjin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8308?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lee Dongjin reassigned KAFKA-8308:
--

Assignee: Lee Dongjin

> Update jetty for security vulnerability CVE-2019-10241
> --
>
> Key: KAFKA-8308
> URL: https://issues.apache.org/jira/browse/KAFKA-8308
> Project: Kafka
>  Issue Type: Task
>  Components: core
>Affects Versions: 2.2.0
>Reporter: Di Shang
>Assignee: Lee Dongjin
>Priority: Major
>  Labels: security
>
> Kafka 2.2 uses jetty-*-9.4.14.v20181114 which is marked vulnerable
> [https://github.com/apache/kafka/blob/2.2/gradle/dependencies.gradle#L58]
>  
> [https://nvd.nist.gov/vuln/detail/CVE-2019-10241]



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7996) KafkaStreams does not pass timeout when closing Producer

2019-04-23 Thread Lee Dongjin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16824698#comment-16824698
 ] 

Lee Dongjin commented on KAFKA-7996:


[~bbejeck] [~mjsax] Sorry for the late reply. I just submitted [the 
KIP|https://cwiki.apache.org/confluence/display/KAFKA/KIP-459%3A+Improve+KafkaStreams%23close], 
keeping KafkaStreams#close's timeout parameter in mind. Let's continue 
the discussion in the mailing thread. Please have a look when you are free.

> KafkaStreams does not pass timeout when closing Producer
> 
>
> Key: KAFKA-7996
> URL: https://issues.apache.org/jira/browse/KAFKA-7996
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.0
>Reporter: Patrik Kleindl
>Assignee: Lee Dongjin
>Priority: Major
>  Labels: needs-kip
>
> [https://confluentcommunity.slack.com/messages/C48AHTCUQ/convo/C48AHTCUQ-1550831721.026100/]
> We are running 2.1 and have a case where the shutdown of a streams 
> application takes several minutes
> I noticed that although we call streams.close with a timeout of 30 seconds 
> the log says
> [Producer 
> clientId=…-8be49feb-8a2e-4088-bdd7-3c197f6107bb-StreamThread-1-producer] 
> Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
> Matthias J Sax [3 days ago]
> I just checked the code, and yes, we don't provide a timeout for the producer 
> on close()...



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7502) Cleanup KTable materialization logic in a single place

2019-03-31 Thread Lee Dongjin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lee Dongjin resolved KAFKA-7502.

Resolution: Fixed

> Cleanup KTable materialization logic in a single place
> --
>
> Key: KAFKA-7502
> URL: https://issues.apache.org/jira/browse/KAFKA-7502
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Lee Dongjin
>Priority: Major
>
> Today since we pre-create all the `KTableXXX` operators along with the logical 
> nodes, we are effectively duplicating the logic to determine whether the 
> resulting KTable should be materialized. More specifically, the 
> materialization principle today is that:
> 1) If users specified Materialized in the DSL and it contains a queryable 
> name, we always materialize.
> 2) If users specified Materialized in the DSL but it does not contain a 
> queryable name, or if users do not specify a Materialized object at all, 
> Streams may choose to materialize or not. But in any case, even if the KTable 
> is materialized it will not be queryable since there's no queryable name 
> (i.e. only storeName is not null, but queryableName is null):
> 2.a) If the resulting KTable is from an aggregation, we always materialize 
> since it is needed for storing the aggregation (i.e. we use the 
> MaterializedInternal constructor with nameProvider != null).
> 2.b) If the resulting KTable is from a source topic, we delay the 
> materialization until the downstream operator requires this KTable to be 
> materialized or send-old-values (see `KTableSourceNode` and `KTableSource`).
> 2.c) If the resulting KTable is from a join, we always materialize if users 
> create a Materialized object even without a queryable name. However, this can 
> be optimized similarly to 2.b) but is orthogonal to this ticket (see 
> `KTableImpl#buildJoin` where we always use the constructor with nameProvider 
> != null).
> 2.d) If the resulting KTable is from a stateless operation like filter / 
> mapValues, we never materialize.
> 
> Now, in all of these cases, we have a logical node like "KTableKTableJoinNode" 
> as well as a physical node like `ProcessorNode`. Ideally we should always 
> create the logical plan (i.e. the StreamsGraph), then optimize it if 
> necessary, and then generate the physical plan (i.e. the Topology). However, 
> today we create some physical nodes beforehand, and the above logic is hence 
> duplicated in the creation of both physical nodes and logical nodes. For 
> example, in `KTableKTableJoinNode` we check if Materialized is null for 
> adding a state store, and in `KTableImpl#doJoin` we check if materialized is 
> specified (case 2.c) above).
> Another example is TableProcessorNode, which is used for 2.d) above: it 
> includes the logic, whereas its caller, `KTableImpl#doFilter` for example, 
> also contains the logic when deciding to pass the `queryableName` parameter 
> to `KTableProcessorSupplier`.
> This is bug-vulnerable since we may update the logic in one class but forget 
> to update the other.
> --
> What we want is a cleaner code path similar to what we have for 2.b), such 
> that when creating the logical nodes we keep track of whether 1) materialized 
> is specified, and 2) a queryable name is provided. And during the 
> optimization phase, we may change the inner physical ProcessorBuilder's 
> parameters like the queryable name etc., and then when it is time to generate 
> the physical node, we can just blindly take the parameters and go for it.
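
To make cases 1) and 2.d) above concrete, here is a minimal DSL sketch (topic
and store names are illustrative, not from the ticket):

{code:java}
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;

StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream("input-topic");

// Case 1): Materialized with a queryable name -> always materialized,
// and queryable via interactive queries under "counts-store".
KTable<String, Long> counts = stream.groupByKey()
        .count(Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("counts-store"));

// Case 2.d): stateless operation (filter) -> never materialized.
KTable<String, Long> filtered = counts.filter((key, count) -> count > 0);
{code}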



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7996) KafkaStreams does not pass timeout when closing Producer

2019-03-21 Thread Lee Dongjin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16798210#comment-16798210
 ] 

Lee Dongjin commented on KAFKA-7996:


[~bbejeck] Great. I will prepare the KIP. Thanks for your feedback!

> KafkaStreams does not pass timeout when closing Producer
> 
>
> Key: KAFKA-7996
> URL: https://issues.apache.org/jira/browse/KAFKA-7996
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.0
>Reporter: Patrik Kleindl
>Assignee: Lee Dongjin
>Priority: Major
>  Labels: needs-kip
>
> [https://confluentcommunity.slack.com/messages/C48AHTCUQ/convo/C48AHTCUQ-1550831721.026100/]
> We are running 2.1 and have a case where the shutdown of a streams 
> application takes several minutes
> I noticed that although we call streams.close with a timeout of 30 seconds 
> the log says
> [Producer 
> clientId=…-8be49feb-8a2e-4088-bdd7-3c197f6107bb-StreamThread-1-producer] 
> Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
> Matthias J Sax [3 days ago]
> I just checked the code, and yes, we don't provide a timeout for the producer 
> on close()...



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7996) KafkaStreams does not pass timeout when closing Producer

2019-03-11 Thread Lee Dongjin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16789769#comment-16789769
 ] 

Lee Dongjin commented on KAFKA-7996:


I double-checked this issue and found the following:

1. All of `Producer`, `Consumer`, and `AdminClient` have 
`CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG`, which controls the network 
call timeout. However, `Consumer` has an additional 
`ConsumerConfig#DEFAULT_API_TIMEOUT_MS_CONFIG` and uses it as the request 
timeout instead of `CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG`.
2. When closing `Consumer` without any timeout duration, it closes with a 
timeout of `Consumer#DEFAULT_CLOSE_TIMEOUT_MS`. But in the case of `Producer` 
and `AdminClient`, there is no similar default - `Producer#close`'s default 
timeout is `Duration.ofMillis(Long.MAX_VALUE)`.

As far as I understand, there is no overall approach to handling close 
timeouts - and if that is true, we have three choices:

1. Add [Producer, AdminClient]#DEFAULT_CLOSE_TIMEOUT_MS and close with this 
default value. This approach doesn't require a KIP.
2. Use `ConsumerConfig#DEFAULT_API_TIMEOUT_MS_CONFIG` as the close timeout for 
all of `Producer`, `Consumer`, and `AdminClient`. This approach also doesn't 
require a KIP.
3. Provide additional timeout options for closing [Producer, Consumer, 
AdminClient] in `KafkaStreams`, like the draft PR. This approach provides users 
a way to control the behavior, but it is an API change so it requires a KIP.

What do you think? cc/ [~mjsax] [~hachikuji] [~Yohan123] [~guozhang] [~bbejeck]
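
For reference, a minimal sketch of the symptom from the application's point of
view, assuming the Kafka Streams 2.1 API (config values are illustrative):

{code:java}
import java.time.Duration;
import java.util.Properties;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "demo-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

KafkaStreams streams = new KafkaStreams(new StreamsBuilder().build(), props);
streams.start();

// The caller asks for a bounded shutdown, but the timeout is not forwarded
// to the internal producer, which is closed with
// Duration.ofMillis(Long.MAX_VALUE) - hence the
// "timeoutMillis = 9223372036854775807" line in the log.
streams.close(Duration.ofSeconds(30));
{code}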

> KafkaStreams does not pass timeout when closing Producer
> 
>
> Key: KAFKA-7996
> URL: https://issues.apache.org/jira/browse/KAFKA-7996
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.0
>Reporter: Patrik Kleindl
>Assignee: Lee Dongjin
>Priority: Major
>  Labels: needs-kip
>
> [https://confluentcommunity.slack.com/messages/C48AHTCUQ/convo/C48AHTCUQ-1550831721.026100/]
> We are running 2.1 and have a case where the shutdown of a streams 
> application takes several minutes
> I noticed that although we call streams.close with a timeout of 30 seconds 
> the log says
> [Producer 
> clientId=…-8be49feb-8a2e-4088-bdd7-3c197f6107bb-StreamThread-1-producer] 
> Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
> Matthias J Sax [3 days ago]
> I just checked the code, and yes, we don't provide a timeout for the producer 
> on close()...



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7502) Cleanup KTable materialization logic in a single place

2019-03-05 Thread Lee Dongjin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7502?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lee Dongjin updated KAFKA-7502:
---
Priority: Major  (was: Minor)

> Cleanup KTable materialization logic in a single place
> --
>
> Key: KAFKA-7502
> URL: https://issues.apache.org/jira/browse/KAFKA-7502
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Lee Dongjin
>Priority: Major
>
> Today since we pre-create all the `KTableXXX` operators along with the logical 
> nodes, we are effectively duplicating the logic to determine whether the 
> resulting KTable should be materialized. More specifically, the 
> materialization principle today is that:
> 1) If users specified Materialized in the DSL and it contains a queryable 
> name, we always materialize.
> 2) If users specified Materialized in the DSL but it does not contain a 
> queryable name, or if users do not specify a Materialized object at all, 
> Streams may choose to materialize or not. But in any case, even if the KTable 
> is materialized it will not be queryable since there's no queryable name 
> (i.e. only storeName is not null, but queryableName is null):
> 2.a) If the resulting KTable is from an aggregation, we always materialize 
> since it is needed for storing the aggregation (i.e. we use the 
> MaterializedInternal constructor with nameProvider != null).
> 2.b) If the resulting KTable is from a source topic, we delay the 
> materialization until the downstream operator requires this KTable to be 
> materialized or send-old-values (see `KTableSourceNode` and `KTableSource`).
> 2.c) If the resulting KTable is from a join, we always materialize. However, 
> this can be optimized similarly to 2.b) but is orthogonal to this ticket (see 
> `KTableImpl#buildJoin` where we always use the constructor with nameProvider 
> != null).
> 2.d) If the resulting KTable is from a stateless operation like filter / 
> mapValues, we never materialize.
> 
> Now, in all of these cases, we have a logical node like "KTableKTableJoinNode" 
> as well as a physical node like `ProcessorNode`. Ideally we should always 
> create the logical plan (i.e. the StreamsGraph), then optimize it if 
> necessary, and then generate the physical plan (i.e. the Topology). However, 
> today we create some physical nodes beforehand, and the above logic is hence 
> duplicated in the creation of both physical nodes and logical nodes. For 
> example, in `KTableKTableJoinNode` we check if Materialized is null for 
> adding a state store, and in `KTableImpl#doJoin` we check if materialized is 
> specified (case 2.c) above).
> Another example is TableProcessorNode, which is used for 2.d) above: it 
> includes the logic, whereas its caller, `KTableImpl#doFilter` for example, 
> also contains the logic when deciding to pass the `queryableName` parameter 
> to `KTableProcessorSupplier`.
> This is bug-vulnerable since we may update the logic in one class but forget 
> to update the other.
> --
> What we want is a cleaner code path similar to what we have for 2.b), such 
> that when creating the logical nodes we keep track of whether 1) materialized 
> is specified, and 2) a queryable name is provided. And during the 
> optimization phase, we may change the inner physical ProcessorBuilder's 
> parameters like the queryable name etc., and then when it is time to generate 
> the physical node, we can just blindly take the parameters and go for it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7992) Add a server start time metric

2019-03-03 Thread Lee Dongjin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7992?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16782717#comment-16782717
 ] 

Lee Dongjin commented on KAFKA-7992:


'needs-kip' tag added.

> Add a server start time metric
> --
>
> Key: KAFKA-7992
> URL: https://issues.apache.org/jira/browse/KAFKA-7992
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Stanislav Kozlovski
>Assignee: Stanislav Kozlovski
>Priority: Minor
>  Labels: needs-kip
>
> KIP: KIP-436
> As with all software systems, observability into their health is critical.
> With many deployment platforms (be they custom-built or open-source), tasks 
> like restarting a misbehaving server in a cluster are completely automated. 
> With Kafka, monitoring systems have no definitive source of truth to gauge 
> when a server/client has been started. They are left to either use arbitrary 
> Kafka-specific metrics as a heuristic or the JVM RuntimeMXBean's StartTime, 
> which is not exactly indicative of when the application itself started.
> It would be useful to have a metric exposing when the Kafka server has 
> started.
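
A minimal sketch of such a metric using the common metrics library (the metric
and group names are illustrative; the actual KIP-436 naming may differ):

{code:java}
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Gauge;
import org.apache.kafka.common.metrics.Metrics;

Metrics metrics = new Metrics();
final long serverStartTimeMs = System.currentTimeMillis();  // captured once at startup

MetricName name = metrics.metricName("start-time-ms", "server-metrics",
        "The time in milliseconds at which this server was started");
// A gauge that always reports the captured startup instant:
metrics.addMetric(name, (Gauge<Long>) (config, nowMs) -> serverStartTimeMs);
{code}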



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7992) Add a server start time metric

2019-03-03 Thread Lee Dongjin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7992?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lee Dongjin updated KAFKA-7992:
---
Labels: needs-kip  (was: )

> Add a server start time metric
> --
>
> Key: KAFKA-7992
> URL: https://issues.apache.org/jira/browse/KAFKA-7992
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Stanislav Kozlovski
>Assignee: Stanislav Kozlovski
>Priority: Minor
>  Labels: needs-kip
>
> KIP: KIP-436
> As with all software systems, observability into their health is critical.
> With many deployment platforms (be they custom-built or open-source), tasks 
> like restarting a misbehaving server in a cluster are completely automated. 
> With Kafka, monitoring systems have no definitive source of truth to gauge 
> when a server/client has been started. They are left to either use arbitrary 
> Kafka-specific metrics as a heuristic or the JVM RuntimeMXBean's StartTime, 
> which is not exactly indicative of when the application itself started.
> It would be useful to have a metric exposing when the Kafka server has 
> started.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7996) KafkaStreams does not pass timeout when closing Producer

2019-03-02 Thread Lee Dongjin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16782468#comment-16782468
 ] 

Lee Dongjin commented on KAFKA-7996:


[~guozhang] [~mjsax] Sorry for the late reply. To sum up, the initial issue 
description,
{quote}"{{KafkaStreams#close}} is working incorrectly since {{Producer#close}} 
is called without a timeout."
{quote}
was incorrect; rather,
{quote}"Some components in {{KafkaStreams}} ({{Producer, AdminClient}}) are not 
closed properly for lack of a timeout."
{quote}
is correct, right? Then, updating the issue description to the latter would be 
better - I agree with [~mjsax]'s opinion that this ticket is still valuable.

For the solution - as of now, {{[Producer, AdminClient]}} don't have a default 
close timeout like {{Consumer#DEFAULT_CLOSE_TIMEOUT_MS}}. To solve this, there 
are two approaches:

1. Add {{[Producer, AdminClient]#DEFAULT_CLOSE_TIMEOUT_MS}} and close with this 
value in {{KafkaStreams}}. This approach doesn't require a KIP.
 2. Provide additional timeout options for closing {{[Producer, Consumer, 
AdminClient]}} in {{KafkaStreams}}. This approach provides users a way to 
control the behavior, but it is an API change so it requires a KIP.

What do you think? I will follow your decision.

cc/ [~pkleindl]

> KafkaStreams does not pass timeout when closing Producer
> 
>
> Key: KAFKA-7996
> URL: https://issues.apache.org/jira/browse/KAFKA-7996
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.0
>Reporter: Patrik Kleindl
>Assignee: Lee Dongjin
>Priority: Major
>  Labels: needs-kip
>
> [https://confluentcommunity.slack.com/messages/C48AHTCUQ/convo/C48AHTCUQ-1550831721.026100/]
> We are running 2.1 and have a case where the shutdown of a streams 
> application takes several minutes
> I noticed that although we call streams.close with a timeout of 30 seconds 
> the log says
> [Producer 
> clientId=…-8be49feb-8a2e-4088-bdd7-3c197f6107bb-StreamThread-1-producer] 
> Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
> Matthias J Sax [3 days ago]
> I just checked the code, and yes, we don't provide a timeout for the producer 
> on close()...



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-7996) KafkaStreams does not pass timeout when closing Producer

2019-02-25 Thread Lee Dongjin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7996?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lee Dongjin reassigned KAFKA-7996:
--

Assignee: Lee Dongjin

> KafkaStreams does not pass timeout when closing Producer
> 
>
> Key: KAFKA-7996
> URL: https://issues.apache.org/jira/browse/KAFKA-7996
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.0
>Reporter: Patrik Kleindl
>Assignee: Lee Dongjin
>Priority: Major
>  Labels: needs-kip
>
> [https://confluentcommunity.slack.com/messages/C48AHTCUQ/convo/C48AHTCUQ-1550831721.026100/]
> We are running 2.1 and have a case where the shutdown of a streams 
> application takes several minutes
> I noticed that although we call streams.close with a timeout of 30 seconds 
> the log says
> [Producer 
> clientId=…-8be49feb-8a2e-4088-bdd7-3c197f6107bb-StreamThread-1-producer] 
> Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
> Matthias J Sax [3 days ago]
> I just checked the code, and yes, we don't provide a timeout for the producer 
> on close()...



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7996) KafkaStreams does not pass timeout when closing Producer

2019-02-25 Thread Lee Dongjin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7996?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lee Dongjin updated KAFKA-7996:
---
Labels: needs-kip  (was: )

> KafkaStreams does not pass timeout when closing Producer
> 
>
> Key: KAFKA-7996
> URL: https://issues.apache.org/jira/browse/KAFKA-7996
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.0
>Reporter: Patrik Kleindl
>Priority: Major
>  Labels: needs-kip
>
> [https://confluentcommunity.slack.com/messages/C48AHTCUQ/convo/C48AHTCUQ-1550831721.026100/]
> We are running 2.1 and have a case where the shutdown of a streams 
> application takes several minutes
> I noticed that although we call streams.close with a timeout of 30 seconds 
> the log says
> [Producer 
> clientId=…-8be49feb-8a2e-4088-bdd7-3c197f6107bb-StreamThread-1-producer] 
> Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
> Matthias J Sax [3 days ago]
> I just checked the code, and yes, we don't provide a timeout for the producer 
> on close()...



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-7920) Do not permit zstd use until inter.broker.protocol.version is updated to 2.1

2019-02-12 Thread Lee Dongjin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7920?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lee Dongjin reassigned KAFKA-7920:
--

Assignee: Lee Dongjin

> Do not permit zstd use until inter.broker.protocol.version is updated to 2.1
> 
>
> Key: KAFKA-7920
> URL: https://issues.apache.org/jira/browse/KAFKA-7920
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.1.0
>Reporter: Jason Gustafson
>Assignee: Lee Dongjin
>Priority: Major
>
> After brokers have been upgraded to 2.1, users can begin using zstd 
> compression. Regardless of the inter.broker.protocol.version, the broker will 
> happily accept zstd-compressed data as long as the right produce request 
> version is used. However, if the inter.broker.protocol.version is set to 2.0 
> or below, then followers will not be able to use the minimum required fetch 
> version, which will result in the following error:
> {code}
> [2019-02-11 17:42:47,116] ERROR [ReplicaFetcher replicaId=2, leaderId=1, fetcherId=0] Error for partition foo-0 at offset 0 (kafka.server.ReplicaFetcherThread)
> org.apache.kafka.common.errors.UnsupportedCompressionTypeException: The requesting client does not support the compression type of given partition.
> {code}
> We should make produce request validation consistent. Until the 
> inter.broker.protocol.version is at 2.1 or later, we should reject produce 
> requests with UNSUPPORTED_COMPRESSION_TYPE.
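
The proposed validation could look roughly like the following (a sketch only,
not the broker source; the IBP check is reduced to a boolean parameter for
illustration):

{code:java}
import org.apache.kafka.common.errors.UnsupportedCompressionTypeException;
import org.apache.kafka.common.record.CompressionType;

// Reject zstd-compressed batches on the produce path until the
// inter.broker.protocol.version permits them:
static void validateCompression(CompressionType type, boolean ibpAtLeast21) {
    if (type == CompressionType.ZSTD && !ibpAtLeast21) {
        throw new UnsupportedCompressionTypeException(
                "Produce requests with zstd compression are not allowed "
                + "until inter.broker.protocol.version is at least 2.1");
    }
}
{code}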



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7917) Streams store cleanup: collapse layers

2019-02-11 Thread Lee Dongjin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7917?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16765551#comment-16765551
 ] 

Lee Dongjin commented on KAFKA-7917:


[~vvcephei] Thanks for creating the issue. I have also been thinking about 
this. Here are some suggestions:

1. How about making an umbrella issue like 'Improve Streams Store 
implementations' and making the related issues (KAFKA-7916, KAFKA-7917, 
KAFKA-7918, KAFKA-7919) subtasks of it?
2. Also, these issues need discussion, as [~mjsax] commented. How about 
opening a discussion thread after packing them under the umbrella issue?

> Streams store cleanup: collapse layers
> --
>
> Key: KAFKA-7917
> URL: https://issues.apache.org/jira/browse/KAFKA-7917
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: John Roesler
>Priority: Major
>
> Following on KAFKA-7916, we can consider collapsing the "streams management 
> layers" into one.
> Right now, we have:
>  * metering (also handles moving from pojo world to bytes world)
>  * change-logging
>  * caching
> This is good compositional style, but we also have some runtime overhead of 
> calling through all these layers, as well as some mental overhead of 
> understanding how many and which layers we are going through.
> Also, there are dependencies between the caching and change-logging layers.
> I _think_ it would simplify the code if we collapsed these into one layer 
> with boolean switches to turn on or off the different aspects. (rather than 
> wrapping the store with the different layers or not depending on the same 
> boolean conditions)
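
A rough sketch of that "single layer with switches" shape (class and method
names are illustrative, not actual Streams classes):

{code:java}
// One store layer whose aspects are toggled by flags instead of being
// stacked as separate wrapping stores.
class UnifiedKeyValueStore<K, V> {
    private final boolean metered;
    private final boolean changeLogged;
    private final boolean cached;

    UnifiedKeyValueStore(boolean metered, boolean changeLogged, boolean cached) {
        this.metered = metered;
        this.changeLogged = changeLogged;
        this.cached = cached;
    }

    void put(K key, V value) {
        final long startNs = metered ? System.nanoTime() : 0L;
        if (cached) {
            // buffer the write in the cache and flush it later
        }
        if (changeLogged) {
            // append the write to the changelog topic
        }
        // serialize and write to the inner bytes store ...
        if (metered) {
            recordPutLatency(System.nanoTime() - startNs);
        }
    }

    private void recordPutLatency(long nanos) {
        // update the put-latency sensor ...
    }
}
{code}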



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7911) Add Timezone Support for Windowed Aggregations

2019-02-10 Thread Lee Dongjin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7911?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16764389#comment-16764389
 ] 

Lee Dongjin commented on KAFKA-7911:


Understood. Thank you for your response. Let's continue discussing in the 
mailing thread.

> Add Timezone Support for Windowed Aggregations
> --
>
> Key: KAFKA-7911
> URL: https://issues.apache.org/jira/browse/KAFKA-7911
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Minor
>  Labels: needs-kip
>
> Currently, Kafka Streams only supports UTC timestamps. The impact is that 
> `TimeWindows` are based on UTC time only. This is problematic for 24h 
> windows, because windows are built aligned to UTC days, not your local 
> time zone.
> While it's possible to "shift" timestamps as a workaround, it would be better 
> to allow native timezone support.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Comment Edited] (KAFKA-7911) Add Timezone Support for Windowed Aggregations

2019-02-10 Thread Lee Dongjin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7911?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16764346#comment-16764346
 ] 

Lee Dongjin edited comment on KAFKA-7911 at 2/10/19 8:45 AM:
-

[~mjsax] So... something like the following is needed. Right?

{{kStream}}
{{.windowedBy(TimeZone.getTimeZone("America/Los_Angeles"), 
TimeWindows.of(Duration.ofDays(7)).advanceBy(Duration.ofSeconds(1)))}}

or,

{{kStream}}
{{.withTimeZone(TimeZone.getTimeZone("America/Los_Angeles"))}}
{{.windowedBy(TimeWindows.of(Duration.ofDays(7)).advanceBy(Duration.ofSeconds(1)))}}

The first approach is easy to implement, but the second one has slightly 
clearer syntax. Which one would be better?


was (Author: dongjin.lee.kr):
[~mjsax] So... something like the following is needed. Right?

{{kStream}}
{{ .windowedBy(TimeZone.getTimeZone("America/Los_Angeles"), 
TimeWindows.of(Duration.ofDays(7)).advanceBy(Duration.ofSeconds(1)))}}

or,

{{kStream}}
{{ .withTimeZone(TimeZone.getTimeZone("America/Los_Angeles"))}}
{{ 
.windowedBy(TimeWindows.of(Duration.ofDays(7)).advanceBy(Duration.ofSeconds(1)))}}

 

The first approach is easy to implement, but the second one has a little bit 
more clear syntax. Which one would be better?

> Add Timezone Support for Windowed Aggregations
> --
>
> Key: KAFKA-7911
> URL: https://issues.apache.org/jira/browse/KAFKA-7911
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Minor
>  Labels: needs-kip
>
> Currently, Kafka Streams only supports UTC timestamps. The impact is that 
> `TimeWindows` are based on UTC time only. This is problematic for 24h 
> windows, because windows are built aligned to UTC days, not your local 
> time zone.
> While it's possible to "shift" timestamps as a workaround, it would be better 
> to allow native timezone support.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7911) Add Timezone Support for Windowed Aggregations

2019-02-10 Thread Lee Dongjin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7911?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16764346#comment-16764346
 ] 

Lee Dongjin commented on KAFKA-7911:


[~mjsax] So... something like the following is needed. Right?

{{kStream}}
{{ .windowedBy(TimeZone.getTimeZone("America/Los_Angeles"), 
TimeWindows.of(Duration.ofDays(7)).advanceBy(Duration.ofSeconds(1)))}}

or,

{{kStream}}
{{ .withTimeZone(TimeZone.getTimeZone("America/Los_Angeles"))}}
{{ 
.windowedBy(TimeWindows.of(Duration.ofDays(7)).advanceBy(Duration.ofSeconds(1)))}}

 

The first approach is easy to implement, but the second one has a little bit 
more clear syntax. Which one would be better?
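
For comparison, the timestamp-shifting workaround mentioned in the description
can be sketched as a custom {{TimestampExtractor}} (the zone and class name are
illustrative):

{code:java}
import java.time.Instant;
import java.time.ZoneId;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

// Shifts UTC epoch-millis timestamps by the target zone's offset so that
// 24h windows align to local days instead of UTC days.
public class ZoneShiftingExtractor implements TimestampExtractor {
    private final ZoneId zone = ZoneId.of("America/Los_Angeles");

    @Override
    public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
        long utcMillis = record.timestamp();
        int offsetMillis = zone.getRules()
                .getOffset(Instant.ofEpochMilli(utcMillis))
                .getTotalSeconds() * 1000;
        return utcMillis + offsetMillis;
    }
}
{code}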

> Add Timezone Support for Windowed Aggregations
> --
>
> Key: KAFKA-7911
> URL: https://issues.apache.org/jira/browse/KAFKA-7911
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Minor
>  Labels: needs-kip
>
> Currently, Kafka Streams only supports UTC timestamps. The impact is that 
> `TimeWindows` are based on UTC time only. This is problematic for 24h 
> windows, because windows are built aligned to UTC days, not your local 
> time zone.
> While it's possible to "shift" timestamps as a workaround, it would be better 
> to allow native timezone support.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-7293) Merge followed by groupByKey/join might violate co-partioning

2019-02-04 Thread Lee Dongjin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7293?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lee Dongjin reassigned KAFKA-7293:
--

Assignee: Lee Dongjin

> Merge followed by groupByKey/join might violate co-partioning
> -
>
> Key: KAFKA-7293
> URL: https://issues.apache.org/jira/browse/KAFKA-7293
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Lee Dongjin
>Priority: Major
>
> The merge() operation can be applied to input KStreams that have a different 
> number of tasks (i.e., input topic partitions). In this case, the input topics 
> are not co-partitioned and thus the resulting KStream is not partitioned, even 
> if each input KStream is partitioned on its own.
> Because no "repartitionRequired" flag is set on the input KStreams, the flag 
> is also not set on the output KStream. Hence, if a groupByKey() or join() 
> operation is applied to the output KStream, we don't insert a repartition 
> topic. However, repartitioning would be required because the KStream is not 
> partitioned.
> We cannot detect this at compile time, because the number of partitions 
> is unknown, and thus we cannot decide if repartitioning is required or not. 
> However, we can add a runtime check similar to joins() that checks if data is 
> correctly (co-)partitioned and, if not, raises a runtime exception.
> Note, for merge() in contrast to join(), we should only check for 
> co-partitioning if the merge() is followed by a groupByKey() or join() 
> operation.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-7896) Add some Log4J Kafka Properties for Producing to Secured Brokers

2019-02-04 Thread Lee Dongjin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7896?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lee Dongjin reassigned KAFKA-7896:
--

Assignee: Rohan Desai

> Add some Log4J Kafka Properties for Producing to Secured Brokers
> 
>
> Key: KAFKA-7896
> URL: https://issues.apache.org/jira/browse/KAFKA-7896
> Project: Kafka
>  Issue Type: Bug
>Reporter: Rohan Desai
>Assignee: Rohan Desai
>Priority: Major
>
> The existing Log4J Kafka appender supports producing to brokers that use the 
> GSSAPI (kerberos) sasl mechanism, and only supports configuring JAAS via a 
> JAAS config file. Filing this issue to cover extending this to include the 
> PLAIN mechanism and to support configuring JAAS via an in-line configuration.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7896) Add some Log4J Kafka Properties for Producing to Secured Brokers

2019-02-04 Thread Lee Dongjin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7896?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16759731#comment-16759731
 ] 

Lee Dongjin commented on KAFKA-7896:


Here is the PR: 
[https://github.com/apache/kafka/pull/6216|https://github.com/apache/kafka/pull/6216]

> Add some Log4J Kafka Properties for Producing to Secured Brokers
> 
>
> Key: KAFKA-7896
> URL: https://issues.apache.org/jira/browse/KAFKA-7896
> Project: Kafka
>  Issue Type: Bug
>Reporter: Rohan Desai
>Priority: Major
>
> The existing Log4J Kafka appender supports producing to brokers that use the 
> GSSAPI (kerberos) sasl mechanism, and only supports configuring JAAS via a 
> JAAS config file. Filing this issue to cover extending this to include the 
> PLAIN mechanism and to support configuring JAAS via an in-line configuration.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7293) Merge followed by groupByKey/join might violate co-partioning

2019-02-02 Thread Lee Dongjin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16759019#comment-16759019
 ] 

Lee Dongjin commented on KAFKA-7293:


[~mjsax] So... in short, if none of the input {{KStream}}s given to the 
{{KStream#merge}} method are flagged with {{repartitionRequired}}, the output 
{{KStream}} (with {{repartitionRequired = false}}) may not be correctly 
partitioned, so it can yield incorrect results if {{groupByKey}} or {{join}} 
is called on it. Do I understand correctly?

If so, I have an idea:

1. Add a flag to denote 'a runtime inspection is required to check whether 
data is correctly (co-)partitioned' (like the {{repartitionRequired}} flag).
 2. When the {{merge}} method is called with non-repartitioned {{KStream}}s, 
set the flag above.
 3. In {{KStream#[join, groupByKey, ...]}}, call 
{{THIS.ensureJoinableWith(THAT)}} to inspect co-partitioning if the flag above 
is set. (This method is called in {{KStream#[doJoin, doStreamTableJoin]}} now.)

If this is what you want, I would like to open a PR.
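
To illustrate the hazard (topic names and partition counts are hypothetical):

{code:java}
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.kstream.KStream;

// Suppose "topic-a" has 4 partitions and "topic-b" has 8. The merged
// stream is then not co-partitioned, yet no repartition topic is
// inserted before groupByKey() because neither input sets the
// repartitionRequired flag.
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> a = builder.stream("topic-a");
KStream<String, String> b = builder.stream("topic-b");

a.merge(b)
 .groupByKey()  // may aggregate over incorrectly partitioned data today
 .count();
{code}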

> Merge followed by groupByKey/join might violate co-partioning
> -
>
> Key: KAFKA-7293
> URL: https://issues.apache.org/jira/browse/KAFKA-7293
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
>
> The merge() operation can be applied to input KStreams that have a different 
> number of tasks (i.e., input topic partitions). In this case, the input topics 
> are not co-partitioned and thus the resulting KStream is not partitioned, even 
> if each input KStream is partitioned on its own.
> Because no "repartitionRequired" flag is set on the input KStreams, the flag 
> is also not set on the output KStream. Hence, if a groupByKey() or join() 
> operation is applied to the output KStream, we don't insert a repartition 
> topic. However, repartitioning would be required because the KStream is not 
> partitioned.
> We cannot detect this at compile time, because the number of partitions 
> is unknown, and thus we cannot decide if repartitioning is required or not. 
> However, we can add a runtime check similar to joins() that checks if data is 
> correctly (co-)partitioned and, if not, raises a runtime exception.
> Note, for merge() in contrast to join(), we should only check for 
> co-partitioning if the merge() is followed by a groupByKey() or join() 
> operation.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7884) Docs for message.format.version and log.message.format.version show invalid (corrupt?) "valid values"

2019-01-31 Thread Lee Dongjin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7884?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lee Dongjin updated KAFKA-7884:
---
Fix Version/s: 2.1.1
   2.2.0

> Docs for message.format.version and log.message.format.version show invalid 
> (corrupt?) "valid values"
> -
>
> Key: KAFKA-7884
> URL: https://issues.apache.org/jira/browse/KAFKA-7884
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation
>Reporter: James Cheng
>Assignee: Lee Dongjin
>Priority: Major
> Fix For: 2.2.0, 2.1.1
>
>
> In the docs for message.format.version and log.message.format.version, the 
> list of valid values is
>  
> {code:java}
> kafka.api.ApiVersionValidator$@56aac163 
> {code}
>  
> It appears it's simply doing a .toString on the class/instance.
> At a minimum, we should remove this java-y-ness.
> Even better, it should show all the valid values.
>  
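
The usual fix for this kind of doc output is to give the validator a
meaningful {{toString()}}, which the config documentation generator prints. A
Java-style sketch of that direction (the real class is Scala's
{{kafka.api.ApiVersionValidator}}, and the version list below is illustrative):

{code:java}
import org.apache.kafka.common.config.ConfigDef;

public class ApiVersionValidator implements ConfigDef.Validator {
    @Override
    public void ensureValid(String name, Object value) {
        // parse and validate the version string ...
    }

    @Override
    public String toString() {
        // Rendered into the "Valid Values" column of the generated docs:
        return "[0.8.0, 0.8.1, 0.8.2, 0.9.0, ..., 2.1]";
    }
}
{code}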



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-7884) Docs for message.format.version and log.message.format.version show invalid (corrupt?) "valid values"

2019-01-30 Thread Lee Dongjin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7884?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lee Dongjin reassigned KAFKA-7884:
--

Assignee: Lee Dongjin

> Docs for message.format.version and log.message.format.version show invalid 
> (corrupt?) "valid values"
> -
>
> Key: KAFKA-7884
> URL: https://issues.apache.org/jira/browse/KAFKA-7884
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation
>Reporter: James Cheng
>Assignee: Lee Dongjin
>Priority: Major
>
> In the docs for message.format.version and log.message.format.version, the 
> list of valid values is
>  
> {code:java}
> kafka.api.ApiVersionValidator$@56aac163 
> {code}
>  
> It appears it's simply doing a .toString on the class/instance.
> At a minimum, we should remove this java-y-ness.
> Even better, it should show all the valid values.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7840) Documentation for cleanup.policy is out of date

2019-01-26 Thread Lee Dongjin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7840?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lee Dongjin updated KAFKA-7840:
---
Fix Version/s: 2.1.1
   2.2.0

> Documentation for cleanup.policy is out of date
> ---
>
> Key: KAFKA-7840
> URL: https://issues.apache.org/jira/browse/KAFKA-7840
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation
>Reporter: Josh Wiley
>Assignee: Lee Dongjin
>Priority: Trivial
> Fix For: 2.2.0, 2.1.1
>
>
> The current 
> [documentation|http://kafka.apache.org/documentation/#topicconfigs] states 
> the following regarding cleanup.policy:
> ??A string that is either "delete" or "compact"??
> This implies that the two options are mutually exclusive. However, 
> [KAFKA-4015|https://issues.apache.org/jira/browse/KAFKA-4015] added the 
> ability to set "cleanup.policy=compact,delete" to enable size/time-based 
> retention alongside key-based retention.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Resolved] (KAFKA-7840) Documentation for cleanup.policy is out of date

2019-01-26 Thread Lee Dongjin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7840?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lee Dongjin resolved KAFKA-7840.

Resolution: Fixed

> Documentation for cleanup.policy is out of date
> ---
>
> Key: KAFKA-7840
> URL: https://issues.apache.org/jira/browse/KAFKA-7840
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation
>Reporter: Josh Wiley
>Assignee: Lee Dongjin
>Priority: Trivial
>
> The current 
> [documentation|http://kafka.apache.org/documentation/#topicconfigs] states 
> the following regarding cleanup.policy:
> ??A string that is either "delete" or "compact"??
> This implies that the two options are mutually exclusive. However, 
> [KAFKA-4015|https://issues.apache.org/jira/browse/KAFKA-4015] added the 
> ability to set "cleanup.policy=compact,delete" to enable size/time-based 
> retention alongside key-based retention.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7782) cleanup policy for offsets topic should be configurable

2019-01-24 Thread Lee Dongjin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7782?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16751289#comment-16751289
 ] 

Lee Dongjin commented on KAFKA-7782:


{quote}It is by design that the cleanup policy should not be changed.{quote}

Then, the fact that the offsets topic can be altered with kafka-configs.sh 
*is* the bug. Right?

> cleanup policy for offsets topic should be configurable
> ---
>
> Key: KAFKA-7782
> URL: https://issues.apache.org/jira/browse/KAFKA-7782
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 2.1.0
>Reporter: Nir Barel
>Priority: Major
>  Labels: windows
>
> Hi,
> For the offsets topic there is no option to change the log cleanup policy 
> from compact to delete, because it is hard-coded here:
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L68]
> The only way to override it is to alter the topic at runtime, which requires 
> checking the settings and re-applying them after every restart of Kafka.
> I am suggesting making it configurable via properties with a default value of 
> "compact"
>  
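
For reference, the runtime override described above amounts to something like
this with the {{AdminClient}} (a sketch with error handling elided; it must be
re-applied after each restart, which is what makes a proper config desirable):

{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.clients.admin.ConfigEntry;
import org.apache.kafka.common.config.ConfigResource;

Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
try (AdminClient admin = AdminClient.create(props)) {
    ConfigResource offsetsTopic =
            new ConfigResource(ConfigResource.Type.TOPIC, "__consumer_offsets");
    Config override = new Config(Collections.singleton(
            new ConfigEntry("cleanup.policy", "compact,delete")));
    admin.alterConfigs(Collections.singletonMap(offsetsTopic, override)).all().get();
}
{code}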



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7782) cleanup policy for offsets topic should be configurable

2019-01-21 Thread Lee Dongjin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7782?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16747984#comment-16747984
 ] 

Lee Dongjin commented on KAFKA-7782:


[~guozhang] [~hachikuji] Is this a bug, or should configuring the cleanup 
policy for `__consumer_offsets` not be allowed? In fact, I found [a 
case|https://stackoverflow.com/questions/42546501/the-retention-period-for-offset-topic-of-kafka]
 which requires this feature.

If it is a bug, it seems like it should be tagged with `needs-kip` with a 
later target version. Could you have a look?

> cleanup policy for offsets topic should be configurable
> ---
>
> Key: KAFKA-7782
> URL: https://issues.apache.org/jira/browse/KAFKA-7782
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 2.1.0
>Reporter: Nir Barel
>Priority: Major
>  Labels: windows
>
> Hi,
> For the offsets topic there is no option to change the log cleanup policy 
> from compact to delete, because it is hard-coded here:
> [https://github.com/apache/kafka/blob/trunk/core/src/main/scala/kafka/coordinator/group/GroupCoordinator.scala#L68]
> The only way to override it is to alter the topic at runtime, which requires 
> checking the settings and re-applying them after every restart of Kafka.
> I am suggesting making it configurable via properties with a default value of 
> "compact"
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7840) Documentation for cleanup.policy is out of date

2019-01-21 Thread Lee Dongjin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7840?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16747922#comment-16747922
 ] 

Lee Dongjin commented on KAFKA-7840:


[~mjsax] [~ijuma] Shouldn't this issue be included in 2.2.0 and 2.1.1?

> Documentation for cleanup.policy is out of date
> ---
>
> Key: KAFKA-7840
> URL: https://issues.apache.org/jira/browse/KAFKA-7840
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation
>Reporter: Josh Wiley
>Assignee: Lee Dongjin
>Priority: Trivial
>
> The current 
> [documentation|http://kafka.apache.org/documentation/#topicconfigs] states 
> the following regarding cleanup.policy:
> ??A string that is either "delete" or "compact"??
> This implies that the two options are mutually exclusive. However, 
> [KAFKA-4015|https://issues.apache.org/jira/browse/KAFKA-4015] added the 
> ability to set "cleanup.policy=compact,delete" to enable size/time-based 
> retention alongside key-based retention.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-7840) Documentation for cleanup.policy is out of date

2019-01-21 Thread Lee Dongjin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7840?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lee Dongjin reassigned KAFKA-7840:
--

Assignee: Lee Dongjin

> Documentation for cleanup.policy is out of date
> ---
>
> Key: KAFKA-7840
> URL: https://issues.apache.org/jira/browse/KAFKA-7840
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation
>Reporter: Josh Wiley
>Assignee: Lee Dongjin
>Priority: Trivial
>
> The current 
> [documentation|http://kafka.apache.org/documentation/#topicconfigs] states 
> the following regarding cleanup.policy:
> ??A string that is either "delete" or "compact"??
> This implies that the two options are mutually exclusive. However, 
> [KAFKA-4015|https://issues.apache.org/jira/browse/KAFKA-4015] added the 
> ability to set "cleanup.policy=compact,delete" to enable size/time-based 
> retention alongside key-based retention.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7808) AdminClient#describeTopics should not throw InvalidTopicException if topic name is not found

2019-01-11 Thread Lee Dongjin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16741133#comment-16741133
 ] 

Lee Dongjin commented on KAFKA-7808:


[~cmccabe] Could you mark this issue as resolved? [~guozhang] Could it be 
included in 2.2.0 and 2.1.1? If so, may I set the 'Fix Version/s'?

> AdminClient#describeTopics should not throw InvalidTopicException if topic 
> name is not found
> 
>
> Key: KAFKA-7808
> URL: https://issues.apache.org/jira/browse/KAFKA-7808
> Project: Kafka
>  Issue Type: Bug
>  Components: admin, core
>Reporter: Guozhang Wang
>Assignee: Lee Dongjin
>Priority: Major
>
> In AdminClient#describeTopics, we have the following logic:
> {code:java}
> if (!cluster.topics().contains(topicName)) {
> future.completeExceptionally(new 
> InvalidTopicException("Topic " + topicName + " not found."));
> continue;
> }
> {code}
> However, {{InvalidTopicException}} is a non-retriable exception and is used 
> to indicate that the topic contains invalid chars or the topic name is too 
> long etc., and is hence not correct to use. We should throw the retriable 
> {{UnknownTopicOrPartitionException}} instead.
> We should make sure any callers on this logic should be cleaned up when 
> fixing it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7808) AdminClient#describeTopics should not throw InvalidTopicException if topic name is not found

2019-01-11 Thread Lee Dongjin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7808?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lee Dongjin updated KAFKA-7808:
---
Summary: AdminClient#describeTopics should not throw InvalidTopicException 
if topic name is not found  (was: AdminClient#describeTopic should not throw 
InvalidTopic if topic name is not found)

> AdminClient#describeTopics should not throw InvalidTopicException if topic 
> name is not found
> 
>
> Key: KAFKA-7808
> URL: https://issues.apache.org/jira/browse/KAFKA-7808
> Project: Kafka
>  Issue Type: Bug
>  Components: admin, core
>Reporter: Guozhang Wang
>Priority: Major
>
> In AdminClient#describeTopic, we have the following logic:
> {code}
> if (!cluster.topics().contains(topicName)) {
> future.completeExceptionally(new 
> InvalidTopicException("Topic " + topicName + " not found."));
> continue;
> }
> {code}
> However, {{InvalidTopicException}} is a non-retriable exception and is used 
> to indicate that the topic contains invalid chars or the topic name is too 
> long etc., and is hence not correct to use. We should throw the retriable 
> {{UnknownTopicOrPartitionException}} instead.
> We should make sure any callers on this logic should be cleaned up when 
> fixing it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-7808) AdminClient#describeTopics should not throw InvalidTopicException if topic name is not found

2019-01-11 Thread Lee Dongjin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7808?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lee Dongjin reassigned KAFKA-7808:
--

Assignee: Lee Dongjin

> AdminClient#describeTopics should not throw InvalidTopicException if topic 
> name is not found
> 
>
> Key: KAFKA-7808
> URL: https://issues.apache.org/jira/browse/KAFKA-7808
> Project: Kafka
>  Issue Type: Bug
>  Components: admin, core
>Reporter: Guozhang Wang
>Assignee: Lee Dongjin
>Priority: Major
>
> In AdminClient#describeTopics, we have the following logic:
> {code:java}
> if (!cluster.topics().contains(topicName)) {
> future.completeExceptionally(new 
> InvalidTopicException("Topic " + topicName + " not found."));
> continue;
> }
> {code}
> However, {{InvalidTopicException}} is a non-retriable exception and is used 
> to indicate that the topic contains invalid chars or the topic name is too 
> long etc., and is hence not correct to use. We should throw the retriable 
> {{UnknownTopicOrPartitionException}} instead.
> We should make sure any callers on this logic should be cleaned up when 
> fixing it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7808) AdminClient#describeTopics should not throw InvalidTopicException if topic name is not found

2019-01-11 Thread Lee Dongjin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7808?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16740449#comment-16740449
 ] 

Lee Dongjin commented on KAFKA-7808:


Here is the PR: https://github.com/apache/kafka/pull/6124
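
For context, the change amounts to swapping the exception type in the guard
quoted below (a sketch of the direction, not necessarily the merged patch):

{code:java}
import org.apache.kafka.common.errors.UnknownTopicOrPartitionException;

if (!cluster.topics().contains(topicName)) {
    // Retriable: the topic may simply not be in the current metadata yet.
    future.completeExceptionally(new UnknownTopicOrPartitionException(
            "Topic " + topicName + " not found."));
    continue;
}
{code}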

> AdminClient#describeTopics should not throw InvalidTopicException if topic 
> name is not found
> 
>
> Key: KAFKA-7808
> URL: https://issues.apache.org/jira/browse/KAFKA-7808
> Project: Kafka
>  Issue Type: Bug
>  Components: admin, core
>Reporter: Guozhang Wang
>Assignee: Lee Dongjin
>Priority: Major
>
> In AdminClient#describeTopics, we have the following logic:
> {code:java}
> if (!cluster.topics().contains(topicName)) {
> future.completeExceptionally(new 
> InvalidTopicException("Topic " + topicName + " not found."));
> continue;
> }
> {code}
> However, {{InvalidTopicException}} is a non-retriable exception and is used 
> to indicate that the topic contains invalid chars or the topic name is too 
> long etc., and is hence not correct to use. We should throw the retriable 
> {{UnknownTopicOrPartitionException}} instead.
> We should make sure any callers on this logic should be cleaned up when 
> fixing it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7808) AdminClient#describeTopics should not throw InvalidTopicException if topic name is not found

2019-01-11 Thread Lee Dongjin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7808?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lee Dongjin updated KAFKA-7808:
---
Description: 
In AdminClient#describeTopics, we have the following logic:
{code:java}
if (!cluster.topics().contains(topicName)) {
future.completeExceptionally(new 
InvalidTopicException("Topic " + topicName + " not found."));
continue;
}
{code}
However, {{InvalidTopicException}} is a non-retriable exception and is used to 
indicate that the topic contains invalid chars or the topic name is too long 
etc., and is hence not correct to use. We should throw the retriable 
{{UnknownTopicOrPartitionException}} instead.

We should make sure any callers on this logic should be cleaned up when fixing 
it.

  was:
In AdminClient#describeTopic, we have the following logic:

{code}
if (!cluster.topics().contains(topicName)) {
future.completeExceptionally(new 
InvalidTopicException("Topic " + topicName + " not found."));
continue;
}
{code}

However, {{InvalidTopicException}} is a non-retriable exception and is used to 
indicate that topic contains invalid chars or topic name is too long etc, and 
hence not correct to use. We should, instead, throw the retriable 
{{UnknownTopicOrPartitionException}} instead.

We should make sure any callers on this logic should be cleaned up when fixing 
it.


> AdminClient#describeTopics should not throw InvalidTopicException if topic 
> name is not found
> 
>
> Key: KAFKA-7808
> URL: https://issues.apache.org/jira/browse/KAFKA-7808
> Project: Kafka
>  Issue Type: Bug
>  Components: admin, core
>Reporter: Guozhang Wang
>Priority: Major
>
> In AdminClient#describeTopics, we have the following logic:
> {code:java}
> if (!cluster.topics().contains(topicName)) {
> future.completeExceptionally(new 
> InvalidTopicException("Topic " + topicName + " not found."));
> continue;
> }
> {code}
> However, {{InvalidTopicException}} is a non-retriable exception and is used 
> to indicate that the topic contains invalid chars or the topic name is too 
> long etc., and is hence not correct to use. We should throw the retriable 
> {{UnknownTopicOrPartitionException}} instead.
> We should make sure any callers on this logic should be cleaned up when 
> fixing it.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7632) Allow fine-grained configuration for compression

2019-01-06 Thread Lee Dongjin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7632?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16735275#comment-16735275
 ] 

Lee Dongjin commented on KAFKA-7632:


The title was changed following the discussion of 
[KIP-390|https://cwiki.apache.org/confluence/display/KAFKA/KIP-390%3A+Allow+fine-grained+configuration+for+compression].

> Allow fine-grained configuration for compression
> 
>
> Key: KAFKA-7632
> URL: https://issues.apache.org/jira/browse/KAFKA-7632
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 2.1.0
> Environment: all
>Reporter: Dave Waters
>Assignee: Lee Dongjin
>Priority: Major
>  Labels: needs-kip
> Fix For: 2.2.0
>
>
> The compression level for ZSTD is currently set to use the default level (3), 
> which is a conservative setting that in some use cases eliminates the value 
> that ZSTD provides with improved compression. Each use case will vary, so 
> exposing the level as a broker configuration setting will allow the user to 
> adjust the level.
> Since the same applies to the other compression codecs, we should add the 
> same functionality to them.
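
For illustration, the sort of producer-side configuration under discussion
({{compression.level}} is a hypothetical key pending KIP-390, not a released
config):

{code:java}
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "zstd");
// Hypothetical knob under discussion in KIP-390; not a released config:
props.put("compression.level", "9");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);
{code}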



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7632) Allow fine-grained configuration for compression

2019-01-06 Thread Lee Dongjin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lee Dongjin updated KAFKA-7632:
---
Summary: Allow fine-grained configuration for compression  (was: Add 
producer option to adjust compression level)

> Allow fine-grained configuration for compression
> 
>
> Key: KAFKA-7632
> URL: https://issues.apache.org/jira/browse/KAFKA-7632
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 2.1.0
> Environment: all
>Reporter: Dave Waters
>Assignee: Lee Dongjin
>Priority: Major
>  Labels: needs-kip
> Fix For: 2.2.0
>
>
> The compression level for ZSTD is currently set to use the default level (3), 
> which is a conservative setting that in some use cases eliminates the value 
> that ZSTD provides with improved compression. Each use case will vary, so 
> exposing the level as a broker configuration setting will allow the user to 
> adjust the level.
> Since it applies to the other compression codecs, we should add the same 
> functionalities to them.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7632) Add producer option to adjust compression level

2018-11-16 Thread Lee Dongjin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lee Dongjin updated KAFKA-7632:
---
Description: 
The compression level for ZSTD is currently set to use the default level (3), 
which is a conservative setting that in some use cases eliminates the value 
that ZSTD provides with improved compression. Each use case will vary, so 
exposing the level as a broker configuration setting will allow the user to 
adjust the level.

Since it applies to the other compression codecs, we should add the same 
functionalities to them.

  was:
The compression level for ZSTD is currently set to use the default level (3), 
which is a conservative setting that in some use cases eliminates the value 
that ZSTD provides with improved compression. Each use case will vary, so 
exposing the level as a broker configuration setting will allow the user to 
adjust the level.

Since it applies to the other compresssion codecs, we should add same 
functionalities to them/


> Add producer option to adjust compression level
> ---
>
> Key: KAFKA-7632
> URL: https://issues.apache.org/jira/browse/KAFKA-7632
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 2.1.0
> Environment: all
>Reporter: Dave Waters
>Assignee: Lee Dongjin
>Priority: Major
>  Labels: needs-kip
> Fix For: 2.2.0
>
>
> The compression level for ZSTD is currently set to use the default level (3), 
> which is a conservative setting that in some use cases eliminates the value 
> that ZSTD provides with improved compression. Each use case will vary, so 
> exposing the level as a broker configuration setting will allow the user to 
> adjust the level.
> Since it applies to the other compression codecs, we should add the same 
> functionalities to them.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7632) Add option to kafka broker config to adjust compression level for zstd

2018-11-16 Thread Lee Dongjin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lee Dongjin updated KAFKA-7632:
---
Description: 
The compression level for ZSTD is currently set to use the default level (3), 
which is a conservative setting that in some use cases eliminates the value 
that ZSTD provides with improved compression. Each use case will vary, so 
exposing the level as a broker configuration setting will allow the user to 
adjust the level.

Since it applies to the other compression codecs, we should add the same 
functionalities to them.

  was:
The compression level for ZSTD is currently set to use the default level (3), 
which is a conservative setting that in some use cases eliminates the value 
that ZSTD provides with improved compression.  Each use case will vary, so 
exposing the level as a broker configuration setting will allow the user to 
adjust the level.

 


> Add option to kafka broker config to adjust compression level for zstd
> --
>
> Key: KAFKA-7632
> URL: https://issues.apache.org/jira/browse/KAFKA-7632
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 2.1.0
> Environment: all
>Reporter: Dave Waters
>Assignee: Lee Dongjin
>Priority: Major
>  Labels: needs-kip
> Fix For: 2.2.0
>
>
> The compression level for ZSTD is currently set to use the default level (3), 
> which is a conservative setting that in some use cases eliminates the value 
> that ZSTD provides with improved compression. Each use case will vary, so 
> exposing the level as a broker configuration setting will allow the user to 
> adjust the level.
> Since it applies to the other compression codecs, we should add the same 
> functionalities to them.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7632) Add producer option to adjust compression level

2018-11-16 Thread Lee Dongjin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lee Dongjin updated KAFKA-7632:
---
Summary: Add producer option to adjust compression level  (was: Add option 
to kafka broker config to adjust compression level for zstd)

> Add producer option to adjust compression level
> ---
>
> Key: KAFKA-7632
> URL: https://issues.apache.org/jira/browse/KAFKA-7632
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 2.1.0
> Environment: all
>Reporter: Dave Waters
>Assignee: Lee Dongjin
>Priority: Major
>  Labels: needs-kip
> Fix For: 2.2.0
>
>
> The compression level for ZSTD is currently set to use the default level (3), 
> which is a conservative setting that in some use cases eliminates the value 
> that ZSTD provides with improved compression. Each use case will vary, so 
> exposing the level as a broker configuration setting will allow the user to 
> adjust the level.
> Since it applies to the other compression codecs, we should add the same 
> functionalities to them.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7632) Add option to kafka broker config to adjust compression level for zstd

2018-11-16 Thread Lee Dongjin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lee Dongjin updated KAFKA-7632:
---
Description: 
The compression level for ZSTD is currently set to use the default level (3), 
which is a conservative setting that in some use cases eliminates the value 
that ZSTD provides with improved compression.  Each use case will vary, so 
exposing the level as a broker configuration setting will allow the user to 
adjust the level.

 

  was:The compression level for ZSTD is currently set to use the default level 
(3), which is a conservative setting that in some use cases eliminates the 
value that ZSTD provides with improved compression.  Each use case will vary, 
so exposing the level as a broker configuration setting will allow the user to 
adjust the level.


> Add option to kafka broker config to adjust compression level for zstd
> --
>
> Key: KAFKA-7632
> URL: https://issues.apache.org/jira/browse/KAFKA-7632
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 2.1.0
> Environment: all
>Reporter: Dave Waters
>Assignee: Lee Dongjin
>Priority: Major
>  Labels: needs-kip
> Fix For: 2.2.0
>
>
> The compression level for ZSTD is currently set to use the default level (3), 
> which is a conservative setting that in some use cases eliminates the value 
> that ZSTD provides with improved compression.  Each use case will vary, so 
> exposing the level as a broker configuration setting will allow the user to 
> adjust the level.
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7632) Add option to kafka broker config to adjust compression level for zstd

2018-11-16 Thread Lee Dongjin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lee Dongjin updated KAFKA-7632:
---
Fix Version/s: (was: 2.1.0)
   2.2.0

> Add option to kafka broker config to adjust compression level for zstd
> --
>
> Key: KAFKA-7632
> URL: https://issues.apache.org/jira/browse/KAFKA-7632
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 2.1.0
> Environment: all
>Reporter: Dave Waters
>Assignee: Lee Dongjin
>Priority: Major
>  Labels: needs-kip
> Fix For: 2.2.0
>
>
> The compression level for ZSTD is currently set to use the default level (3), 
> which is a conservative setting that in some use cases eliminates the value 
> that ZSTD provides with improved compression.  Each use case will vary, so 
> exposing the level as a broker configuration setting will allow the user to 
> adjust the level.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7632) Add option to kafka broker config to adjust compression level for zstd

2018-11-16 Thread Lee Dongjin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lee Dongjin updated KAFKA-7632:
---
Component/s: (was: core)
 clients

> Add option to kafka broker config to adjust compression level for zstd
> --
>
> Key: KAFKA-7632
> URL: https://issues.apache.org/jira/browse/KAFKA-7632
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients
>Affects Versions: 2.1.0
> Environment: all
>Reporter: Dave Waters
>Assignee: Lee Dongjin
>Priority: Major
>  Labels: needs-kip
> Fix For: 2.2.0
>
>
> The compression level for ZSTD is currently set to use the default level (3), 
> which is a conservative setting that in some use cases eliminates the value 
> that ZSTD provides with improved compression.  Each use case will vary, so 
> exposing the level as a broker configuration setting will allow the user to 
> adjust the level.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-7632) Add option to kafka broker config to adjust compression level for zstd

2018-11-16 Thread Lee Dongjin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lee Dongjin updated KAFKA-7632:
---
Priority: Major  (was: Trivial)

> Add option to kafka broker config to adjust compression level for zstd
> --
>
> Key: KAFKA-7632
> URL: https://issues.apache.org/jira/browse/KAFKA-7632
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 2.1.0
> Environment: all
>Reporter: Dave Waters
>Assignee: Lee Dongjin
>Priority: Major
>  Labels: needs-kip
> Fix For: 2.1.0
>
>
> The compression level for ZSTD is currently set to use the default level (3), 
> which is a conservative setting that in some use cases eliminates the value 
> that ZSTD provides with improved compression.  Each use case will vary, so 
> exposing the level as a broker configuration setting will allow the user to 
> adjust the level.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-7632) Add option to kafka broker config to adjust compression level for zstd

2018-11-16 Thread Lee Dongjin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7632?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lee Dongjin reassigned KAFKA-7632:
--

Assignee: Lee Dongjin

> Add option to kafka broker config to adjust compression level for zstd
> --
>
> Key: KAFKA-7632
> URL: https://issues.apache.org/jira/browse/KAFKA-7632
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 2.1.0
> Environment: all
>Reporter: Dave Waters
>Assignee: Lee Dongjin
>Priority: Trivial
>  Labels: needs-kip
> Fix For: 2.1.0
>
>
> The compression level for ZSTD is currently set to use the default level (3), 
> which is a conservative setting that in some use cases eliminates the value 
> that ZSTD provides with improved compression.  Each use case will vary, so 
> exposing the level as a broker configuration setting will allow the user to 
> adjust the level.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7549) Old ProduceRequest with zstd compression does not return error to client

2018-11-09 Thread Lee Dongjin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16681617#comment-16681617
 ] 

Lee Dongjin commented on KAFKA-7549:


[~lindong] Well, it seems like the ProduceRequest was not actually sent to the 
brokers: that exception is thrown in ProduceRequest#validateRecords, which is 
called only by the constructor of ProduceRequest. In other words, the 
instantiation of the ProduceRequest itself failed. [~edenhill] Could you 
explain in more detail?
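
A simplified, self-contained sketch of that flow (not the actual Kafka 
source): validation runs inside the request constructor, so an invalid 
request fails on the client before anything reaches the wire.

{code:java}
// Sketch only -- Kafka's real ProduceRequest throws InvalidRecordException
// from its own validateRecords; a placeholder exception is used here.
public class ProduceRequestSketch {
    enum CompressionType { NONE, GZIP, SNAPPY, LZ4, ZSTD }

    final short version;
    final CompressionType compressionType;

    ProduceRequestSketch(short version, CompressionType compressionType) {
        // Validation happens while the request object is being built,
        // so an invalid request never reaches the broker.
        validateRecords(version, compressionType);
        this.version = version;
        this.compressionType = compressionType;
    }

    static void validateRecords(short version, CompressionType type) {
        if (version < 7 && type == CompressionType.ZSTD)
            throw new IllegalArgumentException(
                "Produce requests with version " + version
                    + " are not allowed to use ZStandard compression");
    }

    public static void main(String[] args) {
        // Fails during construction, mirroring the report above:
        new ProduceRequestSketch((short) 3, CompressionType.ZSTD);
    }
}
{code}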

> Old ProduceRequest with zstd compression does not return error to client
> 
>
> Key: KAFKA-7549
> URL: https://issues.apache.org/jira/browse/KAFKA-7549
> Project: Kafka
>  Issue Type: Bug
>  Components: compression
>Reporter: Magnus Edenhill
>Assignee: Lee Dongjin
>Priority: Major
> Fix For: 2.1.0
>
>
> Kafka broker v2.1.0rc0.
>  
> KIP-110 states that:
> "Zstd will only be allowed for the bumped produce API. That is, for older 
> version clients(=below KAFKA_2_1_IV0), we return UNSUPPORTED_COMPRESSION_TYPE 
> regardless of the message format."
>  
> However, sending a ProduceRequest V3 with zstd compression (which is a client 
> side bug) closes the connection with the following exception rather than 
> returning UNSUPPORTED_COMPRESSION_TYPE in the ProduceResponse:
>  
> {noformat}
> [2018-10-25 11:40:31,813] ERROR Exception while processing request from 
> 127.0.0.1:60723-127.0.0.1:60656-94 (kafka.network.Processor)
> org.apache.kafka.common.errors.InvalidRequestException: Error getting request 
> for apiKey: PRODUCE, apiVersion: 3, connectionId: 
> 127.0.0.1:60723-127.0.0.1:60656-94, listenerName: ListenerName(PLAINTEXT), 
> principal: User:ANONYMOUS
> Caused by: org.apache.kafka.common.record.InvalidRecordException: Produce 
> requests with version 3 are note allowed to use ZStandard compression
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7549) Old ProduceRequest with zstd compression does not return error to client

2018-11-05 Thread Lee Dongjin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16676115#comment-16676115
 ] 

Lee Dongjin commented on KAFKA-7549:


[~ijuma]  [~hachikuji] Ping!

> Old ProduceRequest with zstd compression does not return error to client
> 
>
> Key: KAFKA-7549
> URL: https://issues.apache.org/jira/browse/KAFKA-7549
> Project: Kafka
>  Issue Type: Bug
>  Components: compression
>Reporter: Magnus Edenhill
>Assignee: Lee Dongjin
>Priority: Major
> Fix For: 2.1.0
>
>
> Kafka broker v2.1.0rc0.
>  
> KIP-110 states that:
> "Zstd will only be allowed for the bumped produce API. That is, for older 
> version clients(=below KAFKA_2_1_IV0), we return UNSUPPORTED_COMPRESSION_TYPE 
> regardless of the message format."
>  
> However, sending a ProduceRequest V3 with zstd compression (which is a client 
> side bug) closes the connection with the following exception rather than 
> returning UNSUPPORTED_COMPRESSION_TYPE in the ProduceResponse:
>  
> {noformat}
> [2018-10-25 11:40:31,813] ERROR Exception while processing request from 
> 127.0.0.1:60723-127.0.0.1:60656-94 (kafka.network.Processor)
> org.apache.kafka.common.errors.InvalidRequestException: Error getting request 
> for apiKey: PRODUCE, apiVersion: 3, connectionId: 
> 127.0.0.1:60723-127.0.0.1:60656-94, listenerName: ListenerName(PLAINTEXT), 
> principal: User:ANONYMOUS
> Caused by: org.apache.kafka.common.record.InvalidRecordException: Produce 
> requests with version 3 are note allowed to use ZStandard compression
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7549) Old ProduceRequest with zstd compression does not return error to client

2018-10-26 Thread Lee Dongjin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16665396#comment-16665396
 ] 

Lee Dongjin commented on KAFKA-7549:


[~ijuma] I reviewed the problem. Here is the result:

The KIP documentation states what the broker returns for an invalid Produce 
request (i.e., a ZSTD-compressed message with a Produce API version below 7), 
but not what happens when the user tries to instantiate an invalid 
ProduceRequest instance. Validation on ProduceRequest instantiation was 
[proposed and added|https://github.com/apache/kafka/pull/2267#discussion_r222913248] 
after the KIP was accepted, while the revision was in progress. That is why it 
is not in the KIP document - sorry, I was updating the KIP following the 
discussion on the PR but have not completed it yet.

Here, we have three choices:

1. Throw InvalidRecordException (current): it is consistent with the other 
exceptions thrown in ProduceRequest#validateRecords.
 2. Throw UnsupportedVersionException: [what Jason proposed at 
first|https://github.com/apache/kafka/pull/2267#pullrequestreview-162177303], 
but it breaks that consistency.
 3. Throw UnsupportedCompressionTypeException: it matches what the broker 
returns, but it also breaks consistency.

Which approach do you prefer?
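
For illustration, all three alternatives would land in the same client-side 
check; a sketch (not the committed code; package locations per Kafka 2.1):

{code:java}
import org.apache.kafka.common.record.CompressionType;
import org.apache.kafka.common.record.InvalidRecordException;

public class ValidateRecordsChoices {
    static void validateRecords(short version, CompressionType type) {
        if (version < 7 && type == CompressionType.ZSTD) {
            // Choice 1 (current): consistent with the other checks in
            // ProduceRequest#validateRecords.
            throw new InvalidRecordException("Produce requests with version "
                + version + " are not allowed to use ZStandard compression");
            // Choice 2: throw UnsupportedVersionException instead.
            // Choice 3: throw UnsupportedCompressionTypeException, matching
            // the UNSUPPORTED_COMPRESSION_TYPE error the broker returns.
        }
    }
}
{code}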

> Old ProduceRequest with zstd compression does not return error to client
> 
>
> Key: KAFKA-7549
> URL: https://issues.apache.org/jira/browse/KAFKA-7549
> Project: Kafka
>  Issue Type: Bug
>  Components: compression
>Reporter: Magnus Edenhill
>Assignee: Lee Dongjin
>Priority: Major
> Fix For: 2.1.0
>
>
> Kafka broker v2.1.0rc0.
>  
> KIP-110 states that:
> "Zstd will only be allowed for the bumped produce API. That is, for older 
> version clients(=below KAFKA_2_1_IV0), we return UNSUPPORTED_COMPRESSION_TYPE 
> regardless of the message format."
>  
> However, sending a ProduceRequest V3 with zstd compression (which is a client 
> side bug) closes the connection with the following exception rather than 
> returning UNSUPPORTED_COMPRESSION_TYPE in the ProduceResponse:
>  
> {noformat}
> [2018-10-25 11:40:31,813] ERROR Exception while processing request from 
> 127.0.0.1:60723-127.0.0.1:60656-94 (kafka.network.Processor)
> org.apache.kafka.common.errors.InvalidRequestException: Error getting request 
> for apiKey: PRODUCE, apiVersion: 3, connectionId: 
> 127.0.0.1:60723-127.0.0.1:60656-94, listenerName: ListenerName(PLAINTEXT), 
> principal: User:ANONYMOUS
> Caused by: org.apache.kafka.common.record.InvalidRecordException: Produce 
> requests with version 3 are note allowed to use ZStandard compression
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7549) Old ProduceRequest with zstd compression does not return error to client

2018-10-25 Thread Lee Dongjin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7549?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16663865#comment-16663865
 ] 

Lee Dongjin commented on KAFKA-7549:


[~ijuma] No problem. Let me have a look.

> Old ProduceRequest with zstd compression does not return error to client
> 
>
> Key: KAFKA-7549
> URL: https://issues.apache.org/jira/browse/KAFKA-7549
> Project: Kafka
>  Issue Type: Bug
>  Components: compression
>Reporter: Magnus Edenhill
>Priority: Major
> Fix For: 2.1.0
>
>
> Kafka broker v2.1.0rc0.
>  
> KIP-110 states that:
> "Zstd will only be allowed for the bumped produce API. That is, for older 
> version clients(=below KAFKA_2_1_IV0), we return UNSUPPORTED_COMPRESSION_TYPE 
> regardless of the message format."
>  
> However, sending a ProduceRequest V3 with zstd compression (which is a client 
> side bug) closes the connection with the following exception rather than 
> returning UNSUPPORTED_COMPRESSION_TYPE in the ProduceResponse:
>  
> {noformat}
> [2018-10-25 11:40:31,813] ERROR Exception while processing request from 
> 127.0.0.1:60723-127.0.0.1:60656-94 (kafka.network.Processor)
> org.apache.kafka.common.errors.InvalidRequestException: Error getting request 
> for apiKey: PRODUCE, apiVersion: 3, connectionId: 
> 127.0.0.1:60723-127.0.0.1:60656-94, listenerName: ListenerName(PLAINTEXT), 
> principal: User:ANONYMOUS
> Caused by: org.apache.kafka.common.record.InvalidRecordException: Produce 
> requests with version 3 are note allowed to use ZStandard compression
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-7549) Old ProduceRequest with zstd compression does not return error to client

2018-10-25 Thread Lee Dongjin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7549?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lee Dongjin reassigned KAFKA-7549:
--

Assignee: Lee Dongjin

> Old ProduceRequest with zstd compression does not return error to client
> 
>
> Key: KAFKA-7549
> URL: https://issues.apache.org/jira/browse/KAFKA-7549
> Project: Kafka
>  Issue Type: Bug
>  Components: compression
>Reporter: Magnus Edenhill
>Assignee: Lee Dongjin
>Priority: Major
> Fix For: 2.1.0
>
>
> Kafka broker v2.1.0rc0.
>  
> KIP-110 states that:
> "Zstd will only be allowed for the bumped produce API. That is, for older 
> version clients(=below KAFKA_2_1_IV0), we return UNSUPPORTED_COMPRESSION_TYPE 
> regardless of the message format."
>  
> However, sending a ProduceRequest V3 with zstd compression (which is a client 
> side bug) closes the connection with the following exception rather than 
> returning UNSUPPORTED_COMPRESSION_TYPE in the ProduceResponse:
>  
> {noformat}
> [2018-10-25 11:40:31,813] ERROR Exception while processing request from 
> 127.0.0.1:60723-127.0.0.1:60656-94 (kafka.network.Processor)
> org.apache.kafka.common.errors.InvalidRequestException: Error getting request 
> for apiKey: PRODUCE, apiVersion: 3, connectionId: 
> 127.0.0.1:60723-127.0.0.1:60656-94, listenerName: ListenerName(PLAINTEXT), 
> principal: User:ANONYMOUS
> Caused by: org.apache.kafka.common.record.InvalidRecordException: Produce 
> requests with version 3 are note allowed to use ZStandard compression
> {noformat}



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-7123) Documentation: broken link http://confluent.io/docs/current/kafka-rest/docs/intro.html

2018-06-30 Thread Lee Dongjin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7123?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lee Dongjin reassigned KAFKA-7123:
--

Assignee: Lee Dongjin

> Documentation: broken link 
> http://confluent.io/docs/current/kafka-rest/docs/intro.html
> --
>
> Key: KAFKA-7123
> URL: https://issues.apache.org/jira/browse/KAFKA-7123
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Vitaliy Fedoriv
>Assignee: Lee Dongjin
>Priority: Minor
>
> Broken link on page
> https://cwiki.apache.org/confluence/display/KAFKA/Clients
> for Confluent REST Proxy
> Should be
> https://docs.confluent.io/current/kafka-rest/docs/index.html



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Commented] (KAFKA-7123) Documentation: broken link http://confluent.io/docs/current/kafka-rest/docs/intro.html

2018-06-30 Thread Lee Dongjin (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-7123?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16528905#comment-16528905
 ] 

Lee Dongjin commented on KAFKA-7123:


Fixed.

> Documentation: broken link 
> http://confluent.io/docs/current/kafka-rest/docs/intro.html
> --
>
> Key: KAFKA-7123
> URL: https://issues.apache.org/jira/browse/KAFKA-7123
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Vitaliy Fedoriv
>Assignee: Lee Dongjin
>Priority: Minor
>
> Broken link on page
> https://cwiki.apache.org/confluence/display/KAFKA/Clients
> for Confluent REST Proxy
> Should be
> https://docs.confluent.io/current/kafka-rest/docs/index.html



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Updated] (KAFKA-6993) Fix defective documentations for KStream/KTable methods

2018-06-05 Thread Lee Dongjin (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6993?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Lee Dongjin updated KAFKA-6993:
---
Summary: Fix defective documentations for KStream/KTable methods  (was: 
Defective documentations for KStream/KTable methods)

> Fix defective documentations for KStream/KTable methods
> ---
>
> Key: KAFKA-6993
> URL: https://issues.apache.org/jira/browse/KAFKA-6993
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation, streams
>Reporter: Lee Dongjin
>Assignee: Lee Dongjin
>Priority: Minor
>
> Some KStream/KTable methods, mainly the join methods, provide the same 
> documentation for both the default and the overloaded variants:
>  * KStream#join
>  * KStream#leftJoin
>  * KStream#outerJoin
>  * KTable#filter
>  * KTable#filterNot
>  * KTable#mapValues
>  * KTable#transformValues
>  * KTable#join
>  * KTable#leftJoin
>  * KTable#outerJoin
>  



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Created] (KAFKA-6993) Defective documentations for KStream/KTable methods

2018-06-05 Thread Lee Dongjin (JIRA)
Lee Dongjin created KAFKA-6993:
--

 Summary: Defective documentations for KStream/KTable methods
 Key: KAFKA-6993
 URL: https://issues.apache.org/jira/browse/KAFKA-6993
 Project: Kafka
  Issue Type: Bug
  Components: documentation, streams
Reporter: Lee Dongjin
Assignee: Lee Dongjin


Some KStream/KTable methods, mainly the join methods, provide the same 
documentation for both the default and the overloaded variants:
 * KStream#join
 * KStream#leftJoin
 * KStream#outerJoin
 * KTable#filter
 * KTable#filterNot
 * KTable#mapValues
 * KTable#transformValues
 * KTable#join
 * KTable#leftJoin
 * KTable#outerJoin

 



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)