[jira] [Commented] (KAFKA-10694) Implement zero copy for FetchSnapshot

2020-11-09 Thread lqjacklee (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10694?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17229029#comment-17229029
 ] 

lqjacklee commented on KAFKA-10694:
---

[~jagsancio] Can I take the task? 

> Implement zero copy for FetchSnapshot
> -
>
> Key: KAFKA-10694
> URL: https://issues.apache.org/jira/browse/KAFKA-10694
> Project: Kafka
>  Issue Type: Sub-task
>  Components: replication
>Reporter: Jose Armando Garcia Sancio
>Priority: Major
>
> Change the _RawSnapshotWriter_ and _RawSnapshotReader_ interfaces to allow 
> sending and receiving _FetchSnapshotResponse_ with minimal memory copies.
> This could be implemented by making the following changes
> {code:java}
> interface RawSnapshotWriter {
>   ...
>   public void append(MemoryRecords records) throws IOException;
> } {code}
> {code:java}
> interface RawSnapshotReader {
>   ...
>   public BaseRecords slice(long position) throws IOException;
> }{code}
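
The zero-copy goal here typically boils down to handing the network layer a file-backed region instead of heap buffers. The sketch below is illustrative only (plain NIO, not the actual Kafka `RawSnapshotReader` implementation): `FileChannel.transferTo` lets the kernel move a snapshot slice straight to the destination channel.

```java
import java.io.IOException;
import java.nio.channels.FileChannel;
import java.nio.channels.WritableByteChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Illustrative sketch only: serve a snapshot slice starting at `position`
// without copying the bytes through a user-space buffer.
final class ZeroCopySend {
    static long sendSlice(Path snapshot, long position, WritableByteChannel dest) throws IOException {
        try (FileChannel ch = FileChannel.open(snapshot, StandardOpenOption.READ)) {
            // transferTo can use sendfile(2) under the hood on Linux
            return ch.transferTo(position, ch.size() - position, dest);
        }
    }
}
```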



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] iprithv commented on a change in pull request #9204: KAFKA-6181 Examining log messages with {{--deep-iteration}} should show superset of fields

2020-11-09 Thread GitBox


iprithv commented on a change in pull request #9204:
URL: https://github.com/apache/kafka/pull/9204#discussion_r520336051



##
File path: core/src/main/scala/kafka/tools/DumpLogSegments.scala
##
@@ -257,8 +257,11 @@ object DumpLogSegments {
 }
 lastOffset = record.offset
 
-print(s"$RecordIndent offset: ${record.offset} ${batch.timestampType}: ${record.timestamp} " +
-  s"keysize: ${record.keySize} valuesize: ${record.valueSize}")
+print(s"$RecordIndent offset: ${record.offset} baseOffset: ${batch.baseOffset} lastOffset: ${batch.lastOffset} baseSequence: ${batch.baseSequence}" +
+ s" lastSequence: ${batch.lastSequence} producerEpoch: ${batch.producerEpoch} partitionLeaderEpoch: ${batch.partitionLeaderEpoch} position: ${validBytes} " +
+ s" ${batch.timestampType}: ${record.timestamp} isvalid: ${record.isValid}" +
+ s" keysize: ${record.keySize} valuesize: ${record.valueSize} size: ${batch.sizeInBytes} magic: ${batch.magic} " +

Review comment:
   @chia7712  Have separated the record details from the batch details. Also 
changed the entries to camel case. Please review this. Thanks.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #9433: KAFKA-10607: Consistent behaviour for response errorCounts()

2020-11-09 Thread GitBox


chia7712 commented on pull request #9433:
URL: https://github.com/apache/kafka/pull/9433#issuecomment-724472427


   Could you offer a test to make sure ```None``` is included?
   
   Personally, the implementations of ```errorCounts``` are almost the same. Maybe 
they should be implemented
via the auto-generated protocol so the consistency (code style and behavior) can 
be protected. @hachikuji WDYT?
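
For illustration, a uniform tally (a hypothetical helper, not the actual generated Kafka code) would count every error code, including `NONE` (code 0), so tests can assert the map always has the same shape:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of a consistent errorCounts: every code is tallied,
// including 0 (NONE), rather than being skipped by some implementations.
final class ErrorCountsSketch {
    static Map<Short, Integer> errorCounts(Iterable<Short> codes) {
        Map<Short, Integer> counts = new HashMap<>();
        for (short code : codes) {
            counts.merge(code, 1, Integer::sum); // merge handles first occurrence
        }
        return counts;
    }
}
```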







[GitHub] [kafka] chia7712 commented on a change in pull request #9204: KAFKA-6181 Examining log messages with {{--deep-iteration}} should show superset of fields

2020-11-09 Thread GitBox


chia7712 commented on a change in pull request #9204:
URL: https://github.com/apache/kafka/pull/9204#discussion_r520297478



##
File path: core/src/main/scala/kafka/tools/DumpLogSegments.scala
##
@@ -257,8 +257,11 @@ object DumpLogSegments {
 }
 lastOffset = record.offset
 
-print(s"$RecordIndent offset: ${record.offset} ${batch.timestampType}: ${record.timestamp} " +
-  s"keysize: ${record.keySize} valuesize: ${record.valueSize}")
+print(s"$RecordIndent offset: ${record.offset} baseOffset: ${batch.baseOffset} lastOffset: ${batch.lastOffset} baseSequence: ${batch.baseSequence}" +
+ s" lastSequence: ${batch.lastSequence} producerEpoch: ${batch.producerEpoch} partitionLeaderEpoch: ${batch.partitionLeaderEpoch} position: ${validBytes} " +
+ s" ${batch.timestampType}: ${record.timestamp} isvalid: ${record.isValid}" +
+ s" keysize: ${record.keySize} valuesize: ${record.valueSize} size: ${batch.sizeInBytes} magic: ${batch.magic} " +

Review comment:
   isvalid -> "isValid"
   keysize -> "keySize"
   valuesize -> "valueSize"
   compresscodec -> "compressionType"









[GitHub] [kafka] chia7712 commented on a change in pull request #8826: KAFKA-10090 Misleading warnings: The configuration was supplied but i…

2020-11-09 Thread GitBox


chia7712 commented on a change in pull request #8826:
URL: https://github.com/apache/kafka/pull/8826#discussion_r520293926



##
File path: 
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
##
@@ -105,7 +105,9 @@ public AbstractConfig(ConfigDef definition, Map originals, Map
 throw new ConfigException(entry.getKey().toString(), entry.getValue(), "Key must be a string.");
 
 this.originals = resolveConfigVariables(configProviderProps, (Map) originals);
-this.values = definition.parse(this.originals);
+// pass a copy to definition.parse. Otherwise, the definition.parse adds all keys of definitions to "used" group
+// since definition.parse needs to call "RecordingMap#get" when checking all definitions.
+this.values = definition.parse(new HashMap<>(this.originals));

Review comment:
   > I tried running console-producer with/without this PR. It doesn't seem 
to WARN any unused SSL configs in either test. Do you know why?
   
   @junrao this is the root cause.
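
The mechanism being described can be modeled with a toy map (names are illustrative, not Kafka's actual `RecordingMap`): `get` records the key as used, so parsing the original map would mark every defined key used, while parsing a plain copy leaves the recording untouched.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Set;

// Toy model: get() marks the key as used; unused() is everything never read.
class RecordingMapSketch<V> extends HashMap<String, V> {
    private final Set<String> used = new HashSet<>();

    @Override
    public V get(Object key) {
        used.add((String) key);
        return super.get(key);
    }

    Set<String> unused() {
        Set<String> u = new HashSet<>(keySet());
        u.removeAll(used);
        return u;
    }
}
```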









[jira] [Commented] (KAFKA-7908) retention.ms and message.timestamp.difference.max.ms are tied

2020-11-09 Thread nandini (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7908?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17228975#comment-17228975
 ] 

nandini commented on KAFKA-7908:


The bug relates to - https://issues.apache.org/jira/browse/KAFKA-4340

> retention.ms and message.timestamp.difference.max.ms are tied
> -
>
> Key: KAFKA-7908
> URL: https://issues.apache.org/jira/browse/KAFKA-7908
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.0
>Reporter: Ciprian Pascu
>Priority: Minor
> Fix For: 2.3.0, 2.4.0
>
>
> When configuring retention.ms for a topic, following warning will be printed:
> _retention.ms for topic X is set to 180. It is smaller than 
> message.timestamp.difference.max.ms's value 9223372036854775807. This may 
> result in frequent log rolling. (kafka.log.Log)_
>  
> message.timestamp.difference.max.ms has not been configured explicitly, so it 
> has the default value of 9223372036854775807; I haven't seen anywhere 
> mentioned that this parameter needs to be configured also, if retention.ms is 
> configured; also, if we look at the default values for these parameters, they 
> are also so, that retention.ms < message.timestamp.difference.max.ms; so, 
> what is the purpose of this warning, in this case?
> The warning is generated from this code 
> (core/src/main/scala/kafka/log/Log.scala):
> {code:java}
> def updateConfig(updatedKeys: Set[String], newConfig: LogConfig): Unit = {
>   if ((updatedKeys.contains(LogConfig.RetentionMsProp)
>     || updatedKeys.contains(LogConfig.MessageTimestampDifferenceMaxMsProp))
>     && topicPartition.partition == 0  // generate warnings only for one partition of each topic
>     && newConfig.retentionMs < newConfig.messageTimestampDifferenceMaxMs)
>     warn(s"${LogConfig.RetentionMsProp} for topic ${topicPartition.topic} is set to ${newConfig.retentionMs}. It is smaller than " +
>       s"${LogConfig.MessageTimestampDifferenceMaxMsProp}'s value ${newConfig.messageTimestampDifferenceMaxMs}. " +
>       s"This may result in frequent log rolling.")
>   this.config = newConfig
> }
> {code}
>  
> Shouldn't the || operator in the highlighted condition be replaced with &&?
>  
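
The reporter's question can be made concrete with a small sketch (property keys inlined for illustration; this is not the actual Log.scala code): with `||`, updating retention.ms alone is enough to trigger the comparison against the default max timestamp difference, while `&&` would require both keys to be updated in the same call.

```java
import java.util.Set;

// Sketch of the two guard variants from the quoted code (simplified):
// the size comparison is unchanged; only the key-update check differs.
final class RollWarnCheck {
    static boolean warnsWithOr(Set<String> updated, long retentionMs, long maxTsDiffMs) {
        return (updated.contains("retention.ms")
                || updated.contains("message.timestamp.difference.max.ms"))
                && retentionMs < maxTsDiffMs;
    }

    static boolean warnsWithAnd(Set<String> updated, long retentionMs, long maxTsDiffMs) {
        return (updated.contains("retention.ms")
                && updated.contains("message.timestamp.difference.max.ms"))
                && retentionMs < maxTsDiffMs;
    }
}
```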





[GitHub] [kafka] abbccdda commented on pull request #9558: KAFKA-10342: migrate remaining RPCs to forwarding

2020-11-09 Thread GitBox


abbccdda commented on pull request #9558:
URL: https://github.com/apache/kafka/pull/9558#issuecomment-724447477


   @mumrah for a review







[GitHub] [kafka] ableegoldman opened a new pull request #9582: KAFKA-6687: rewrite topology to allow creating multiple KStreams from the same topic

2020-11-09 Thread GitBox


ableegoldman opened a new pull request #9582:
URL: https://github.com/apache/kafka/pull/9582


   Needed to fix this on the side in order to more easily set up some 
experiments, so here's the PR.
   
   Allows a user to create multiple KStreams from the same topic, collection of 
topics, or pattern. The one exception is when the KStreams are subscribed to an 
overlapping-but-unequal collection of topics, which I left as future work (with 
a TODO in the comments describing a possible solution).
   
   If the offset reset policy doesn't match we just throw a TopologyException.







[jira] [Updated] (KAFKA-9751) Auto topic creation should go to controller

2020-11-09 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9751?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen updated KAFKA-9751:
---
Summary: Auto topic creation should go to controller  (was: Internal topic 
creation should go to controller)

> Auto topic creation should go to controller
> ---
>
> Key: KAFKA-9751
> URL: https://issues.apache.org/jira/browse/KAFKA-9751
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> For use cases to create internal topics through FindCoordinator or Metadata 
> request, receiving broker should route the topic creation request to the 
> controller instead of handling by itself.





[jira] [Updated] (KAFKA-10346) Propagate topic creation policy violation to the clients

2020-11-09 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10346?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen updated KAFKA-10346:

Description: At the moment, topic creation policy is not enforced on auto 
topic creation path for Metadata/FindCoordinator. After the topic creation 
fully goes to adminManager instead of zkManager, we may actually have the 
policy violation for those auto create topics, and inform client side the 
situation by bumping both RPCs version to include POLICY_VIOLATION error.  
(was: In the bridge release broker, the CreatePartition should be redirected to 
the active controller instead of relying on admin client discovery.)
Summary: Propagate topic creation policy violation to the clients  
(was: Redirect CreatePartition to the controller)

> Propagate topic creation policy violation to the clients
> 
>
> Key: KAFKA-10346
> URL: https://issues.apache.org/jira/browse/KAFKA-10346
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Boyang Chen
>Priority: Major
>
> At the moment, topic creation policy is not enforced on auto topic creation 
> path for Metadata/FindCoordinator. After the topic creation fully goes to 
> adminManager instead of zkManager, we may actually have the policy violation 
> for those auto create topics, and inform client side the situation by bumping 
> both RPCs version to include POLICY_VIOLATION error.





[GitHub] [kafka] vamossagar12 commented on pull request #9508: KAFKA-10648: Add Prefix Scan support to State Stores

2020-11-09 Thread GitBox


vamossagar12 commented on pull request #9508:
URL: https://github.com/apache/kafka/pull/9508#issuecomment-724420818


   @cadonna , The failing tests here 
https://github.com/apache/kafka/pull/9508/checks?check_run_id=1376108664 don't 
seem to be related to this PR. Would it be possible to retest?







[GitHub] [kafka] chia7712 commented on a change in pull request #8826: KAFKA-10090 Misleading warnings: The configuration was supplied but i…

2020-11-09 Thread GitBox


chia7712 commented on a change in pull request #8826:
URL: https://github.com/apache/kafka/pull/8826#discussion_r520237060



##
File path: 
clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
##
@@ -159,24 +159,25 @@ private static ChannelBuilder create(SecurityProtocol securityProtocol,
 }
 
 // Visibility for testing
-protected static Map channelBuilderConfigs(final AbstractConfig config, final ListenerName listenerName) {
-Map parsedConfigs;
+@SuppressWarnings("unchecked")
+static Map channelBuilderConfigs(final AbstractConfig config, final ListenerName listenerName) {
+Map parsedConfigs;
 if (listenerName == null)
-parsedConfigs = config.values();
+parsedConfigs = (Map) config.values();

Review comment:
   ```java
   if (listenerName == null)
       parsedConfigs = (Map) config.values();
   else
       parsedConfigs = config.valuesWithPrefixOverride(listenerName.configPrefix());
   ```
   
   the method ```config.valuesWithPrefixOverride``` also returns a ```RecordingMap``` so it is ok.









[GitHub] [kafka] ableegoldman commented on pull request #9489: MINOR: demote "Committing task offsets" log to DEBUG

2020-11-09 Thread GitBox


ableegoldman commented on pull request #9489:
URL: https://github.com/apache/kafka/pull/9489#issuecomment-724391259


   That's fair. Ok I'll go forward with demoting this and just add a new 
INFO-level log that's less frequent, maybe at the max poll interval







[GitHub] [kafka] junrao commented on a change in pull request #8826: KAFKA-10090 Misleading warnings: The configuration was supplied but i…

2020-11-09 Thread GitBox


junrao commented on a change in pull request #8826:
URL: https://github.com/apache/kafka/pull/8826#discussion_r520225611



##
File path: 
clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
##
@@ -159,24 +159,25 @@ private static ChannelBuilder create(SecurityProtocol securityProtocol,
 }
 
 // Visibility for testing
-protected static Map channelBuilderConfigs(final AbstractConfig config, final ListenerName listenerName) {
-Map parsedConfigs;
+@SuppressWarnings("unchecked")
+static Map channelBuilderConfigs(final AbstractConfig config, final ListenerName listenerName) {
+Map parsedConfigs;
 if (listenerName == null)
-parsedConfigs = config.values();
+parsedConfigs = (Map) config.values();

Review comment:
   Does this cover the case when listenerName is not null? I guess that can 
only happen on the server side and since we don't log unused configs on the 
server, so maybe this is ok for now?

##
File path: 
clients/src/main/java/org/apache/kafka/common/network/ChannelBuilders.java
##
@@ -159,24 +159,25 @@ private static ChannelBuilder create(SecurityProtocol securityProtocol,
 }
 
 // Visibility for testing
-protected static Map channelBuilderConfigs(final AbstractConfig config, final ListenerName listenerName) {
-Map parsedConfigs;
+@SuppressWarnings("unchecked")
+static Map channelBuilderConfigs(final AbstractConfig config, final ListenerName listenerName) {
+Map parsedConfigs;
 if (listenerName == null)
-parsedConfigs = config.values();
+parsedConfigs = (Map) config.values();
 else
 parsedConfigs = config.valuesWithPrefixOverride(listenerName.configPrefix());
 
-// include any custom configs from original configs
-Map configs = new HashMap<>(parsedConfigs);
 config.originals().entrySet().stream()
 .filter(e -> !parsedConfigs.containsKey(e.getKey())) // exclude already parsed configs
 // exclude already parsed listener prefix configs
 .filter(e -> !(listenerName != null && e.getKey().startsWith(listenerName.configPrefix()) && parsedConfigs.containsKey(e.getKey().substring(listenerName.configPrefix().length()))))
 // exclude keys like `{mechanism}.some.prop` if "listener.name." prefix is present and key `some.prop` exists in parsed configs.
 .filter(e -> !(listenerName != null && parsedConfigs.containsKey(e.getKey().substring(e.getKey().indexOf('.') + 1))))
-.forEach(e -> configs.put(e.getKey(), e.getValue()));
-return configs;
+.forEach(e -> parsedConfigs.put(e.getKey(), e.getValue()));
+// The callers may add new elements to return map so we should not wrap it to a immutable map. Otherwise,
+// the callers have to create a new map to carry more elements and then following Get ops are not recorded.

Review comment:
   (1) "so we should not wrap it to a immutable map": It's kind of weird to 
have a comment on what we don't do.
   (2) to return map  => to returned map 
   

##
File path: 
clients/src/main/java/org/apache/kafka/common/config/AbstractConfig.java
##
@@ -582,6 +582,13 @@ public int hashCode() {
 return originals.hashCode();
 }
 
+/**
+ * @return true if the input map is a recording map. otherwise, false
+ */
+public static boolean isRecording(Map map) {

Review comment:
   common/config/* is part of the public interface. This method seems 
internal. So, could we not expose it publicly to the end user?









[jira] [Commented] (KAFKA-10575) StateRestoreListener#onRestoreEnd should always be triggered

2020-11-09 Thread Richard Yu (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10575?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17228890#comment-17228890
 ] 

Richard Yu commented on KAFKA-10575:


[~guozhang] I'm interested in picking this one up. May I try my hand at it?

> StateRestoreListener#onRestoreEnd should always be triggered
> 
>
> Key: KAFKA-10575
> URL: https://issues.apache.org/jira/browse/KAFKA-10575
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Priority: Major
>
> Today we only trigger `StateRestoreListener#onRestoreEnd` when we complete 
> the restoration of an active task and transit it to the running state. 
> However the restoration can also be stopped when the restoring task gets 
> closed (because it gets migrated to another client, for example). We should 
> also trigger the callback indicating its progress when the restoration 
> stopped in any scenarios.
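
One way to guarantee the callback in every termination path (a sketch under assumed names, not the actual Streams code) is to route both the completion path and the close path through the same finishing step:

```java
// Sketch: both the completed path and the closed/migrated path funnel
// through finish(), so an onRestoreEnd-style listener always fires once.
final class RestoreLifecycleSketch {
    interface Listener { void onRestoreEnd(String partition, long restored); }

    private final Listener listener;
    private boolean ended = false;
    private long restored = 0;

    RestoreLifecycleSketch(Listener listener) { this.listener = listener; }

    void restoreBatch(long records) { restored += records; }

    void completeRestoration() { finish("p0"); }

    void close() { finish("p0"); } // task migrated or shut down mid-restore

    private void finish(String partition) {
        if (!ended) {              // idempotent: fire the callback once
            ended = true;
            listener.onRestoreEnd(partition, restored);
        }
    }
}
```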





[GitHub] [kafka] scanterog commented on pull request #9545: [mm2] Allow Checkpoints for consumers using static partition assignments

2020-11-09 Thread GitBox


scanterog commented on pull request #9545:
URL: https://github.com/apache/kafka/pull/9545#issuecomment-724379519


   > Groups created on the target cluster by KAFKA-9076 are "simple groups" as 
there's no member information.
   > Not entirely sure why these were explicitly filtered. I can't immediately 
come up with a reason.
   > 
   > @scanterog Can you add a test as well?
   
   Sure. Do we want a test for `findConsumerGroups` or what are you looking to 
test?







[jira] [Created] (KAFKA-10703) Document that default configs are not supported for TOPIC entities

2020-11-09 Thread Colin McCabe (Jira)
Colin McCabe created KAFKA-10703:


 Summary: Document that default configs are not supported for TOPIC 
entities
 Key: KAFKA-10703
 URL: https://issues.apache.org/jira/browse/KAFKA-10703
 Project: Kafka
  Issue Type: Improvement
Reporter: Colin McCabe


We should better document that default configs are not supported for TOPIC 
entities.  Currently an attempt to set them gets confusing error messages.

Using admin client's incrementalAlterConfigs with {type=TOPIC, name=""} gives a 
cryptic error stack trace:

Exception in thread "main" java.util.concurrent.ExecutionException: org.apache.kafka.common.errors.InvalidRequestException: Invalid config value for resource ConfigResource(type=TOPIC, name=''): Path must not end with / character
at org.apache.kafka.common.internals.KafkaFutureImpl.wrapAndThrow(KafkaFutureImpl.java:45)
at org.apache.kafka.common.internals.KafkaFutureImpl.access$000(KafkaFutureImpl.java:32)
at org.apache.kafka.common.internals.KafkaFutureImpl$SingleWaiter.await(KafkaFutureImpl.java:89)
at org.apache.kafka.common.internals.KafkaFutureImpl.get(KafkaFutureImpl.java:260)
at kafka.admin.ConfigCommand$.main(ConfigCommand.scala:91)
at kafka.admin.ConfigCommand.main(ConfigCommand.scala)
Caused by: org.apache.kafka.common.errors.InvalidRequestException: Invalid config value for resource ConfigResource(type=TOPIC, name=''): Path must not end with / character

Similarly, kafka-configs.sh is not very clear about this issue.





[GitHub] [kafka] wcarlson5 commented on pull request #9581: KAFKA-10500: Add thread

2020-11-09 Thread GitBox


wcarlson5 commented on pull request #9581:
URL: https://github.com/apache/kafka/pull/9581#issuecomment-724347812


   https://github.com/apache/kafka/pull/9572 need to get this merged first







[GitHub] [kafka] wcarlson5 opened a new pull request #9581: KAFKA-10500: Add thread

2020-11-09 Thread GitBox


wcarlson5 opened a new pull request #9581:
URL: https://github.com/apache/kafka/pull/9581


   Can add stream threads now
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[jira] [Commented] (KAFKA-10688) Handle accidental truncation of repartition topics as exceptional failure

2020-11-09 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17228844#comment-17228844
 ] 

Guozhang Wang commented on KAFKA-10688:
---

Normally the repartition topic should never have an invalid offset after it is 
set initially, since the repartition topic's retention should be infinite and 
we only truncate it via delete-records; this ticket is for guarding against 
abnormal cases, e.g. if users accidentally truncated the repartition topics.

> Handle accidental truncation of repartition topics as exceptional failure
> -
>
> Key: KAFKA-10688
> URL: https://issues.apache.org/jira/browse/KAFKA-10688
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Major
>
> Today we always handle InvalidOffsetException from the main consumer by the 
> resetting policy assuming they are for source topics. But repartition topics 
> are also source topics and should never be truncated and hence cause 
> InvalidOffsetException.
> We should differentiate these repartition topics from external source topics 
> and treat the InvalidOffsetException from repartition topics as fatal and 
> close the whole application.
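
The proposed differentiation can be sketched as a routing decision (names are illustrative only, not the actual Streams classes):

```java
import java.util.Set;

// Illustrative: the reset policy only applies to external source topics;
// a truncated repartition topic is surfaced as a fatal condition instead.
final class InvalidOffsetRouting {
    enum Action { APPLY_RESET_POLICY, FAIL_APPLICATION }

    static Action onInvalidOffset(String topic, Set<String> repartitionTopics) {
        return repartitionTopics.contains(topic)
                ? Action.FAIL_APPLICATION      // data loss on internal topic
                : Action.APPLY_RESET_POLICY;   // normal reset for user topics
    }
}
```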





[GitHub] [kafka] mimaison commented on pull request #9224: KAFKA-10304: refactor MM2 integration tests

2020-11-09 Thread GitBox


mimaison commented on pull request #9224:
URL: https://github.com/apache/kafka/pull/9224#issuecomment-724323408


   @ning2008wisc I've not forgotten this PR, I just haven't had time to do 
reviews yet :( 







[GitHub] [kafka] mimaison commented on pull request #9545: [mm2] Allow Checkpoints for consumers using static partition assignments

2020-11-09 Thread GitBox


mimaison commented on pull request #9545:
URL: https://github.com/apache/kafka/pull/9545#issuecomment-724322506


   Groups created on the target cluster by KAFKA-9076 are "simple groups" as 
there's no member information.
   Not entirely sure why these were explicitly filtered. I can't immediately 
come up with a reason.
   
   @scanterog Can you add a test as well?







[jira] [Resolved] (KAFKA-10661) Add resigned state to raft state machine to preserve leader/epoch information

2020-11-09 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10661?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-10661.
-
Resolution: Fixed

> Add resigned state to raft state machine to preserve leader/epoch information
> -
>
> Key: KAFKA-10661
> URL: https://issues.apache.org/jira/browse/KAFKA-10661
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>
> While working on KAFKA-10655, I realized we have a bug in the existing raft 
> state initialization logic when the process shuts down as leader. After 
> reinitializing, we retain the current epoch, but we discard the current 
> leader status. This means that it is possible for the node to vote for 
> another node in the same epoch that it was the leader of.
> To fix this problem I think we should add a separate "resigned" state. When 
> re-initializing after being shutdown as leader, we can enter the "resigned" 
> state. This prevents us from voting for another candidate while still 
> ensuring that a new election needs to be held.
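
A minimal model of the voting rule (state and method names are made up for illustration; the real raft implementation has more states and tracks a votedKey): a node that re-initializes as resigned keeps its epoch and refuses same-epoch votes, but a higher-epoch candidate still forces a fresh election.

```java
// Toy model only, not the actual Kafka raft state machine.
final class ResignedVoteSketch {
    enum State { FOLLOWER, CANDIDATE, LEADER, RESIGNED }

    static boolean mayGrantVote(State state, int currentEpoch, int candidateEpoch) {
        if (candidateEpoch > currentEpoch) {
            return true;  // a new election supersedes the old epoch
        }
        // within the current epoch, a resigned ex-leader must not vote
        return candidateEpoch == currentEpoch && state == State.FOLLOWER;
    }
}
```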





[GitHub] [kafka] hachikuji merged pull request #9531: KAFKA-10661; Add new resigned state for graceful shutdown/initialization

2020-11-09 Thread GitBox


hachikuji merged pull request #9531:
URL: https://github.com/apache/kafka/pull/9531


   







[GitHub] [kafka] abbccdda opened a new pull request #9580: KAFKA-10350: add forwarding manager implementation with metrics

2020-11-09 Thread GitBox


abbccdda opened a new pull request #9580:
URL: https://github.com/apache/kafka/pull/9580


   Add metric for forwarding request tracking. Note that the implementation is 
slightly diverged from the KIP, where we decide to get rid of the client.id tag 
since most admin clients would only have one inflight forwarding request.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] wcarlson5 commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler

2020-11-09 Thread GitBox


wcarlson5 commented on a change in pull request #9487:
URL: https://github.com/apache/kafka/pull/9487#discussion_r520104884



##
File path: streams/src/main/resources/common/message/SubscriptionInfoData.json
##
@@ -57,6 +57,11 @@
   "name": "uniqueField",
   "versions": "8+",
   "type": "int8"
+},
+{
+  "name": "shutdownRequested",
+  "versions": "9+",
+  "type": "int8"

Review comment:
   https://github.com/apache/kafka/pull/9273#discussion_r486597512
   I originally had it at int32, but @vvcephei suggested int16, now it is int8.
   
   would you be good with int16 or do you think int32 is the way?










[GitHub] [kafka] C0urante commented on a change in pull request #9549: KIP-145: Add SMTs, HeaderFrom, DropHeaders and InsertHeader

2020-11-09 Thread GitBox


C0urante commented on a change in pull request #9549:
URL: https://github.com/apache/kafka/pull/9549#discussion_r520056667



##
File path: 
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HeaderFrom.java
##
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.transforms.util.Requirements;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.kafka.common.config.ConfigDef.NO_DEFAULT_VALUE;
+
+public abstract class HeaderFrom<R extends ConnectRecord<R>> implements Transformation<R> {
+
+public static final String FIELDS_FIELD = "fields";
+public static final String HEADERS_FIELD = "headers";
+public static final String OPERATION_FIELD = "operation";
+
+public static final String OVERVIEW_DOC =
+"Moves or copies fields in the key/value of a record into that record's headers. " +
+"Corresponding elements of <code>" + FIELDS_FIELD + "</code> and " +
+"<code>" + HEADERS_FIELD + "</code> together identify a field and the header it should be " +
+"moved or copied to. " +
+"Use the concrete transformation type designed for the record " +
+"key (<code>" + Key.class.getName() + "</code>) or value (<code>" + Value.class.getName() + "</code>).";
+
+public static final ConfigDef CONFIG_DEF = new ConfigDef()
+.define(FIELDS_FIELD, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH,
+"Field names in the record whose values are to be copied or moved to headers.")
+.define(HEADERS_FIELD, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH,
+"Header names, in the same order as the field names listed in the fields configuration property.")
+.define(OPERATION_FIELD, ConfigDef.Type.STRING, NO_DEFAULT_VALUE,
+ConfigDef.ValidString.in("move", "copy"), ConfigDef.Importance.HIGH,
+"Either <code>move</code> if the fields are to be moved to the headers (removed from the key/value), " +
+"or <code>copy</code> if the fields are to be copied to the headers (retained in the key/value).");
+
+enum Operation {
+MOVE("move"),
+COPY("copy");
+
+private final String name;
+
+Operation(String name) {
+this.name = name;
+}
+
+static Operation fromName(String name) {
+switch (name) {
+case "move":
+return MOVE;
+case "copy":
+return COPY;
+default:
+throw new IllegalArgumentException();
+}
+}
+
+public String toString() {
+return name;
+}
+}
+
+private List<String> fields;
+
+private List<String> headers;
+
+private Operation operation;
+
+@Override
+public R apply(R record) {
+Object operatingValue = operatingValue(record);
+Schema operatingSchema = operatingSchema(record);
+
+if (operatingSchema == null) {
+return applySchemaless(record, operatingValue);
+} else {
+return applyWithSchema(record, operatingValue, operatingSchema);
+}
+}
+
+private R applyWithSchema(R record, Object operatingValue, Schema 
operatingSchema) {
+Headers updatedHeaders = record.headers().duplicate();

Review comment:
   Why duplicate headers here? According to the [Header class's 
Javadocs](https://kafka.apache.org/26/javadoc/org/apache/kafka/connect/header/Headers.html),
 the 

[GitHub] [kafka] ableegoldman commented on a change in pull request #9487: KAFKA-9331: Add a streams specific uncaught exception handler

2020-11-09 Thread GitBox


ableegoldman commented on a change in pull request #9487:
URL: https://github.com/apache/kafka/pull/9487#discussion_r520077048



##
File path: streams/src/main/resources/common/message/SubscriptionInfoData.json
##
@@ -57,6 +57,11 @@
   "name": "uniqueField",
   "versions": "8+",
   "type": "int8"
+},
+{
+  "name": "shutdownRequested",
+  "versions": "9+",
+  "type": "int8"

Review comment:
   I'm not really worried that we'd run out of space, I just think it sends 
a signal that the Assignment and Subscription error codes are semantically 
distinct and don't refer to the same underlying concept. So it seems better to 
go with the simpler approach than over-optimize to save an occasional three 
bytes 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] C0urante commented on pull request #9549: KIP-145: Add SMTs, HeaderFrom, DropHeaders and InsertHeader

2020-11-09 Thread GitBox


C0urante commented on pull request #9549:
URL: https://github.com/apache/kafka/pull/9549#issuecomment-724213528


   Thanks for reaching out @tombentley! Happy to take a look.
   
   For reference, maybe you could add a link to 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-145+-+Expose+Record+Headers+in+Kafka+Connect#KIP145ExposeRecordHeadersinKafkaConnect-Transformations
 to the description?
   
   Will begin reviewing shortly.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10702) Slow replication of empty transactions

2020-11-09 Thread Jason Gustafson (Jira)
Jason Gustafson created KAFKA-10702:
---

 Summary: Slow replication of empty transactions
 Key: KAFKA-10702
 URL: https://issues.apache.org/jira/browse/KAFKA-10702
 Project: Kafka
  Issue Type: Bug
Reporter: Jason Gustafson
Assignee: Jason Gustafson


We hit a case in which we had to re-replicate a compacted topic from the 
beginning of the log. Some portions of the log consisted mostly of transaction 
markers, which were extremely slow to replicate. The problem is that 
`ProducerStateManager` adds all of these empty transactions to its internal 
collection of `ongoingTxns` before immediately removing them. There could be 
tens of thousands of empty transactions in the worst case from a single `Fetch` 
response, so this can create a huge amount of pressure on the broker. 
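The pattern described above can be sketched as follows (hypothetical names, not the actual `ProducerStateManager` code): each empty transaction is inserted into the ongoing-transactions map and then immediately removed when its marker is replayed, so replication does two map operations per marker with no net state change.

```java
import java.util.TreeMap;

// Illustrative sketch of the add-then-immediately-remove churn: for a log
// region consisting mostly of transaction markers, every marker costs an
// insert and a delete on the ongoing-transactions map, and the map ends empty.
class EmptyTxnChurn {
    static int replayMarkers(int emptyTxnCount) {
        TreeMap<Long, Long> ongoingTxns = new TreeMap<>();
        int operations = 0;
        for (long offset = 0; offset < emptyTxnCount; offset++) {
            ongoingTxns.put(offset, offset); // "open" the empty transaction
            operations++;
            ongoingTxns.remove(offset);      // marker immediately closes it
            operations++;
        }
        return operations; // two map operations per empty transaction
    }

    public static void main(String[] args) {
        if (replayMarkers(10_000) != 20_000) throw new AssertionError();
    }
}
```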



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] guozhangwang commented on pull request #9531: KAFKA-10661; Add new resigned state for graceful shutdown/initialization

2020-11-09 Thread GitBox


guozhangwang commented on pull request #9531:
URL: https://github.com/apache/kafka/pull/9531#issuecomment-724186172


   LGTM.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #9531: KAFKA-10661; Add new resigned state for graceful shutdown/initialization

2020-11-09 Thread GitBox


guozhangwang commented on a change in pull request #9531:
URL: https://github.com/apache/kafka/pull/9531#discussion_r520020029



##
File path: raft/src/main/java/org/apache/kafka/raft/QuorumState.java
##
@@ -21,18 +21,21 @@
 import org.slf4j.Logger;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Optional;
 import java.util.OptionalInt;
 import java.util.Random;
 import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
- * This class is responsible for managing the current state of this node and 
ensuring only
- * valid state transitions.
+ * This class is responsible for managing the current state of this node and 
ensuring
+ * only valid state transitions. Below we define the possible state 
transitions and
+ * how they are triggered:
  *
- * Unattached =>
+ * Unattached|Resigned =>

Review comment:
   Okay now I remembered what we discussed before.
   
   What I was wondering is: say with quorum size 6, we would need 4 votes to
elect a leader; if the current leader shuts down, then before it is restarted
the quorum size is 5, so logically we would only need 3 votes --- but as long
as we still require 4 votes during this transition, even with only 5 alive
quorum members, we are fine.
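   The arithmetic here can be made concrete with a small sketch (illustrative only, not Raft implementation code): a majority of n voters is floor(n/2) + 1, so the safety question is whether the majority is computed against the full quorum size (6, requiring 4 votes) or against only the alive members (5, requiring just 3):

```java
// Majority arithmetic for a Raft-style quorum: floor(n/2) + 1 votes are
// needed out of n voters. With 2N voters this is N + 1, e.g. 4 of 6.
class QuorumMajority {
    static int majorityOf(int quorumSize) {
        return quorumSize / 2 + 1;
    }

    public static void main(String[] args) {
        if (majorityOf(6) != 4) throw new AssertionError(); // full quorum of 6
        if (majorityOf(5) != 3) throw new AssertionError(); // shrunken quorum of 5
    }
}
```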





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-9751) Internal topic creation should go to controller

2020-11-09 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9751?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen reassigned KAFKA-9751:
--

Assignee: Boyang Chen

> Internal topic creation should go to controller
> ---
>
> Key: KAFKA-9751
> URL: https://issues.apache.org/jira/browse/KAFKA-9751
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Boyang Chen
>Assignee: Boyang Chen
>Priority: Major
>
> For use cases to create internal topics through FindCoordinator or Metadata 
> request, receiving broker should route the topic creation request to the 
> controller instead of handling by itself.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] abbccdda opened a new pull request #9579: KAFKA-9751: Forward FindCoordinator request when topic creation is needed

2020-11-09 Thread GitBox


abbccdda opened a new pull request #9579:
URL: https://github.com/apache/kafka/pull/9579


   This PR forwards the entire FindCoordinator request to the active controller
when the internal topic being queried is not ready to be served yet.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-10699) Add system test coverage for group coordinator emigration

2020-11-09 Thread Boyang Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10699?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Boyang Chen reassigned KAFKA-10699:
---

Assignee: feyman

> Add system test coverage for group coordinator emigration
> -
>
> Key: KAFKA-10699
> URL: https://issues.apache.org/jira/browse/KAFKA-10699
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Boyang Chen
>Assignee: feyman
>Priority: Major
>
> After merging the fix https://issues.apache.org/jira/browse/KAFKA-10284, we 
> believe that it is important to add system test coverage for the group 
> coordinator migration to verify consumer behaviors are correct.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji commented on a change in pull request #9563: KAFKA-10684; Avoid additional envelope copies during network transmission

2020-11-09 Thread GitBox


hachikuji commented on a change in pull request #9563:
URL: https://github.com/apache/kafka/pull/9563#discussion_r519983517



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/EnvelopeRequest.java
##
@@ -91,4 +91,14 @@ public AbstractResponse getErrorResponse(int throttleTimeMs, 
Throwable e) {
 public static EnvelopeRequest parse(ByteBuffer buffer, short version) {
 return new EnvelopeRequest(ApiKeys.ENVELOPE.parseRequest(version, 
buffer), version);
 }
+
+public EnvelopeRequestData data() {
+return data;
+}
+
+@Override
+public Send toSend(String destination, RequestHeader header) {

Review comment:
   #7409 might be a good opportunity to complete this. What do you think 
@ijuma ?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] omkreddy edited a comment on pull request #9556: MINOR: Update jetty to 9.4.33

2020-11-09 Thread GitBox


omkreddy edited a comment on pull request #9556:
URL: https://github.com/apache/kafka/pull/9556#issuecomment-724131363


   Merging to trunk and older branches (2.7, 2.6, 2.5, 2.4)



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9563: KAFKA-10684; Avoid additional envelope copies during network transmission

2020-11-09 Thread GitBox


hachikuji commented on a change in pull request #9563:
URL: https://github.com/apache/kafka/pull/9563#discussion_r519971686



##
File path: 
generator/src/main/java/org/apache/kafka/message/MessageDataGenerator.java
##
@@ -1579,58 +1566,58 @@ private void generateVariableLengthFieldSize(FieldSpec 
field,
 }
 if (tagged) {
 
headerGenerator.addImport(MessageGenerator.BYTE_UTILS_CLASS);
+buffer.printf("int _arraySize = _size.totalSize() - 
_sizeBeforeArray;%n");
 buffer.printf("_cache.setArraySizeInBytes(%s, 
_arraySize);%n",
 field.camelCaseName());
-buffer.printf("_size += _arraySize + 
ByteUtils.sizeOfUnsignedVarint(_arraySize);%n");
-} else {
-buffer.printf("_size += _arraySize;%n");
+
buffer.printf("_size.addBytes(ByteUtils.sizeOfUnsignedVarint(_arraySize));%n");
 }
 } else if (field.type().isBytes()) {
+buffer.printf("int _sizeBeforeBytes = 
_size.totalSize();%n");

Review comment:
   Good catch





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vamossagar12 commented on pull request #9508: KAFKA-10648: Add Prefix Scan support to State Stores

2020-11-09 Thread GitBox


vamossagar12 commented on pull request #9508:
URL: https://github.com/apache/kafka/pull/9508#issuecomment-724142512


   > @vamossagar12, Thank you for the PR!
   > 
   > Here is my feedback!
   
   @cadonna thanks, I made the requisite changes barring 1. 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vamossagar12 commented on a change in pull request #9508: KAFKA-10648: Add Prefix Scan support to State Stores

2020-11-09 Thread GitBox


vamossagar12 commented on a change in pull request #9508:
URL: https://github.com/apache/kafka/pull/9508#discussion_r519969912



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBPrefixIterator.java
##
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.state.internals;
+
+import org.apache.kafka.common.utils.Bytes;
+import org.apache.kafka.streams.KeyValue;
+import org.apache.kafka.streams.state.KeyValueIterator;
+import org.rocksdb.RocksIterator;
+
+import java.nio.ByteBuffer;
+import java.util.Set;
+
+class RocksDBPrefixIterator extends RocksDbIterator {

Review comment:
   This one, do I need explicit unit tests? I noticed that something like 
RocksDbRangeIterator also doesn't have unit tests.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9531: KAFKA-10661; Add new resigned state for graceful shutdown/initialization

2020-11-09 Thread GitBox


hachikuji commented on a change in pull request #9531:
URL: https://github.com/apache/kafka/pull/9531#discussion_r519969208



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -1601,7 +1618,12 @@ private long pollFollower(long currentTimeMs) throws 
IOException {
 }
 
 private long pollFollowerAsVoter(FollowerState state, long currentTimeMs) 
throws IOException {
-if (state.hasFetchTimeoutExpired(currentTimeMs)) {
+GracefulShutdown shutdown = this.shutdown.get();
+if (shutdown != null) {
+// If we are a follower, then we can shutdown immediately. We want 
to
+// skip the transition to candidate in any case.
+return 0;

Review comment:
   If we are a follower, then there is no election in progress to help 
with. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9531: KAFKA-10661; Add new resigned state for graceful shutdown/initialization

2020-11-09 Thread GitBox


hachikuji commented on a change in pull request #9531:
URL: https://github.com/apache/kafka/pull/9531#discussion_r519968145



##
File path: raft/src/main/java/org/apache/kafka/raft/QuorumState.java
##
@@ -21,18 +21,21 @@
 import org.slf4j.Logger;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Optional;
 import java.util.OptionalInt;
 import java.util.Random;
 import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
- * This class is responsible for managing the current state of this node and 
ensuring only
- * valid state transitions.
+ * This class is responsible for managing the current state of this node and 
ensuring
+ * only valid state transitions. Below we define the possible state 
transitions and
+ * how they are triggered:
  *
- * Unattached =>
+ * Unattached|Resigned =>

Review comment:
   In the future, when we have reassignment, we will still have to protect 
every quorum change with a majority of the current nodes. The proposal we had 
previously only allowed single-node changes, which meant that any majority was 
a majority before and after the state was applied.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] niteshmor commented on pull request #9556: MINOR: Update jetty to 9.4.33

2020-11-09 Thread GitBox


niteshmor commented on pull request #9556:
URL: https://github.com/apache/kafka/pull/9556#issuecomment-724139651


   Thanks @omkreddy 
   For older releases, feel free to use the commits noted in 
https://github.com/apache/kafka/pull/9556#issuecomment-721898016 as a 
reference. Git cherry-picking didn't work for me, and older versions required 
jersey upgrade anyways. 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9531: KAFKA-10661; Add new resigned state for graceful shutdown/initialization

2020-11-09 Thread GitBox


hachikuji commented on a change in pull request #9531:
URL: https://github.com/apache/kafka/pull/9531#discussion_r519965008



##
File path: raft/src/main/java/org/apache/kafka/raft/QuorumState.java
##
@@ -21,18 +21,21 @@
 import org.slf4j.Logger;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Optional;
 import java.util.OptionalInt;
 import java.util.Random;
 import java.util.Set;
 import java.util.stream.Collectors;
 
 /**
- * This class is responsible for managing the current state of this node and 
ensuring only
- * valid state transitions.
+ * This class is responsible for managing the current state of this node and 
ensuring
+ * only valid state transitions. Below we define the possible state 
transitions and
+ * how they are triggered:
  *
- * Unattached =>
+ * Unattached|Resigned =>

Review comment:
   Hmm.. The quorum size does not change because a leader resigns. If there 
are 2N nodes in the cluster, then we always need N + 1 votes, so I don't think 
this case is possible.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] omkreddy closed pull request #9556: MINOR: Update jetty to 9.4.33

2020-11-09 Thread GitBox


omkreddy closed pull request #9556:
URL: https://github.com/apache/kafka/pull/9556


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9563: KAFKA-10684; Avoid additional envelope copies during network transmission

2020-11-09 Thread GitBox


hachikuji commented on a change in pull request #9563:
URL: https://github.com/apache/kafka/pull/9563#discussion_r519955199



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/EnvelopeRequest.java
##
@@ -91,4 +91,14 @@ public AbstractResponse getErrorResponse(int throttleTimeMs, 
Throwable e) {
 public static EnvelopeRequest parse(ByteBuffer buffer, short version) {
 return new EnvelopeRequest(ApiKeys.ENVELOPE.parseRequest(version, 
buffer), version);
 }
+
+public EnvelopeRequestData data() {
+return data;
+}
+
+@Override
+public Send toSend(String destination, RequestHeader header) {

Review comment:
   Yeah, I think so. And looks like we're almost there. After your patch 
for `Produce`, the only remaining unconverted API that I see is 
`OffsetsForLeaderEpoch`.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] omkreddy commented on pull request #9556: MINOR: Update jetty to 9.4.33

2020-11-09 Thread GitBox


omkreddy commented on pull request #9556:
URL: https://github.com/apache/kafka/pull/9556#issuecomment-724131363


   Merging to trunk and older branches.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] splett2 commented on a change in pull request #9573: KAFKA-10693: Close quota managers created in tests

2020-11-09 Thread GitBox


splett2 commented on a change in pull request #9573:
URL: https://github.com/apache/kafka/pull/9573#discussion_r519894386



##
File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
##
@@ -1894,10 +1894,10 @@ class ReplicaManagerTest {
 
 // each replica manager is for a broker
 val rm0 = new ReplicaManager(config0, metrics, time, kafkaZkClient, new 
MockScheduler(time), mockLogMgr0,
-  new AtomicBoolean(false), QuotaFactory.instantiate(config0, metrics, 
time, ""),
+  new AtomicBoolean(false), quotaManager,
   brokerTopicStats1, metadataCache0, new 
LogDirFailureChannel(config0.logDirs.size), alterIsrManager)
 val rm1 = new ReplicaManager(config1, metrics, time, kafkaZkClient, new 
MockScheduler(time), mockLogMgr1,
-  new AtomicBoolean(false), QuotaFactory.instantiate(config1, metrics, 
time, ""),
+  new AtomicBoolean(false), quotaManager,

Review comment:
   the two replica managers are intended to be separate. I used the same 
quota manager for both to avoid polluting the method signature/diff. The tests 
using this utility function don't rely on any quota manager behavior, so I felt 
this was okay.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] stanislavkozlovski commented on pull request #7498: KAFKA-9023: Log request destination when the Producer gets disconnected

2020-11-09 Thread GitBox


stanislavkozlovski commented on pull request #7498:
URL: https://github.com/apache/kafka/pull/7498#issuecomment-724117161


   > I guess this is a nitpick, but I'd rather not construct an error message
when I might or might not need it (and won't in the common, error-free case).
   
   @cmccabe we construct it only when the error message is not null and not 
empty
   
   cc @dajac could you help review this?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #9573: KAFKA-10693: Close quota managers created in tests

2020-11-09 Thread GitBox


dajac commented on a change in pull request #9573:
URL: https://github.com/apache/kafka/pull/9573#discussion_r519922038



##
File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
##
@@ -1506,7 +1508,7 @@ class ReplicaManagerTest {
   purgatoryName = "ElectLeader", timer, reaperEnabled = false)
 
 // Mock network client to show leader offset of 5
-val quota = QuotaFactory.instantiate(config, metrics, time, "")
+val quota = quotaManager

Review comment:
   Could we rename `quotaManager` used in `createReplicaFetcherManager` to 
`replicationQuotaManager` and use `quotaManager` as the global one everywhere?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] splett2 commented on a change in pull request #9573: KAFKA-10693: Close quota managers created in tests

2020-11-09 Thread GitBox


splett2 commented on a change in pull request #9573:
URL: https://github.com/apache/kafka/pull/9573#discussion_r519894386



##
File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
##
@@ -1894,10 +1894,10 @@ class ReplicaManagerTest {
 
 // each replica manager is for a broker
 val rm0 = new ReplicaManager(config0, metrics, time, kafkaZkClient, new 
MockScheduler(time), mockLogMgr0,
-  new AtomicBoolean(false), QuotaFactory.instantiate(config0, metrics, 
time, ""),
+  new AtomicBoolean(false), quotaManager,
   brokerTopicStats1, metadataCache0, new 
LogDirFailureChannel(config0.logDirs.size), alterIsrManager)
 val rm1 = new ReplicaManager(config1, metrics, time, kafkaZkClient, new 
MockScheduler(time), mockLogMgr1,
-  new AtomicBoolean(false), QuotaFactory.instantiate(config1, metrics, 
time, ""),
+  new AtomicBoolean(false), quotaManager,

Review comment:
   in this case, the two replica managers are meant to be separate. I used 
the same quota manager for both to avoid polluting the response. The test 
doesn't rely on the quota managers for functionality, so I felt this was okay.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] splett2 commented on a change in pull request #9573: KAFKA-10693: Close quota managers created in tests

2020-11-09 Thread GitBox


splett2 commented on a change in pull request #9573:
URL: https://github.com/apache/kafka/pull/9573#discussion_r519894386



##
File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
##
@@ -1894,10 +1894,10 @@ class ReplicaManagerTest {
 
 // each replica manager is for a broker
 val rm0 = new ReplicaManager(config0, metrics, time, kafkaZkClient, new 
MockScheduler(time), mockLogMgr0,
-  new AtomicBoolean(false), QuotaFactory.instantiate(config0, metrics, 
time, ""),
+  new AtomicBoolean(false), quotaManager,
   brokerTopicStats1, metadataCache0, new 
LogDirFailureChannel(config0.logDirs.size), alterIsrManager)
 val rm1 = new ReplicaManager(config1, metrics, time, kafkaZkClient, new 
MockScheduler(time), mockLogMgr1,
-  new AtomicBoolean(false), QuotaFactory.instantiate(config1, metrics, 
time, ""),
+  new AtomicBoolean(false), quotaManager,

Review comment:
   the two replica managers are intended to be separate. I used the same 
quota manager for both to avoid polluting the method signature/diff. The test 
doesn't rely on the quota managers for functionality, so I felt this was okay.









[GitHub] [kafka] tombentley commented on pull request #9433: KAFKA-10607: Consistent behaviour for response errorCounts()

2020-11-09 Thread GitBox


tombentley commented on pull request #9433:
URL: https://github.com/apache/kafka/pull/9433#issuecomment-724077206


   @chia7712 is there anything more you needed on this PR?







[GitHub] [kafka] splett2 commented on a change in pull request #9573: KAFKA-10693: Close quota managers created in tests

2020-11-09 Thread GitBox


splett2 commented on a change in pull request #9573:
URL: https://github.com/apache/kafka/pull/9573#discussion_r519873954



##
File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
##
@@ -1506,7 +1508,7 @@ class ReplicaManagerTest {
   purgatoryName = "ElectLeader", timer, reaperEnabled = false)
 
 // Mock network client to show leader offset of 5
-val quota = QuotaFactory.instantiate(config, metrics, time, "")
+val quota = quotaManager

Review comment:
   this is needed to avoid a method-variable name collision with the 
`ReplicationQuotaManager` in the overridden `createReplicaFetcherManager` a 
little bit further down in the test.









[jira] [Updated] (KAFKA-10701) First line of detailed stats from consumer-perf-test.sh incorrect

2020-11-09 Thread David Arthur (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10701?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Arthur updated KAFKA-10701:
-
Description: 
When running the consumer perf test with {{--show-detailed-stats}}, the first 
line of output contains incorrect results

{code}
$ ./bin/kafka-consumer-perf-test.sh --bootstrap-server localhost:9092 --topic 
test --messages 1000 --reporting-interval 1000 --show-detailed-stats
time, threadId, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, 
rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2020-11-06 11:57:01:420, 0, 275.3878, 275.3878, 288765, 288765.0000, 1604681820723, -1604681819723, 0.0000, 0.0000
2020-11-06 11:57:02:420, 0, 952.1456, 676.7578, 998397, 709632.0000, 0, 1000, 676.7578, 709632.0000
2020-11-06 11:57:03:420, 0, 1654.2940, 702.1484, 1734653, 736256.0000, 0, 1000, 702.1484, 736256.0000
2020-11-06 11:57:04:420, 0, 2492.1389, 837.8448, 2613197, 878544.0000, 0, 1000, 837.8448, 878544.0000
2020-11-06 11:57:05:420, 0, 3403.2993, 911.1605, 3568618, 955421.0000, 0, 1000, 911.1605, 955421.0000
2020-11-06 11:57:06:420, 0, 4204.1540, 800.8547, 4408375, 839757.0000, 0, 1000, 800.8547, 839757.0000
2020-11-06 11:57:07:420, 0, 4747.1275, 542.9735, 4977724, 569349.0000, 0, 1000, 542.9735, 569349.0000
2020-11-06 11:57:08:420, 0, 5282.2266, 535.0990, 5538816, 561092.0000, 0, 1000, 535.0990, 561092.0000
2020-11-06 11:57:09:420, 0, 5824.3732, 542.1467, 6107298, 568482.0000, 0, 1000, 542.1467, 568482.0000
{code}

This seems to be due to incorrect initialization of the {{joinStart}} variable 
in the consumer perf test code.

  was:
When running the console perf test with {{--show-detailed-stats}}, the first 
line out of output has incorrect results

{code}
$ ./bin/kafka-consumer-perf-test.sh --bootstrap-server localhost:9092 --topic 
test --messages 1000 --reporting-interval 1000 --show-detailed-stats
time, threadId, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, 
rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2020-11-06 11:57:01:420, 0, 275.3878, 275.3878, 288765, 288765.0000, 1604681820723, -1604681819723, 0.0000, 0.0000
2020-11-06 11:57:02:420, 0, 952.1456, 676.7578, 998397, 709632.0000, 0, 1000, 676.7578, 709632.0000
2020-11-06 11:57:03:420, 0, 1654.2940, 702.1484, 1734653, 736256.0000, 0, 1000, 702.1484, 736256.0000
2020-11-06 11:57:04:420, 0, 2492.1389, 837.8448, 2613197, 878544.0000, 0, 1000, 837.8448, 878544.0000
2020-11-06 11:57:05:420, 0, 3403.2993, 911.1605, 3568618, 955421.0000, 0, 1000, 911.1605, 955421.0000
2020-11-06 11:57:06:420, 0, 4204.1540, 800.8547, 4408375, 839757.0000, 0, 1000, 800.8547, 839757.0000
2020-11-06 11:57:07:420, 0, 4747.1275, 542.9735, 4977724, 569349.0000, 0, 1000, 542.9735, 569349.0000
2020-11-06 11:57:08:420, 0, 5282.2266, 535.0990, 5538816, 561092.0000, 0, 1000, 535.0990, 561092.0000
2020-11-06 11:57:09:420, 0, 5824.3732, 542.1467, 6107298, 568482.0000, 0, 1000, 542.1467, 568482.0000
{code}

This seems to be due to incorrect initialization of the {joinStart} variable. 


> First line of detailed stats from consumer-perf-test.sh incorrect
> -
>
> Key: KAFKA-10701
> URL: https://issues.apache.org/jira/browse/KAFKA-10701
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Reporter: David Arthur
>Priority: Minor
>  Labels: newbie
>
> When running the consumer perf test with {{--show-detailed-stats}}, the first 
> line of output contains incorrect results
> {code}
> $ ./bin/kafka-consumer-perf-test.sh --bootstrap-server localhost:9092 --topic 
> test --messages 1000 --reporting-interval 1000 --show-detailed-stats
> time, threadId, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, 
> rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
> 2020-11-06 11:57:01:420, 0, 275.3878, 275.3878, 288765, 288765.0000, 1604681820723, -1604681819723, 0.0000, 0.0000
> 2020-11-06 11:57:02:420, 0, 952.1456, 676.7578, 998397, 709632.0000, 0, 1000, 676.7578, 709632.0000
> 2020-11-06 11:57:03:420, 0, 1654.2940, 702.1484, 1734653, 736256.0000, 0, 1000, 702.1484, 736256.0000
> 2020-11-06 11:57:04:420, 0, 2492.1389, 837.8448, 2613197, 878544.0000, 0, 1000, 837.8448, 878544.0000
> 2020-11-06 11:57:05:420, 0, 3403.2993, 911.1605, 3568618, 955421.0000, 0, 1000, 911.1605, 955421.0000
> 2020-11-06 11:57:06:420, 0, 4204.1540, 800.8547, 4408375, 839757.0000, 0, 1000, 800.8547, 839757.0000
> 2020-11-06 11:57:07:420, 0, 4747.1275, 542.9735, 4977724, 569349.0000, 0, 1000, 542.9735, 569349.0000
> 2020-11-06 11:57:08:420, 0, 5282.2266, 535.0990, 5538816, 561092.0000, 0, 1000, 535.0990, 561092.0000
> 2020-11-06 11:57:09:420, 0, 5824.3732, 542.1467, 6107298, 568482.0000, 0, 1000, 542.1467, 568482.0000
> {code}
> 
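
The bogus first line above can be reproduced arithmetically. The sketch below is illustrative, not the actual ConsumerPerformance code; the only name taken from the report is {{joinStart}}. If it defaults to 0 instead of the start time, the first interval reports an epoch-sized rebalance time and a hugely negative fetch time.

```java
public class FirstLineStats {
    // rebalance.time.ms for the first reporting interval
    static long rebalanceTimeMs(long reportTimeMs, long joinStart) {
        return reportTimeMs - joinStart;
    }

    // fetch.time.ms = interval length minus time spent rebalancing
    static long fetchTimeMs(long intervalMs, long rebalanceTimeMs) {
        return intervalMs - rebalanceTimeMs;
    }

    public static void main(String[] args) {
        long reportTimeMs = 1604681820723L; // first report timestamp from the log above
        long buggyJoinStart = 0L;           // bug: joinStart left at 0 instead of the start time
        long r = rebalanceTimeMs(reportTimeMs, buggyJoinStart);
        // Prints the exact bogus columns seen in the first output line:
        System.out.println(r + ", " + fetchTimeMs(1000L, r)); // 1604681820723, -1604681819723
    }
}
```

Initializing the assumed {{joinStart}} to the test's start timestamp would make the first interval's rebalance time near zero, matching the later lines.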

[jira] [Created] (KAFKA-10701) First line of detailed stats from consumer-perf-test.sh incorrect

2020-11-09 Thread David Arthur (Jira)
David Arthur created KAFKA-10701:


 Summary: First line of detailed stats from consumer-perf-test.sh 
incorrect
 Key: KAFKA-10701
 URL: https://issues.apache.org/jira/browse/KAFKA-10701
 Project: Kafka
  Issue Type: Bug
  Components: tools
Reporter: David Arthur


When running the consumer perf test with {{--show-detailed-stats}}, the first 
line of output contains incorrect results

{code}
$ ./bin/kafka-consumer-perf-test.sh --bootstrap-server localhost:9092 --topic 
test --messages 1000 --reporting-interval 1000 --show-detailed-stats
time, threadId, data.consumed.in.MB, MB.sec, data.consumed.in.nMsg, nMsg.sec, 
rebalance.time.ms, fetch.time.ms, fetch.MB.sec, fetch.nMsg.sec
2020-11-06 11:57:01:420, 0, 275.3878, 275.3878, 288765, 288765.0000, 1604681820723, -1604681819723, 0.0000, 0.0000
2020-11-06 11:57:02:420, 0, 952.1456, 676.7578, 998397, 709632.0000, 0, 1000, 676.7578, 709632.0000
2020-11-06 11:57:03:420, 0, 1654.2940, 702.1484, 1734653, 736256.0000, 0, 1000, 702.1484, 736256.0000
2020-11-06 11:57:04:420, 0, 2492.1389, 837.8448, 2613197, 878544.0000, 0, 1000, 837.8448, 878544.0000
2020-11-06 11:57:05:420, 0, 3403.2993, 911.1605, 3568618, 955421.0000, 0, 1000, 911.1605, 955421.0000
2020-11-06 11:57:06:420, 0, 4204.1540, 800.8547, 4408375, 839757.0000, 0, 1000, 800.8547, 839757.0000
2020-11-06 11:57:07:420, 0, 4747.1275, 542.9735, 4977724, 569349.0000, 0, 1000, 542.9735, 569349.0000
2020-11-06 11:57:08:420, 0, 5282.2266, 535.0990, 5538816, 561092.0000, 0, 1000, 535.0990, 561092.0000
2020-11-06 11:57:09:420, 0, 5824.3732, 542.1467, 6107298, 568482.0000, 0, 1000, 542.1467, 568482.0000
{code}

This seems to be due to incorrect initialization of the {joinStart} variable. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] rajinisivaram opened a new pull request #9578: MINOR: Log resource pattern of ACL updates at INFO level

2020-11-09 Thread GitBox


rajinisivaram opened a new pull request #9578:
URL: https://github.com/apache/kafka/pull/9578


   At the moment, we have one log entry for ACL updates that says:
   ```
   Processing notification(s) to /kafka-acl-changes
   ```
   For other updates like broker configuration updates, we have an additional 
entry at INFO level that shows what was updated when the change notification 
was processed. Since every resource pattern may have 100s or 1000s of access 
control entries associated with it, we don't want to log the entire contents on 
ACL update at INFO level. But it would be useful to log the resource pattern. 
This shows that we processed the notification in AclAuthorizer and gives the 
resource and version that was refreshed from ZK. This PR adds an additional 
INFO-level log entry for ACL updates in AclAuthorizer and retains the existing 
DEBUG level entry with full details.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] showuon commented on pull request #9104: KAFKA-10266: Update the connector config header.converter

2020-11-09 Thread GitBox


showuon commented on pull request #9104:
URL: https://github.com/apache/kafka/pull/9104#issuecomment-723981295


   @kkonstantine , please help review this PR. Thanks.







[GitHub] [kafka] showuon commented on pull request #9507: KAFKA-10628: remove all the unnecessary parameters from the tests which are using TopologyTestDriver

2020-11-09 Thread GitBox


showuon commented on pull request #9507:
URL: https://github.com/apache/kafka/pull/9507#issuecomment-723974389


   @chia7712 @vvcephei , please help review this PR. Thanks.







[GitHub] [kafka] showuon commented on pull request #9576: KAFKA-10685: strictly parsing the date/time format

2020-11-09 Thread GitBox


showuon commented on pull request #9576:
URL: https://github.com/apache/kafka/pull/9576#issuecomment-723973014


   @mikebin @manijndl7 @mjsax , please help review this PR. Thanks.







[jira] [Commented] (KAFKA-10685) --to-datetime passed to kafka-consumer-groups interpreting microseconds wrong

2020-11-09 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10685?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17228542#comment-17228542
 ] 

Luke Chen commented on KAFKA-10685:
---

Thanks for the suggestion. It turns out that 
*SimpleDateFormat.setLenient(false)* also parses the timestamp strictly and 
throws a ParseException when the parsed milliseconds field is out of range 
(e.g. ".002237400" is read as 2237400, far above the valid 0-999). Thanks.

> --to-datetime passed to kafka-consumer-groups interpreting microseconds wrong
> -
>
> Key: KAFKA-10685
> URL: https://issues.apache.org/jira/browse/KAFKA-10685
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.7.0
>Reporter: Russell Sayers
>Assignee: Luke Chen
>Priority: Minor
>
> If you pass more than 3 decimal places for the fractional seconds of the 
> datetime, the microseconds get interpreted as milliseconds.
> {{kafka-consumer-groups --bootstrap-server kafka:9092 }}
> {{--reset-offsets }}
> {{--group webserver-avro }}
> {{--topic driver-positions-avro }}
> {{--to-datetime "2020-11-05T00:46:48.002237400"}} 
> {{--dry-run}}
> Relevant code 
> [here|https://github.com/apache/kafka/blob/2.7/clients/src/main/java/org/apache/kafka/common/utils/Utils.java#L1304].
>  The datetime is being turned into Nov 5, 2020 1:24:05.400 because 
> SimpleDateFormat is adding 2237400 milliseconds to Nov 5, 2020 00:46:48.
> Experimenting with getDateTime:
>  * getDateTime("2020-11-05T00:46:48.000") -> 1604537208000
> getDateTime("2020-11-05T00:46:48.000+0800") -> 1604508408000 - correct; the 
> formatting string allows for ZZZ timezones
>  * getDateTime("2020-11-05T00:46:48.000123") -> 1604537208123 - note this 
> ends with 123 milliseconds.
> The pattern string is "yyyy-MM-dd'T'HH:mm:ss.SSS".  So SimpleDateFormat 
> interprets "000123" as 123 milliseconds. See the stackoverflow answer 
> [here|https://stackoverflow.com/a/21235602/109102].
> The fix?  Remove any digits after more than 3 characters after the decimal 
> point, or raise an exception. The code would still need to allow the RFC822 
> timezone, i.e Sign TwoDigitHours Minutes.
>  
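
The lenient vs. strict behaviour described above can be shown with a minimal, self-contained sketch (not Kafka code; only the pattern and the sample timestamp are taken from the issue):

```java
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;

public class StrictParseDemo {
    static final String PATTERN = "yyyy-MM-dd'T'HH:mm:ss.SSS";

    // Lenient (the default): "002237400" is read as 2237400 milliseconds
    // and silently rolled over into extra minutes and seconds.
    static Date lenientParse(String s) throws ParseException {
        return new SimpleDateFormat(PATTERN).parse(s);
    }

    // Strict: an out-of-range milliseconds field makes parse() fail.
    static boolean strictRejects(String s) {
        SimpleDateFormat f = new SimpleDateFormat(PATTERN);
        f.setLenient(false);
        try {
            f.parse(s);
            return false;
        } catch (ParseException e) {
            return true;
        }
    }

    public static void main(String[] args) throws ParseException {
        String input = "2020-11-05T00:46:48.002237400";
        System.out.println(lenientParse(input));  // shifted by ~37 minutes, no error
        System.out.println(strictRejects(input)); // true
    }
}
```

Note that strict parsing only rejects fractional parts whose numeric value exceeds 999; an input like ".123" still parses under setLenient(false).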





[GitHub] [kafka] jeqo commented on a change in pull request #9078: KAFKA-10132: Return correct value types for MBean attributes

2020-11-09 Thread GitBox


jeqo commented on a change in pull request #9078:
URL: https://github.com/apache/kafka/pull/9078#discussion_r519741816



##
File path: 
clients/src/main/java/org/apache/kafka/common/metrics/JmxReporter.java
##
@@ -272,8 +272,16 @@ public MBeanInfo getMBeanInfo() {
 for (Map.Entry<String, KafkaMetric> entry : this.metrics.entrySet()) {
 String attribute = entry.getKey();
 KafkaMetric metric = entry.getValue();
+String metricType = double.class.getName();
+
+try {
+metricType = metric.metricValue().getClass().getName();
+} catch (NullPointerException e) {

Review comment:
   @rgroothuijsen this could be a bug on the DistributedHerder side. Seems 
that `DistributedHerder#herderMetrics` is initialized [too 
early](https://github.com/apache/kafka/blob/8e211eb72f9a45897cc37fed394a38096aa47feb/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L216).
 Could you give a try moving it later in the constructor, [maybe after 
config](https://github.com/apache/kafka/blob/8e211eb72f9a45897cc37fed394a38096aa47feb/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java#L249),
 to check if the same exception happens again?
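
As an aside, the null-guarded type lookup being discussed can be written without catching NullPointerException. This is a hedged sketch (the helper name is illustrative, not the actual JmxReporter code):

```java
public class AttributeTypeSketch {
    // Fall back to "double" when the metric has no value yet (the NPE case
    // discussed above); otherwise report the runtime class of the value.
    static String attributeType(Object metricValue) {
        return metricValue == null
                ? double.class.getName()
                : metricValue.getClass().getName();
    }

    public static void main(String[] args) {
        System.out.println(attributeType(null));        // double
        System.out.println(attributeType(1.0));         // java.lang.Double
        System.out.println(attributeType("connected")); // java.lang.String
    }
}
```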









[GitHub] [kafka] rajinisivaram commented on pull request #9382: KAFKA-10554; Perform follower truncation based on diverging epochs in Fetch response

2020-11-09 Thread GitBox


rajinisivaram commented on pull request #9382:
URL: https://github.com/apache/kafka/pull/9382#issuecomment-723946383


   Ran system tests on the latest version; there were 7 failures, which look 
like flaky tests that also fail on trunk (one TransactionTest and 6 variants of 
ConnectDistributedTest).







[GitHub] [kafka] dengziming commented on pull request #9577: KAFKA-9837: KIP-589 new RPC for notifying controller log dir failure

2020-11-09 Thread GitBox


dengziming commented on pull request #9577:
URL: https://github.com/apache/kafka/pull/9577#issuecomment-723936352


   @mumrah @hachikuji @bbejeck  Hi, PTAL.







[jira] [Commented] (KAFKA-7908) retention.ms and message.timestamp.difference.max.ms are tied

2020-11-09 Thread nandini (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7908?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17228514#comment-17228514
 ] 

nandini commented on KAFKA-7908:


This applies to older versions too. I just found this in 0.11.0. The *Affects 
Version/s:* field needs to be updated. 

> retention.ms and message.timestamp.difference.max.ms are tied
> -
>
> Key: KAFKA-7908
> URL: https://issues.apache.org/jira/browse/KAFKA-7908
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.0
>Reporter: Ciprian Pascu
>Priority: Minor
> Fix For: 2.3.0, 2.4.0
>
>
> When configuring retention.ms for a topic, the following warning will be printed:
> _retention.ms for topic X is set to 1800000. It is smaller than 
> message.timestamp.difference.max.ms's value 9223372036854775807. This may 
> result in frequent log rolling. (kafka.log.Log)_
>  
> message.timestamp.difference.max.ms has not been configured explicitly, so it 
> has the default value of 9223372036854775807; I haven't seen anywhere 
> mentioned that this parameter needs to be configured also, if retention.ms is 
> configured; also, if we look at the default values for these parameters, they 
> are also so, that retention.ms < message.timestamp.difference.max.ms; so, 
> what is the purpose of this warning, in this case?
> The warning is generated from this code 
> (core/src/main/scala/kafka/log/Log.scala):
> {code:java}
> def updateConfig(updatedKeys: Set[String], newConfig: LogConfig): Unit = {
>   if ((updatedKeys.contains(LogConfig.RetentionMsProp)
>       || updatedKeys.contains(LogConfig.MessageTimestampDifferenceMaxMsProp))
>     && topicPartition.partition == 0  // generate warnings only for one partition of each topic
>     && newConfig.retentionMs < newConfig.messageTimestampDifferenceMaxMs)
>     warn(s"${LogConfig.RetentionMsProp} for topic ${topicPartition.topic} is set to ${newConfig.retentionMs}. It is smaller than " +
>       s"${LogConfig.MessageTimestampDifferenceMaxMsProp}'s value ${newConfig.messageTimestampDifferenceMaxMs}. " +
>       s"This may result in frequent log rolling.")
>   this.config = newConfig
> }
> {code}
> 
> Shouldn't the || operator in this condition be replaced with &&?
>  
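
The operator question can be illustrated with a small sketch of the condition (simplified from the quoted code; this illustrates the question, it does not settle which behaviour is intended):

```java
public class RetentionWarnSketch {
    // Condition as it stands in the quoted code: warn when EITHER key was
    // updated and retention.ms < message.timestamp.difference.max.ms.
    static boolean warnsWithOr(boolean retentionChanged, boolean maxDiffChanged,
                               long retentionMs, long maxDiffMs) {
        return (retentionChanged || maxDiffChanged) && retentionMs < maxDiffMs;
    }

    // The commenter's proposed variant: warn only when BOTH keys were updated.
    static boolean warnsWithAnd(boolean retentionChanged, boolean maxDiffChanged,
                                long retentionMs, long maxDiffMs) {
        return (retentionChanged && maxDiffChanged) && retentionMs < maxDiffMs;
    }

    public static void main(String[] args) {
        // Only retention.ms was updated; the max-difference keeps its default.
        boolean or = warnsWithOr(true, false, 1_800_000L, Long.MAX_VALUE);
        boolean and = warnsWithAnd(true, false, 1_800_000L, Long.MAX_VALUE);
        System.out.println(or + " " + and); // true false
    }
}
```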





[GitHub] [kafka] dengziming opened a new pull request #9577: KAFKA-9837: KIP-589 new RPC for notifying controller log dir failure

2020-11-09 Thread GitBox


dengziming opened a new pull request #9577:
URL: https://github.com/apache/kafka/pull/9577


   This patch implements 
[KIP-589](https://cwiki.apache.org/confluence/display/KAFKA/KIP-589+Add+API+to+update+Replica+state+in+Controller),
 which introduces an asynchronous API for brokers to notify the controller 
of log dir failures.
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   1. Unit test for LogDirEventManagerImpl
   2. Integration test for new behavior
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[jira] [Created] (KAFKA-10700) Support mutual TLS authentication for SASL_SSL listeners

2020-11-09 Thread Rajini Sivaram (Jira)
Rajini Sivaram created KAFKA-10700:
--

 Summary: Support mutual TLS authentication for SASL_SSL listeners
 Key: KAFKA-10700
 URL: https://issues.apache.org/jira/browse/KAFKA-10700
 Project: Kafka
  Issue Type: New Feature
  Components: security
Reporter: Rajini Sivaram
Assignee: Rajini Sivaram
 Fix For: 2.8.0


See 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-684+-+Support+mutual+TLS+authentication+on+SASL_SSL+listeners
 for details





[GitHub] [kafka] showuon opened a new pull request #9576: KAFKA-10685: strictly parsing the date/time format

2020-11-09 Thread GitBox


showuon opened a new pull request #9576:
URL: https://github.com/apache/kafka/pull/9576


   Parse the date/time format strictly by calling setLenient(false), so that 
input which does not match the expected format is rejected instead of being 
mis-parsed when microseconds/nanoseconds are supplied.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] ghmulti edited a comment on pull request #9489: MINOR: demote "Committing task offsets" log to DEBUG

2020-11-09 Thread GitBox


ghmulti edited a comment on pull request #9489:
URL: https://github.com/apache/kafka/pull/9489#issuecomment-723911887


   There is a lot of useful information at the INFO level, but as you mentioned, 
having that line from several threads at 100ms frequency makes it way too 
noisy (especially when the container is running on a cluster with a log 
forwarder, which leads to wasted resources). Even though it is not complicated 
to change the log level configuration for that particular class, it is 
inconvenient and prevents using the out-of-the-box default configs.







[GitHub] [kafka] ghmulti commented on pull request #9489: MINOR: demote "Committing task offsets" log to DEBUG

2020-11-09 Thread GitBox


ghmulti commented on pull request #9489:
URL: https://github.com/apache/kafka/pull/9489#issuecomment-723911887


   There is a lot of useful information at the INFO level, but as you mentioned, 
having that line from several threads at 100ms frequency makes it way too 
noisy (especially when the container is running on a cluster with a log 
forwarder, which leads to wasted resources). Even though it is not complicated 
to change the log level configuration for that particular class, it is 
inconvenient and prevents using the out-of-the-box default configs.







[jira] [Commented] (KAFKA-10688) Handle accidental truncation of repartition topics as exceptional failure

2020-11-09 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10688?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17228465#comment-17228465
 ] 

Bruno Cadonna commented on KAFKA-10688:
---

Maybe I misunderstood your previous comment.

In points 1) and 2) of your proposal, aren't you proposing to reset repartition 
topics by using the global policy?

When would a repartition topic not have a valid committed offset after an 
offset was committed for the first time (i.e. the first commit after a fresh 
start of the Streams application)?

Isn't the fact that a repartition topic does not have a valid committed offset 
enough to throw a fatal error? Why should we reset the repartition topics in 
points 1) and 2) of your proposal? 

> Handle accidental truncation of repartition topics as exceptional failure
> -
>
> Key: KAFKA-10688
> URL: https://issues.apache.org/jira/browse/KAFKA-10688
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Guozhang Wang
>Priority: Major
>
> Today we always handle InvalidOffsetException from the main consumer by the 
> resetting policy assuming they are for source topics. But repartition topics 
> are also source topics and should never be truncated and hence cause 
> InvalidOffsetException.
> We should differentiate these repartition topics from external source topics 
> and treat the InvalidOffsetException from repartition topics as fatal and 
> close the whole application.





[GitHub] [kafka] dajac commented on pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)

2020-11-09 Thread GitBox


dajac commented on pull request #9386:
URL: https://github.com/apache/kafka/pull/9386#issuecomment-723887856


   @splett2 Code does not compile:
   ```
   [Error] 
/home/jenkins/jenkins-agent/workspace/Kafka_kafka-pr_PR-9386/core/src/main/scala/kafka/network/SocketServer.scala:1456:
 value getOrDefault is not a member of 
scala.collection.mutable.Map[java.net.InetAddress,Int]
   ```







[GitHub] [kafka] chia7712 commented on pull request #9401: KAFKA-9628 Replace Produce request/response with automated protocol

2020-11-09 Thread GitBox


chia7712 commented on pull request #9401:
URL: https://github.com/apache/kafka/pull/9401#issuecomment-723875285


   > Can we summarize the regression here for a real world workload?
   
   @ijuma I have attached the benchmark results to the description. I will run 
more benchmarks later.







[GitHub] [kafka] dajac commented on a change in pull request #9573: KAFKA-10693: Close quota managers created in tests

2020-11-09 Thread GitBox


dajac commented on a change in pull request #9573:
URL: https://github.com/apache/kafka/pull/9573#discussion_r519620665



##
File path: core/src/test/scala/unit/kafka/server/IsrExpirationTest.scala
##
@@ -62,14 +64,16 @@ class IsrExpirationTest {
 EasyMock.replay(logManager)
 
 alterIsrManager = TestUtils.createAlterIsrManager()
+quotaManager = QuotaFactory.instantiate(configs.head, metrics, time, "")
 replicaManager = new ReplicaManager(configs.head, metrics, time, null, 
null, logManager, new AtomicBoolean(false),
-  QuotaFactory.instantiate(configs.head, metrics, time, ""), new 
BrokerTopicStats, new MetadataCache(configs.head.brokerId),
+  quotaManager, new BrokerTopicStats, new 
MetadataCache(configs.head.brokerId),
   new LogDirFailureChannel(configs.head.logDirs.size), alterIsrManager)
   }
 
   @After
   def tearDown(): Unit = {
 replicaManager.shutdown(false)
+quotaManager.shutdown()

Review comment:
   nit: We should check that `quotaManager` is not null. We can also do it 
for `replicaManager` above.

##
File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
##
@@ -1506,7 +1508,7 @@ class ReplicaManagerTest {
   purgatoryName = "ElectLeader", timer, reaperEnabled = false)
 
 // Mock network client to show leader offset of 5
-val quota = QuotaFactory.instantiate(config, metrics, time, "")
+val quota = quotaManager

Review comment:
   nit: Could we get rid of `quota`?

##
File path: core/src/test/scala/unit/kafka/server/ReplicaManagerTest.scala
##
@@ -79,22 +81,24 @@ class ReplicaManagerTest {
 EasyMock.expect(kafkaZkClient.getEntityConfigs(EasyMock.anyString(), 
EasyMock.anyString())).andReturn(new Properties()).anyTimes()
 EasyMock.replay(kafkaZkClient)
 
+val props = TestUtils.createBrokerConfig(1, TestUtils.MockZkConnect)
+config = KafkaConfig.fromProps(props)
 alterIsrManager = EasyMock.createMock(classOf[AlterIsrManager])
+quotaManager = QuotaFactory.instantiate(config, metrics, time, "")
   }
 
   @After
   def tearDown(): Unit = {
 TestUtils.clearYammerMetrics()
+quotaManager.shutdown()

Review comment:
   Should we check that `quotaManager` is not null?

##
File path: 
core/src/test/scala/unit/kafka/server/epoch/OffsetsForLeaderEpochTest.scala
##
@@ -112,4 +120,11 @@ class OffsetsForLeaderEpochTest {
 //Then
 assertEquals(new EpochEndOffset(Errors.UNKNOWN_TOPIC_OR_PARTITION, 
UNDEFINED_EPOCH, UNDEFINED_EPOCH_OFFSET), response(tp))
   }
+
+  @After
+  def tearDown(): Unit = {
+replicaManager.shutdown(checkpointHW = false)
+quotaManager.shutdown()

Review comment:
   We should check non null in both cases.
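
The null guards being requested could look like the hedged sketch below (Java for illustration; the actual tests are Scala/JUnit, and the `Shutdownable` interface is invented here, not Kafka's). The point is that `tearDown` runs even when `setUp` failed partway, so an unguarded shutdown would mask the real failure with an NPE.

```java
public class TearDownGuardSketch {
    interface Shutdownable { void shutdown(); }

    // Null-guarded tearDown: skip shutdown of anything setUp never created.
    static void tearDown(Shutdownable replicaManager, Shutdownable quotaManager) {
        if (replicaManager != null) replicaManager.shutdown();
        if (quotaManager != null) quotaManager.shutdown();
    }

    public static void main(String[] args) {
        final int[] calls = {0};
        Shutdownable counting = () -> calls[0]++;
        tearDown(null, counting); // replicaManager never created: no NPE
        tearDown(counting, null); // quotaManager never created: no NPE
        System.out.println(calls[0]); // 2
    }
}
```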




