[GitHub] [kafka] bseenu commented on a change in pull request #7577: KAFKA-9076: support consumer offset sync across clusters in MM 2.0

2020-05-05 Thread GitBox


bseenu commented on a change in pull request #7577:
URL: https://github.com/apache/kafka/pull/7577#discussion_r419913034



##
File path: 
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
##
@@ -190,4 +227,103 @@ public void commitRecord(SourceRecord record) {
             Checkpoint.unwrapGroup(record.sourcePartition()),
             System.currentTimeMillis() - record.timestamp());
     }
+
+    private void refreshIdleConsumerGroupOffset() {
+        Map<String, KafkaFuture<ConsumerGroupDescription>> consumerGroupsDesc = targetAdminClient
+            .describeConsumerGroups(consumerGroups).describedGroups();
+
+        for (String group : consumerGroups) {
+            try {
+                ConsumerGroupDescription consumerGroupDesc = consumerGroupsDesc.get(group).get();
+                ConsumerGroupState consumerGroupState = consumerGroupDesc.state();
+                // sync offset to the target cluster only if the state of the current consumer group is:
+                // (1) idle: because the consumer at target is not actively consuming the mirrored topic
+                // (2) dead: the new consumer that is recently created at source and never exists at target
+                if (consumerGroupState.equals(ConsumerGroupState.EMPTY)) {
+                    idleConsumerGroupsOffset.put(group, targetAdminClient.listConsumerGroupOffsets(group)
+                        .partitionsToOffsetAndMetadata().get().entrySet());
+                } else if (consumerGroupState.equals(ConsumerGroupState.DEAD)) {
+                    newConsumerGroup.add(group);
+                }
+            } catch (InterruptedException | ExecutionException e) {
+                log.error("Error querying for consumer group {} on cluster {}.", group, targetClusterAlias, e);
+            }
+        }
+    }
+
+    Map<String, Map<TopicPartition, OffsetAndMetadata>> syncGroupOffset() {
+        Map<String, Map<TopicPartition, OffsetAndMetadata>> offsetToSyncAll = new HashMap<>();
+
+        // first, sync offsets for the idle consumers at target
+        for (Map.Entry<String, Set<Entry<TopicPartition, OffsetAndMetadata>>> group : idleConsumerGroupsOffset.entrySet()) {
+            String consumerGroupId = group.getKey();
+            // for each idle consumer at target, read the checkpoints (converted upstream offset)
+            // from the pre-populated map
+            Map<TopicPartition, OffsetAndMetadata> convertedUpstreamOffset = getConvertedUpstreamOffset(consumerGroupId);
+
+            if (convertedUpstreamOffset == null) continue;
+
+            Map<TopicPartition, OffsetAndMetadata> offsetToSync = new HashMap<>();
+            for (Entry<TopicPartition, OffsetAndMetadata> entry : group.getValue()) {
+                long latestDownstreamOffset = entry.getValue().offset();
+                TopicPartition topicPartition = entry.getKey();
+                if (!convertedUpstreamOffset.containsKey(topicPartition)) {
+                    log.trace("convertedUpstreamOffset does not contain TopicPartition: {}", topicPartition.toString());
+                    continue;
+                }
+
+                // if translated offset from upstream is smaller than the current consumer offset
+                // in the target, skip updating the offset for that partition
+                long convertedOffset = convertedUpstreamOffset.get(topicPartition).offset();
+                if (latestDownstreamOffset >= convertedOffset) {
+                    log.trace("latestDownstreamOffset {} is larger than convertedUpstreamOffset {} for "
+                        + "TopicPartition {}", latestDownstreamOffset, convertedOffset, topicPartition);
+                    continue;
+                }

Review comment:
   I would like to propose the following changes to sync the consumer group 
changes on source side 
   
   ```suggestion
            for (Map.Entry<TopicPartition, OffsetAndMetadata> convertedEntry : convertedUpstreamOffset.entrySet()) {
                TopicPartition topicPartition = convertedEntry.getKey();
                for (Entry<TopicPartition, OffsetAndMetadata> idleEntry : group.getValue()) {
                    if (idleEntry.getKey() == topicPartition) {
                        long latestDownstreamOffset = idleEntry.getValue().offset();
                        // if translated offset from upstream is smaller than the current consumer offset
                        // in the target, skip updating the offset for that partition
                        long convertedOffset = convertedUpstreamOffset.get(topicPartition).offset();
                        if (latestDownstreamOffset >= convertedOffset) {
                            log.trace("latestDownstreamOffset {} is larger than convertedUpstreamOffset {} for "
                                + "TopicPartition {}", latestDownstreamOffset, convertedOffset, topicPartition);
                            continue;
                        }
                    }
                }
   ```
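
   For reference, a minimal self-contained sketch (hypothetical names, plain `String` keys standing in for `TopicPartition`, not part of the PR) of the skip rule that both the original loop and the suggestion above implement: a translated upstream offset is only worth syncing when it is ahead of the group's current offset on the target cluster.
   ```java
   import java.util.HashMap;
   import java.util.Map;

   public class OffsetSyncRuleSketch {
       // Keep only the partitions whose translated upstream offset is ahead of the
       // current downstream offset; everything else is skipped, as in the diff above.
       static Map<String, Long> offsetsToSync(Map<String, Long> downstreamOffsets,
                                              Map<String, Long> translatedUpstreamOffsets) {
           Map<String, Long> result = new HashMap<>();
           for (Map.Entry<String, Long> entry : translatedUpstreamOffsets.entrySet()) {
               Long downstream = downstreamOffsets.get(entry.getKey());
               if (downstream != null && downstream >= entry.getValue()) {
                   continue; // target group is already at or beyond the translated offset
               }
               result.put(entry.getKey(), entry.getValue());
           }
           return result;
       }

       public static void main(String[] args) {
           Map<String, Long> downstream = Map.of("topic-0", 100L, "topic-1", 5L);
           Map<String, Long> translated = Map.of("topic-0", 90L, "topic-1", 10L);
           System.out.println(offsetsToSync(downstream, translated)); // prints {topic-1=10}
       }
   }
   ```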





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

[GitHub] [kafka] dajac commented on a change in pull request #8609: KAFKA-9946; StopReplicaRequest deletePartition changes may cause premature topic deletion handling in controller

2020-05-05 Thread GitBox


dajac commented on a change in pull request #8609:
URL: https://github.com/apache/kafka/pull/8609#discussion_r419946491



##
File path: core/src/main/scala/kafka/controller/ControllerChannelManager.scala
##
@@ -590,7 +569,26 @@ abstract class AbstractControllerBrokerRequestBatch(config: KafkaConfig,
 
       stateChangeLog.info(s"Sending StopReplica request for ${partitionStates.size} " +
         s"replicas to broker $brokerId")
-      sendStopReplicaRequest(brokerId, brokerEpoch, false, stopReplicaTopicState)
+      val stopReplicaRequestBuilder = new StopReplicaRequest.Builder(
+        stopReplicaRequestVersion, controllerId, controllerEpoch, brokerEpoch,
+        false, stopReplicaTopicState.values.toBuffer.asJava)
+
+      sendRequest(brokerId, stopReplicaRequestBuilder, (r: AbstractResponse) => {
+        val stopReplicaResponse = r.asInstanceOf[StopReplicaResponse]

Review comment:
   I agree as well. Passing a function is a really good idea. I should have 
thought about it.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on pull request #8609: KAFKA-9946; StopReplicaRequest deletePartition changes may cause premature topic deletion handling in controller

2020-05-05 Thread GitBox


dajac commented on pull request #8609:
URL: https://github.com/apache/kafka/pull/8609#issuecomment-623928516


   @lbradstreet @hachikuji Thanks for your feedback. I have updated the PR 
accordingly.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #8598: KAFKA-9945: TopicCommand should support --if-exists and --if-not-exists when --bootstrap-server is used

2020-05-05 Thread GitBox


dajac commented on a change in pull request #8598:
URL: https://github.com/apache/kafka/pull/8598#discussion_r419970261



##
File path: core/src/main/scala/kafka/admin/TopicCommand.scala
##
@@ -245,7 +245,17 @@ object TopicCommand extends Logging {
 
         newTopic.configs(configsMap)
         val createResult = adminClient.createTopics(Collections.singleton(newTopic))
-        createResult.all().get()
+        try {
+          createResult.all().get()
+        } catch {
+          case e: ExecutionException => {
+            val cause = e.getCause
+            if (cause.isInstanceOf[TopicExistsException] || topic.ifTopicDoesntExist()) {

Review comment:
   Shouldn't we re-throw all exceptions except `TopicExistsException` if 
`topic.ifTopicDoesntExist`?
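
   For illustration only, a rough sketch in Java (the actual command is Scala; `ifNotExists` is a hypothetical stand-in for `topic.ifTopicDoesntExist()`) of the behaviour the question suggests: swallow the failure only when it is a `TopicExistsException` and `--if-not-exists` was given, and re-throw everything else.
   ```java
   import java.util.concurrent.ExecutionException;
   import org.apache.kafka.common.errors.TopicExistsException;

   public class CreateTopicErrorHandling {
       // Re-throw unless the cause is TopicExistsException and --if-not-exists was requested.
       static void handleCreateFailure(ExecutionException e, boolean ifNotExists) throws ExecutionException {
           boolean ignorable = e.getCause() instanceof TopicExistsException && ifNotExists;
           if (!ignorable) {
               throw e;
           }
           // otherwise: the topic already exists and the caller asked to tolerate that, so do nothing
       }
   }
   ```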

##
File path: core/src/main/scala/kafka/admin/TopicCommand.scala
##
@@ -245,7 +245,17 @@ object TopicCommand extends Logging {
 
         newTopic.configs(configsMap)
         val createResult = adminClient.createTopics(Collections.singleton(newTopic))
-        createResult.all().get()
+        try {
+          createResult.all().get()
+        } catch {
+          case e: ExecutionException => {
+            val cause = e.getCause
+            if (cause.isInstanceOf[TopicExistsException] || topic.ifTopicDoesntExist()) {
+              throw e
+            }
+          }
+        }
+
         println(s"Created topic ${topic.name}.")
       } else {
         throw new IllegalArgumentException(s"Topic ${topic.name} already exists")

Review comment:
   Apparently, we already verify the existence of the topic prior to 
creating it. Should we also handle `--if-not-exists` in this case?

##
File path: core/src/main/scala/kafka/admin/TopicCommand.scala
##
@@ -257,21 +267,50 @@ object TopicCommand extends Logging {
     }
 
     override def alterTopic(opts: TopicCommandOptions): Unit = {
-      val topic = new CommandTopicPartition(opts)
-      val topics = getTopics(opts.topic, opts.excludeInternalTopics)
-      ensureTopicExists(topics, opts.topic)
-      val topicsInfo = adminClient.describeTopics(topics.asJavaCollection).values()
-      adminClient.createPartitions(topics.map {topicName =>
-        if (topic.hasReplicaAssignment) {
-          val startPartitionId = topicsInfo.get(topicName).get().partitions().size()
-          val newAssignment = {
-            val replicaMap = topic.replicaAssignment.get.drop(startPartitionId)
-            new util.ArrayList(replicaMap.map(p => p._2.asJava).asJavaCollection).asInstanceOf[util.List[util.List[Integer]]]
+      if(opts.topicConfig.isDefined || opts.configsToDelete.isDefined) {
+        throw new RuntimeException("Using --config or --delete-config is not supported " +
+          "when altering a topic via the broker API.  Use kafka-configs.sh instead.")

Review comment:
   nit: Extra space after the first `.`.

##
File path: core/src/main/scala/kafka/admin/TopicCommand.scala
##
@@ -245,7 +245,17 @@ object TopicCommand extends Logging {
 
         newTopic.configs(configsMap)
         val createResult = adminClient.createTopics(Collections.singleton(newTopic))
-        createResult.all().get()
+        try {
+          createResult.all().get()

Review comment:
   nit: the parentheses are not necessary. There are other cases where they can be removed.

##
File path: core/src/main/scala/kafka/admin/TopicCommand.scala
##
@@ -257,21 +267,50 @@ object TopicCommand extends Logging {
     }
 
     override def alterTopic(opts: TopicCommandOptions): Unit = {
-      val topic = new CommandTopicPartition(opts)
-      val topics = getTopics(opts.topic, opts.excludeInternalTopics)
-      ensureTopicExists(topics, opts.topic)
-      val topicsInfo = adminClient.describeTopics(topics.asJavaCollection).values()
-      adminClient.createPartitions(topics.map {topicName =>
-        if (topic.hasReplicaAssignment) {
-          val startPartitionId = topicsInfo.get(topicName).get().partitions().size()
-          val newAssignment = {
-            val replicaMap = topic.replicaAssignment.get.drop(startPartitionId)
-            new util.ArrayList(replicaMap.map(p => p._2.asJava).asJavaCollection).asInstanceOf[util.List[util.List[Integer]]]
+      if(opts.topicConfig.isDefined || opts.configsToDelete.isDefined) {
+        throw new RuntimeException("Using --config or --delete-config is not supported " +
+          "when altering a topic via the broker API.  Use kafka-configs.sh instead.")
+      }
+      val tp = new CommandTopicPartition(opts)
+      if (tp.hasPartitions) {
+        println("WARNING: If partitions are increased for a topic that has a key, the partition " +
+          "logic or ordering of the messages will be affected")
+        val topicDescription = try {
+          adminClient.describeTopics(Collections.singleton(tp.name)).
+            all().get().get(tp.name)

Review comment:
   nit: unneces

[GitHub] [kafka] dajac commented on pull request #8311: KAFKA-9434: automated protocol for alterReplicaLogDirs

2020-05-05 Thread GitBox


dajac commented on pull request #8311:
URL: https://github.com/apache/kafka/pull/8311#issuecomment-623957188


   @tombentley Thanks for the addition. We could perhaps add a small unit test for the sake of completeness; we already cover all the other cases except this one.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-9891) Invalid state store content after task migration with exactly_once and standby replicas

2020-05-05 Thread Mateusz Jadczyk (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17099743#comment-17099743
 ] 

Mateusz Jadczyk commented on KAFKA-9891:


[~bchen225242] Hi, is any follow-up planned for this?

> Invalid state store content after task migration with exactly_once and 
> standby replicas
> ---
>
> Key: KAFKA-9891
> URL: https://issues.apache.org/jira/browse/KAFKA-9891
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.3.1, 2.4.1
>Reporter: Mateusz Jadczyk
>Assignee: Boyang Chen
>Priority: Blocker
>
> We have a simple command id deduplication mechanism (very similar to the one 
> from Kafka Streams examples) based on Kafka Streams State Stores. It stores 
> command ids from the past hour in _persistentWindowStore_. We encountered a 
> problem with the store if there's an exception thrown later in that topology.
>  We run 3 nodes using docker, each with multiple threads set for this 
> particular Streams Application.
> The business flow is as follows (performed within a single subtopology):
>  *  a valid command is sent with command id 
> (_mnl_cmd_31bad7e5-35e7-490e-89f0-616fe967111f_). NODE 1 is running an active 
> task 1_2. First node in the topology analyses if this is a duplicate by 
> checking in the state store (_COMMAND_ID_STORE_), if not puts the command id 
> in the state store and processes the command properly.
>  * an invalid command is sent with the same key but new command id 
> (_mnl_cmd_9f1752da-45b7-4ef7-9ef8-209d826530bc_). Again, check for the 
> duplicated command id is performed, it's not a duplicate, command id is put 
> into the state store. Next node in the topology throws an exception which 
> causes an error on NODE 1 for task 1_2. As a result, transaction is aborted, 
> offsets are not committed. I double checked for the changelog topic - 
> relevant messages are not committed. Therefore, the changelog topic contains 
> only the first command id _mnl_cmd_31bad7e5-35e7-490e-89f0-616fe967111f,_ and 
> not the one which caused a failure.
>  * in the meantime a standby task 1_2 running on NODE 3 replicated 
> _mnl_cmd_31bad7e5-35e7-490e-89f0-616fe967111f_ command id into a local 
> _COMMAND_ID_STORE_
>  * standby task 1_2 on NODE 3 Thread-2 takes over the task as an active one. 
> It checks if this command id is a duplicate - no, it isn't - tries to process 
> the faulty command and throws an exception. Again, transaction aborted, all 
> looks fine.
>  * NODE 3 Thread-1 takes over. It checks for the duplicate. To our surprise, 
> *it is a duplicate!* Even though the transaction has been aborted and the 
> changelog doesn't contain this command id: 
> _mnl_cmd_9f1752da-45b7-4ef7-9ef8-209d826530bc._
>  
> After digging into the Streams logs and some discussion on ([Stack 
> Overflow|https://stackoverflow.com/questions/61247789/invalid-state-store-content-after-aborted-transaction-with-exactly-once-and-stan])
>  we concluded it has something to do with checkpoint files. Here are the 
> detailed logs relevant to checkpoint files.
>  
> {code:java}
> NODE_3 2020-04-15 21:06:14.470 TRACE 1 --- [-StreamThread-2] 
> o.a.k.s.p.i.ProcessorStateManager : stream-thread 
> [Processor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] task 
> [1_2] Checkpointable offsets read from checkpoint: {}
> NODE_3 2020-04-15 21:06:19.413 TRACE 1 --- [-StreamThread-2] 
> o.a.k.s.p.i.ProcessorStateManager : stream-thread 
> [Processor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] task 
> [1_2] Restoring state store COMMAND_ID_STORE from changelog topic 
> Processor-COMMAND_ID_STORE-changelog at checkpoint null
> NODE_3 2020-04-15 21:06:28.470 TRACE 1 --- [-StreamThread-1] 
> o.a.k.s.p.i.ProcessorStateManager : stream-thread 
> [Processor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] 
> standby-task [1_2] Checkpointable offsets read from checkpoint: {}
> NODE_3 2020-04-15 21:06:29.634 TRACE 1 --- [-StreamThread-2] 
> o.a.k.s.s.internals.OffsetCheckpoint : Writing tmp checkpoint file 
> /tmp/kafka-streams/Processor/1_2/.checkpoint.tmp
> NODE_3 2020-04-15 21:06:29.640 TRACE 1 --- [-StreamThread-2] 
> o.a.k.s.s.internals.OffsetCheckpoint : Swapping tmp checkpoint file 
> /tmp/kafka-streams/Processor/1_2/.checkpoint.tmp 
> /tmp/kafka-streams/Processor/1_2/.checkpoint
> NODE_3 2020-04-15 21:11:15.909 TRACE 1 --- [-StreamThread-1] 
> o.a.k.s.s.internals.OffsetCheckpoint : Writing tmp checkpoint file 
> /tmp/kafka-streams/Processor/1_2/.checkpoint.tmp
> NODE_3 2020-04-15 21:11:15.912 TRACE 1 --- [-StreamThread-1] 
> o.a.k.s.s.internals.OffsetCheckpoint : Swapping tmp checkpoint file 
> /tmp/kafka-streams/Processor/1_2/.checkpoint.tmp 
> /tmp

[GitHub] [kafka] nizhikov commented on pull request #8592: KAFKA-3184: Add Checkpoint for In-memory State Store

2020-05-05 Thread GitBox


nizhikov commented on pull request #8592:
URL: https://github.com/apache/kafka/pull/8592#issuecomment-623993236


   retest this please
   
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-9957) Kafka Controller doesn't failover during hardware failure

2020-05-05 Thread Eric Ward (Jira)
Eric Ward created KAFKA-9957:


 Summary: Kafka Controller doesn't failover during hardware failure
 Key: KAFKA-9957
 URL: https://issues.apache.org/jira/browse/KAFKA-9957
 Project: Kafka
  Issue Type: Bug
  Components: controller
Affects Versions: 2.5.0, 2.2.0
Reporter: Eric Ward
 Attachments: kafka-threaddump.out

On a couple different production environments we've run into an issue where a 
hardware failure has hung up the controller and prevented controller and topic 
leadership from changing to a healthy broker.  When the issue happens we see 
this repeated in the logs at regular intervals for the other brokers (the 
affected broker can’t write to disk, so no logging occurs there):
{noformat}
[2020-04-26 01:12:30,613] WARN [ReplicaFetcher replicaId=0, leaderId=2, 
fetcherId=0] Error in response for fetch request (type=FetchRequest, 
replicaId=0, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={*snip*}, 
isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=1962806970, 
epoch=INITIAL)) (kafka.server.ReplicaFetcherThread)
java.io.IOException: Connection to 2 was disconnected before the response was 
read
at 
org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
at 
kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:100)
at 
kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:193)
at 
kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:280)
at 
kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:132)
at 
kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:131)
at scala.Option.foreach(Option.scala:274)
at 
kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:131)
at 
kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)

{noformat}
This issue appears to be similar to KAFKA-7870, though that issue was 
purportedly fixed by KAFKA-7697.

Once we encounter this error any partitions whose leadership is on the affected 
node are unavailable until we force that broker out of the cluster – that is to 
say, kill the node.

When we initially hit the issue we were running on version 2.2.0, though I've 
been able to reproduce this in an environment running 2.5.0 as well. To 
simulate the hardware failure I'm using the xfs_freeze utility to suspend 
access to the filesystem.  Zookeeper failover is also part of the mix.  In all 
instances where we’ve seen this the ZK leader and Kafka Controller were on the 
same node and both affected by the hardware issue.  Zookeeper is able to 
successfully failover, which it does rather quickly.

Reproduction steps are pretty straightforward:
 # Spin up a 3 node cluster
 # Ensure that the Kafka Controller and Zookeeper Leader are on the same node.
 # xfs_freeze the filesystem on the node that the controller is running on

This reproduces 100% of the time for me.  I’ve left it running for well over an 
hour without any Kafka failover happening.  Unfreezing the node will allow the 
cluster to heal itself.

I’ve attached a thread dump from an environment running 2.5.0.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] viktorsomogyi commented on a change in pull request #8591: KAFKA-6342: Move workaround for JSON parsing of non-escaped strings

2020-05-05 Thread GitBox


viktorsomogyi commented on a change in pull request #8591:
URL: https://github.com/apache/kafka/pull/8591#discussion_r420040567



##
File path: core/src/main/scala/kafka/utils/Json.scala
##
@@ -35,16 +35,7 @@ object Json {
*/
   def parseFull(input: String): Option[JsonValue] =

Review comment:
   It looks like `parseFull` is used in a couple of places, but I can perhaps just delegate it to `parseBytes` like this:
   ```
   def parseFull(input: String): Option[JsonValue] = parseBytes(input.getBytes(Charset.defaultCharset()))
   ```
   Or shall I just get rid of this code and use `parseBytes` everywhere?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-9909) Kafka Streams : offset control to Streams API

2020-05-05 Thread Gopikrishna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9909?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17099806#comment-17099806
 ] 

Gopikrishna commented on KAFKA-9909:


I can understand the rebalance case, but you mentioned "streams should NOT commit anything until it was closed". It is not clear what "it" refers to: did you mean the application is closed? If so, when the application closes due to an error, will it still commit the offset?

I handle a DeserializationExceptionHandler for any ill-formatted message that is received; what I meant was skipping offsets by avoiding context.commit() on messages that cannot be processed at that time.

With a traditional Kafka consumer I can acknowledge the offset, but here I have no option to not acknowledge other than avoiding context.commit().

*Here is the code snippet to explain the scenario I am talking about:*

public class CustomProcessor implements Processor<String, String> {
    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public void process(String key, String value) {
        /* I am throwing a runtime exception to come out of the process method
           without explicitly committing the offset. */
        try {
            if (value.contains("hello")) {
                System.out.println("Skipping offset : " + context.offset());
                throw new RuntimeException("Hello raising exception!");
            }
            System.out.println("offset : " + context.offset() + " partition : " + context.partition());
            context.commit();
        } catch (Exception e) {
            System.out.println("Log & Continue exception: " + e.getMessage());
        }
    }

    @Override
    public void close() {
        // nothing to release
    }
}

Once the process method completes after such a caught exception (without calling context.commit()), the offset is still committed.
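
For contrast, a minimal hypothetical variant of the processor above (not from the original report): if the exception is allowed to propagate out of process() instead of being caught locally, Streams does not commit the offset of the failing record; the stream thread dies and the record is reprocessed after restart.

{code:java}
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;

// Hypothetical sketch: the exception is NOT swallowed inside process(),
// so Streams never reaches its commit for this record.
public class FailFastProcessor implements Processor<String, String> {
    private ProcessorContext context;

    @Override
    public void init(ProcessorContext context) {
        this.context = context;
    }

    @Override
    public void process(String key, String value) {
        if (value.contains("hello")) {
            // propagate instead of catching locally; the offset of this record is not committed
            throw new RuntimeException("cannot process record at offset " + context.offset());
        }
        // normal path: Streams commits on its own commit interval after process() returns
    }

    @Override
    public void close() {
    }
}
{code}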

> Kafka Streams : offset control to Streams API
> -
>
> Key: KAFKA-9909
> URL: https://issues.apache.org/jira/browse/KAFKA-9909
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.5.0
> Environment: All
>Reporter: Gopikrishna
>Priority: Minor
>  Labels: Offset, commit
>
> Hello team, I am really inspired by the way the Streams API runs today. I would
> like a feature that provides flexibility regarding the offset. When we write a
> processor with the Processor API, the processor context object can be used to commit
> the offset, but this is not effective because Streams controls the offset: the moment
> the process method executes or a scheduled window completes, the offset is
> committed automatically by Streams internally.
> Like a traditional Kafka consumer, the context object should have
> complete control over whether to commit the offset or not. This would give
> more control to the API to handle failovers; especially when a message
> cannot be processed, the context should not commit the offset. I would appreciate it
> if this could be implemented.
>  
> h4. enable.auto.commit is false by default, but Streams commits
> the offset automatically.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] viktorsomogyi commented on a change in pull request #8591: KAFKA-6342: Move workaround for JSON parsing of non-escaped strings

2020-05-05 Thread GitBox


viktorsomogyi commented on a change in pull request #8591:
URL: https://github.com/apache/kafka/pull/8591#discussion_r420040567



##
File path: core/src/main/scala/kafka/utils/Json.scala
##
@@ -35,16 +35,7 @@ object Json {
*/
   def parseFull(input: String): Option[JsonValue] =

Review comment:
   It looks like `parseFull` is used in a couple of places, as Json mentioned, but I can perhaps just delegate it to `parseBytes` as below and continue using that in the mentioned callers.
   ```
   def parseFull(input: String): Option[JsonValue] = parseBytes(input.getBytes(Charset.defaultCharset()))
   ```
   Or shall I just get rid of this code and use `parseBytes` everywhere?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on a change in pull request #8613: KAFKA-6145: Set HighAvailabilityTaskAssignor as default in streams_upgrade_test.py

2020-05-05 Thread GitBox


cadonna commented on a change in pull request #8613:
URL: https://github.com/apache/kafka/pull/8613#discussion_r420052853



##
File path: tests/kafkatest/tests/streams/streams_upgrade_test.py
##
@@ -532,20 +532,22 @@ def do_rolling_bounce(self, processor, counter, current_generation):
                     log_monitor.wait_until("Sent a version 8 subscription and got version 7 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 7 and trigger new rebalance.",
                                            timeout_sec=60,
                                            err_msg="Could not detect 'successful version probing' at upgrading node " + str(node.account))
-                else:
-                    log_monitor.wait_until("Sent a version 8 subscription and got version 7 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 8 and trigger new rebalance.",
+                    log_monitor.wait_until("Detected that the assignor requested a rebalance. Rejoining the consumer group to trigger a new rebalance.",

Review comment:
   I guess what we really would need is a way to check whether a group has stabilized. We try to do that by verifying that the generations of the processors are synced. However, I ran into cases where all processors had the same generation, but one processor did not have any tasks assigned, because in that specific rebalance the corresponding partitions were revoked from the other processors. So we would actually need to check that the highest generation is in sync across the processors AND that all processors have at least one task assigned (AND that all tasks were assigned).





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on a change in pull request #8613: KAFKA-6145: Set HighAvailabilityTaskAssignor as default in streams_upgrade_test.py

2020-05-05 Thread GitBox


cadonna commented on a change in pull request #8613:
URL: https://github.com/apache/kafka/pull/8613#discussion_r420052853



##
File path: tests/kafkatest/tests/streams/streams_upgrade_test.py
##
@@ -532,20 +532,22 @@ def do_rolling_bounce(self, processor, counter, current_generation):
                     log_monitor.wait_until("Sent a version 8 subscription and got version 7 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 7 and trigger new rebalance.",
                                            timeout_sec=60,
                                            err_msg="Could not detect 'successful version probing' at upgrading node " + str(node.account))
-                else:
-                    log_monitor.wait_until("Sent a version 8 subscription and got version 7 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 8 and trigger new rebalance.",
+                    log_monitor.wait_until("Detected that the assignor requested a rebalance. Rejoining the consumer group to trigger a new rebalance.",

Review comment:
   I guess what we really would need is a way to check whether a group has stabilized. We try to do that by verifying that the generations of the processors are synced. However, I ran into cases where all processors had the same generation, but one processor did not have any tasks assigned, because in that specific rebalance the corresponding partitions were revoked from the other processors. So we would actually need to check that the highest generation is in sync across the processors AND that all processors have at least one task assigned (AND that all tasks were assigned).





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on a change in pull request #8613: KAFKA-6145: Set HighAvailabilityTaskAssignor as default in streams_upgrade_test.py

2020-05-05 Thread GitBox


cadonna commented on a change in pull request #8613:
URL: https://github.com/apache/kafka/pull/8613#discussion_r420052853



##
File path: tests/kafkatest/tests/streams/streams_upgrade_test.py
##
@@ -532,20 +532,22 @@ def do_rolling_bounce(self, processor, counter, current_generation):
                     log_monitor.wait_until("Sent a version 8 subscription and got version 7 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 7 and trigger new rebalance.",
                                            timeout_sec=60,
                                            err_msg="Could not detect 'successful version probing' at upgrading node " + str(node.account))
-                else:
-                    log_monitor.wait_until("Sent a version 8 subscription and got version 7 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 8 and trigger new rebalance.",
+                    log_monitor.wait_until("Detected that the assignor requested a rebalance. Rejoining the consumer group to trigger a new rebalance.",

Review comment:
   We can leave it because it verifies whether the assignment was triggered in the assignor. However, it does not give us any guarantee that the rebalance actually took place.
   
   I guess what we really would need is a way to check whether a group has stabilized. We try to do that by verifying that the generations of the processors are synced. However, I ran into cases where all processors had the same generation, but one processor did not have any tasks assigned, because in that specific rebalance the corresponding partitions were revoked from the other processors. So we would actually need to check that the highest generation is in sync across the processors AND that all processors have at least one task assigned (AND that all tasks were assigned).





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on a change in pull request #8613: KAFKA-6145: Set HighAvailabilityTaskAssignor as default in streams_upgrade_test.py

2020-05-05 Thread GitBox


cadonna commented on a change in pull request #8613:
URL: https://github.com/apache/kafka/pull/8613#discussion_r420052853



##
File path: tests/kafkatest/tests/streams/streams_upgrade_test.py
##
@@ -532,20 +532,22 @@ def do_rolling_bounce(self, processor, counter, current_generation):
                     log_monitor.wait_until("Sent a version 8 subscription and got version 7 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 7 and trigger new rebalance.",
                                            timeout_sec=60,
                                            err_msg="Could not detect 'successful version probing' at upgrading node " + str(node.account))
-                else:
-                    log_monitor.wait_until("Sent a version 8 subscription and got version 7 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 8 and trigger new rebalance.",
+                    log_monitor.wait_until("Detected that the assignor requested a rebalance. Rejoining the consumer group to trigger a new rebalance.",

Review comment:
   We can leave it because it verifies whether the assignment was triggered in the assignor. However, it does not give us any guarantee that the rebalance actually took place.
   
   I guess what we really would need is a way to check whether a group has stabilized and whether the assignment is valid. We try to do that by verifying that the generations of the processors are synced. However, I ran into cases where all processors had the same generation, but one processor did not have any tasks assigned, because in that specific rebalance the corresponding partitions were revoked from the other processors. So we would actually need to check that the highest generation is in sync across the processors AND that all processors have at least one task assigned (AND that all tasks were assigned).





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on a change in pull request #8613: KAFKA-6145: Set HighAvailabilityTaskAssignor as default in streams_upgrade_test.py

2020-05-05 Thread GitBox


cadonna commented on a change in pull request #8613:
URL: https://github.com/apache/kafka/pull/8613#discussion_r420052853



##
File path: tests/kafkatest/tests/streams/streams_upgrade_test.py
##
@@ -532,20 +532,22 @@ def do_rolling_bounce(self, processor, counter, current_generation):
                     log_monitor.wait_until("Sent a version 8 subscription and got version 7 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 7 and trigger new rebalance.",
                                            timeout_sec=60,
                                            err_msg="Could not detect 'successful version probing' at upgrading node " + str(node.account))
-                else:
-                    log_monitor.wait_until("Sent a version 8 subscription and got version 7 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 8 and trigger new rebalance.",
+                    log_monitor.wait_until("Detected that the assignor requested a rebalance. Rejoining the consumer group to trigger a new rebalance.",

Review comment:
   We can leave it because it verifies whether the assignment was triggered in the assignor. However, it does not give us any guarantee that the rebalance actually took place.
   
   I guess what we really would need is a way to check whether a group has stabilized and whether the assignment is valid. We try to do that by verifying that the generations of the processors are synced. However, I ran into cases where all processors had the same generation, but one processor did not have any tasks assigned. So we would actually need to check that the highest generation is in sync across the processors AND that all processors have at least one task assigned (AND that all tasks were assigned).





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-5948) EosIntegrationTest fails with TopicAlreadyMarkedForDeletionException

2020-05-05 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5948?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna reassigned KAFKA-5948:


Assignee: Bruno Cadonna

> EosIntegrationTest fails with TopicAlreadyMarkedForDeletionException
> 
>
> Key: KAFKA-5948
> URL: https://issues.apache.org/jira/browse/KAFKA-5948
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Affects Versions: 0.11.0.1
>Reporter: Matthias J. Sax
>Assignee: Bruno Cadonna
>Priority: Major
>
> Seem to be a test setup race condition:
> {noformat}
> kafka.common.TopicAlreadyMarkedForDeletionException: topic 
> singlePartitionThroughTopic is already marked for deletion
>   at kafka.admin.AdminUtils$.deleteTopic(AdminUtils.scala:340)
>   at kafka.admin.AdminUtils.deleteTopic(AdminUtils.scala)
>   at 
> org.apache.kafka.streams.integration.utils.KafkaEmbedded.deleteTopic(KafkaEmbedded.java:200)
>   at 
> org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.deleteTopicsAndWait(EmbeddedKafkaCluster.java:268)
>   at 
> org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.deleteTopicsAndWait(EmbeddedKafkaCluster.java:256)
>   at 
> org.apache.kafka.streams.integration.EosIntegrationTest.createTopics(EosIntegrationTest.java:102)
> {noformat}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-9924) Add RocksDB Memory Consumption to RocksDB Metrics

2020-05-05 Thread Bruno Cadonna (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9924?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bruno Cadonna reassigned KAFKA-9924:


Assignee: Bruno Cadonna

> Add RocksDB Memory Consumption to RocksDB Metrics 
> --
>
> Key: KAFKA-9924
> URL: https://issues.apache.org/jira/browse/KAFKA-9924
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Bruno Cadonna
>Assignee: Bruno Cadonna
>Priority: Major
>  Labels: needs-kip
>
> RocksDB's memory consumption should be added to the RocksDB metrics.
> RocksDB's memory consumption can be retrieved with the following class:
> https://github.com/facebook/rocksdb/blob/master/java/src/main/java/org/rocksdb/MemoryUtil.java
> The memory consumption metrics should be added on client level and should be 
> recorded on INFO level.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cadonna commented on a change in pull request #8613: KAFKA-6145: Set HighAvailabilityTaskAssignor as default in streams_upgrade_test.py

2020-05-05 Thread GitBox


cadonna commented on a change in pull request #8613:
URL: https://github.com/apache/kafka/pull/8613#discussion_r420052853



##
File path: tests/kafkatest/tests/streams/streams_upgrade_test.py
##
@@ -532,20 +532,22 @@ def do_rolling_bounce(self, processor, counter, current_generation):
                     log_monitor.wait_until("Sent a version 8 subscription and got version 7 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 7 and trigger new rebalance.",
                                            timeout_sec=60,
                                            err_msg="Could not detect 'successful version probing' at upgrading node " + str(node.account))
-                else:
-                    log_monitor.wait_until("Sent a version 8 subscription and got version 7 assignment back (successful version probing). Downgrade subscription metadata to commonly supported version 8 and trigger new rebalance.",
+                    log_monitor.wait_until("Detected that the assignor requested a rebalance. Rejoining the consumer group to trigger a new rebalance.",

Review comment:
   We can leave it because it verifies whether the assignment was triggered in the assignor, which is better than nothing. However, it does not give us any guarantee that the rebalance actually took place.
   
   I guess what we really would need is a way to check whether a group has stabilized and whether the assignment is valid. We try to do that by verifying that the generations of the processors are synced. However, I ran into cases where all processors had the same generation, but one processor did not have any tasks assigned. So we would actually need to check that the highest generation is in sync across the processors AND that all processors have at least one task assigned (AND that all tasks were assigned).
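
   As an aside, a rough sketch (hypothetical data shapes, in Java rather than the Python test harness) of the stabilization check described above; it covers the synced-generation and at-least-one-task conditions and leaves out the "all tasks assigned" part.
   ```java
   import java.util.Collections;
   import java.util.Map;
   import java.util.Set;

   public class GroupStabilityCheck {
       // A group counts as stabilized only if every processor reports the same (highest)
       // generation and every processor has at least one task assigned.
       static boolean groupStabilized(Map<String, Integer> generationByProcessor,
                                      Map<String, Set<String>> tasksByProcessor) {
           if (generationByProcessor.isEmpty()) {
               return false;
           }
           int highest = Collections.max(generationByProcessor.values());
           boolean generationsSynced = generationByProcessor.values().stream().allMatch(g -> g == highest);
           boolean everyoneHasTasks = tasksByProcessor.values().stream().noneMatch(Set::isEmpty);
           return generationsSynced && everyoneHasTasks;
       }

       public static void main(String[] args) {
           // false: p3 reached the same generation but ended up with no tasks
           System.out.println(groupStabilized(
               Map.of("p1", 8, "p2", 8, "p3", 8),
               Map.of("p1", Set.of("0_0"), "p2", Set.of("0_1"), "p3", Set.of())));
       }
   }
   ```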





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] viktorsomogyi commented on pull request #7609: KAFKA-9059: Implement ReassignmentMaxLag

2020-05-05 Thread GitBox


viktorsomogyi commented on pull request #7609:
URL: https://github.com/apache/kafka/pull/7609#issuecomment-624066323


   @hachikuji I can refactor it, allow me a couple of days to do it. Did you 
mean to move the lag tracking too from `Replica` to the fetcher side, or should 
I leave these at their current place and just move the metric itself?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #8591: KAFKA-6342: Move workaround for JSON parsing of non-escaped strings

2020-05-05 Thread GitBox


ijuma commented on a change in pull request #8591:
URL: https://github.com/apache/kafka/pull/8591#discussion_r420124856



##
File path: core/src/main/scala/kafka/utils/Json.scala
##
@@ -35,16 +35,7 @@ object Json {
*/
   def parseFull(input: String): Option[JsonValue] =

Review comment:
   This code is fine as it is. We are suggesting that we don't need 
`parseBytesWithAclFallback` in `AclEntry`.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-7450) "Handshake message sequence violation" related ssl handshake failure leads to high cpu usage

2020-05-05 Thread Karel Kotmel (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7450?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17099898#comment-17099898
 ] 

Karel Kotmel commented on KAFKA-7450:
-

Hi Team,

I ran into a similar SSL issue with the extendedKeyUsage attribute: our company policy does not allow a certificate that has both extendedKeyUsage = serverAuth and extendedKeyUsage = clientAuth at the same time. Would it be possible to separate the client and server certificates for interbroker communication? The client connection is not an issue. At the moment I get this error:

_Invalid value javax.net.ssl.SSLHandshakeException: Extended key usage does not permit use for TLS server authentication for configuration A client SSLEngine created with the provided settings can't connect to a server SSLEngine created with those settings_

Regards

Karel

> "Handshake message sequence violation" related ssl handshake failure leads to 
> high cpu usage
> 
>
> Key: KAFKA-7450
> URL: https://issues.apache.org/jira/browse/KAFKA-7450
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 2.0.0
>Reporter: Yu Yang
>Priority: Major
>
> After updating security.inter.broker.protocol to SSL for our cluster, we 
> observed that the controller can get into almost 100% cpu usage from time to 
> time. 
> {code:java}
> listeners=PLAINTEXT://:9092,SSL://:9093
> security.inter.broker.protocol=SSL
> {code}
> There is no obvious error in server.log. But in controller.log, there is 
> repetitive SSL handshare failure error as below:
> {code:java}
> [2018-09-28 05:53:10,821] WARN [RequestSendThread controllerId=6042] 
> Controller 6042's connection to broker datakafka06176.ec2.pin220.com:9093 
> (id: 6176 rack: null) was unsuccessful (kafka.controller.RequestSendThread)
> org.apache.kafka.common.errors.SslAuthenticationException: SSL handshake 
> failed
> Caused by: javax.net.ssl.SSLProtocolException: Handshake message sequence 
> violation, 2
> at sun.security.ssl.Handshaker.checkThrown(Handshaker.java:1487)
> at 
> sun.security.ssl.SSLEngineImpl.checkTaskThrown(SSLEngineImpl.java:535)
> at 
> sun.security.ssl.SSLEngineImpl.readNetRecord(SSLEngineImpl.java:813)
> at sun.security.ssl.SSLEngineImpl.unwrap(SSLEngineImpl.java:781)
> at javax.net.ssl.SSLEngine.unwrap(SSLEngine.java:624)
> at 
> org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:468)
> at 
> org.apache.kafka.common.network.SslTransportLayer.doHandshake(SslTransportLayer.java:331)
> at 
> org.apache.kafka.common.network.SslTransportLayer.handshake(SslTransportLayer.java:258)
> at 
> org.apache.kafka.common.network.KafkaChannel.prepare(KafkaChannel.java:125)
> at 
> org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:487)
> at org.apache.kafka.common.network.Selector.poll(Selector.java:425)
> at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:510)
> at 
> org.apache.kafka.clients.NetworkClientUtils.awaitReady(NetworkClientUtils.java:73)
> at 
> kafka.controller.RequestSendThread.brokerReady(ControllerChannelManager.scala:279)
> at 
> kafka.controller.RequestSendThread.doWork(ControllerChannelManager.scala:233)
> at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> Caused by: javax.net.ssl.SSLProtocolException: Handshake message sequence 
> violation, 2
> at 
> sun.security.ssl.ClientHandshaker.processMessage(ClientHandshaker.java:196)
> at sun.security.ssl.Handshaker.processLoop(Handshaker.java:1026)
> at sun.security.ssl.Handshaker$1.run(Handshaker.java:966)
> at sun.security.ssl.Handshaker$1.run(Handshaker.java:963)
> at java.security.AccessController.doPrivileged(Native Method)
> at sun.security.ssl.Handshaker$DelegatedTask.run(Handshaker.java:1416)
> at 
> org.apache.kafka.common.network.SslTransportLayer.runDelegatedTasks(SslTransportLayer.java:393)
> at 
> org.apache.kafka.common.network.SslTransportLayer.handshakeUnwrap(SslTransportLayer.java:473)
> ... 10 more
> {code}
> {code:java}
> [2018-09-30 00:30:13,609] WARN [ReplicaFetcher replicaId=59, leaderId=66, 
> fetcherId=0] Error in response for fetch request (type=FetchRequest, 
> replicaId=59, maxWait=500, minBytes=1, maxBytes=10485760, 
> fetchData={the_test_topic-18=(offset=462333447, logStartOffset=462286948, 
> maxBytes=4194304), the_test_topic-58=(offset=462312762, 
> logStartOffset=462295078, maxBytes=4194304)}, 
> isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=1991153671, 
> epoch=INITIAL)) (kafka.server.ReplicaFetcherThread)
> org.apache.kafk

[jira] [Resolved] (KAFKA-9731) Increased fetch request rate with leader selector due to HW propagation

2020-05-05 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9731?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma resolved KAFKA-9731.

Resolution: Fixed

> Increased fetch request rate with leader selector due to HW propagation
> ---
>
> Key: KAFKA-9731
> URL: https://issues.apache.org/jira/browse/KAFKA-9731
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 2.4.0, 2.4.1
>Reporter: Vahid Hashemian
>Assignee: Ismael Juma
>Priority: Major
> Fix For: 2.6.0
>
> Attachments: image-2020-03-17-10-19-08-987.png
>
>
> KIP-392 adds high watermark propagation to followers as a means to better 
> sync up followers HW with leader. The issue we have noticed after trying out 
> 2.4.0 and 2.4.1 is a spike in fetch request rate in the default selector case 
> (leader), that does not really require this high watermark propagation:
> !image-2020-03-17-10-19-08-987.png|width=811,height=354!
> This spike causes an increase in resource allocation (CPU) on the brokers.
> An easy solution would be to disable this propagation (at least) for the 
> default leader selector case to improve the backward compatibility.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9731) Increased fetch request rate with leader selector due to HW propagation

2020-05-05 Thread Ismael Juma (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9731?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ismael Juma updated KAFKA-9731:
---
Fix Version/s: 2.6.0

> Increased fetch request rate with leader selector due to HW propagation
> ---
>
> Key: KAFKA-9731
> URL: https://issues.apache.org/jira/browse/KAFKA-9731
> Project: Kafka
>  Issue Type: Improvement
>  Components: core
>Affects Versions: 2.4.0, 2.4.1
>Reporter: Vahid Hashemian
>Assignee: Ismael Juma
>Priority: Major
> Fix For: 2.6.0
>
> Attachments: image-2020-03-17-10-19-08-987.png
>
>
> KIP-392 adds high watermark propagation to followers as a means to better 
> sync up followers HW with leader. The issue we have noticed after trying out 
> 2.4.0 and 2.4.1 is a spike in fetch request rate in the default selector case 
> (leader), that does not really require this high watermark propagation:
> !image-2020-03-17-10-19-08-987.png|width=811,height=354!
> This spike causes an increase in resource allocation (CPU) on the brokers.
> An easy solution would be to disable this propagation (at least) for the 
> default leader selector case to improve the backward compatibility.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9957) Kafka Controller doesn't failover during hardware failure

2020-05-05 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9957?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17099925#comment-17099925
 ] 

Ismael Juma commented on KAFKA-9957:


Looking at the thread dump, it seems like there are writes happening. For example:

 
{quote}
"data-plane-kafka-request-handler-0" #51 daemon prio=5 os_prio=0 tid=0x7fefce144000 nid=0x1a7 runnable [0x7fee9c6f]
   java.lang.Thread.State: RUNNABLE
    at sun.nio.ch.FileDispatcherImpl.write0(Native Method)
    at sun.nio.ch.FileDispatcherImpl.write(FileDispatcherImpl.java:60)
    at sun.nio.ch.IOUtil.writeFromNativeBuffer(IOUtil.java:93)
    at sun.nio.ch.IOUtil.write(IOUtil.java:65)
    at sun.nio.ch.FileChannelImpl.write(FileChannelImpl.java:211)
    - locked <0xc9a0f258> (a java.lang.Object)
    at org.apache.kafka.common.record.MemoryRecords.writeFullyTo(MemoryRecords.java:88)
    at org.apache.kafka.common.record.FileRecords.append(FileRecords.java:167)
    at kafka.log.LogSegment.append(LogSegment.scala:158)
    at kafka.log.Log.$anonfun$append$2(Log.scala:1171)
    - locked <0xc9a0eef0> (a java.lang.Object)
    at kafka.log.Log.append(Log.scala:2322)
    at kafka.log.Log.appendAsLeader(Log.scala:1002)
    at kafka.cluster.Partition.$anonfun$appendRecordsToLeader$1(Partition.scala:969)
    at kafka.cluster.Partition.appendRecordsToLeader(Partition.scala:957)
    at kafka.server.ReplicaManager.$anonfun$appendToLocalLog$4(ReplicaManager.scala:794)
    at kafka.server.ReplicaManager$$Lambda$756/1564821644.apply(Unknown Source)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:238)
    at scala.collection.TraversableLike$$Lambda$15/875016237.apply(Unknown Source)
    at scala.collection.mutable.HashMap.$anonfun$foreach$1(HashMap.scala:149)
    at scala.collection.mutable.HashMap$$Lambda$25/1337335626.apply(Unknown Source)
    at scala.collection.mutable.HashTable.foreachEntry(HashTable.scala:237)
    at scala.collection.mutable.HashTable.foreachEntry$(HashTable.scala:230)
    at scala.collection.mutable.HashMap.foreachEntry(HashMap.scala:44)
    at scala.collection.mutable.HashMap.foreach(HashMap.scala:149)
    at scala.collection.TraversableLike.map(TraversableLike.scala:238)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:231)
    at scala.collection.AbstractTraversable.map(Traversable.scala:108)
    at kafka.server.ReplicaManager.appendToLocalLog(ReplicaManager.scala:782)
    at kafka.server.ReplicaManager.appendRecords(ReplicaManager.scala:506)
    at kafka.server.KafkaApis.handleProduceRequest(KafkaApis.scala:582)
    at kafka.server.KafkaApis.handle(KafkaApis.scala:128)
{quote}
Unless the writes throw an exception, Kafka won't fail over. Reading about xfs_freeze, it sounds like it blocks writes but doesn't necessarily fail them?

> Kafka Controller doesn't failover during hardware failure
> -
>
> Key: KAFKA-9957
> URL: https://issues.apache.org/jira/browse/KAFKA-9957
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 2.2.0, 2.5.0
>Reporter: Eric Ward
>Priority: Critical
> Attachments: kafka-threaddump.out
>
>
> On a couple different production environments we've run into an issue where a 
> hardware failure has hung up the controller and prevented controller and 
> topic leadership from changing to a healthy broker.  When the issue happens 
> we see this repeated in the logs at regular intervals for the other brokers 
> (the affected broker can’t write to disk, so no logging occurs there):
> {noformat}
> [2020-04-26 01:12:30,613] WARN [ReplicaFetcher replicaId=0, leaderId=2, 
> fetcherId=0] Error in response for fetch request (type=FetchRequest, 
> replicaId=0, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={*snip*}, 
> isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=1962806970, 
> epoch=INITIAL)) (kafka.server.ReplicaFetcherThread)
> java.io.IOException: Connection to 2 was disconnected before the response was 
> read
>   at 
> org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
>   at 
> kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:100)
>   at 
> kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:193)
>   at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:280)
>   at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:132)
>   at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:131)
>   at scala.Option.foreach(Option.scala:274)
>   at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThr

[jira] [Commented] (KAFKA-9957) Kafka Controller doesn't failover during hardware failure

2020-05-05 Thread Eric Ward (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9957?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17099967#comment-17099967
 ] 

Eric Ward commented on KAFKA-9957:
--

That is correct.  xfs_freeze will not fail the writes, the writes will just 
hang indefinitely.

In the case of our production failures we saw similar behavior, which is what led us to using xfs_freeze to reproduce. Any attempt to interact with the affected filesystem would hang indefinitely, both reads and writes.

> Kafka Controller doesn't failover during hardware failure
> -
>
> Key: KAFKA-9957
> URL: https://issues.apache.org/jira/browse/KAFKA-9957
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 2.2.0, 2.5.0
>Reporter: Eric Ward
>Priority: Critical
> Attachments: kafka-threaddump.out
>
>
> On a couple different production environments we've run into an issue where a 
> hardware failure has hung up the controller and prevented controller and 
> topic leadership from changing to a healthy broker.  When the issue happens 
> we see this repeated in the logs at regular intervals for the other brokers 
> (the affected broker can’t write to disk, so no logging occurs there):
> {noformat}
> [2020-04-26 01:12:30,613] WARN [ReplicaFetcher replicaId=0, leaderId=2, 
> fetcherId=0] Error in response for fetch request (type=FetchRequest, 
> replicaId=0, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={*snip*}, 
> isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=1962806970, 
> epoch=INITIAL)) (kafka.server.ReplicaFetcherThread)
> java.io.IOException: Connection to 2 was disconnected before the response was 
> read
>   at 
> org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
>   at 
> kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:100)
>   at 
> kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:193)
>   at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:280)
>   at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:132)
>   at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:131)
>   at scala.Option.foreach(Option.scala:274)
>   at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:131)
>   at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
>   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> {noformat}
> This issue appears to be similar to KAFKA-7870, though that issue was 
> purportedly fixed by KAFKA-7697.
> Once we encounter this error any partitions whose leadership is on the 
> affected node are unavailable until we force that broker out of the cluster – 
> that is to say, kill the node.
> When we initially hit the issue we were running on version 2.2.0, though I've 
> been able to reproduce this in an environment running 2.5.0 as well. To 
> simulate the hardware failure I'm using the xfs_freeze utility to suspend 
> access to the filesystem.  Zookeeper failover is also part of the mix.  In 
> all instances where we’ve seen this the ZK leader and Kafka Controller were 
> on the same node and both affected by the hardware issue.  Zookeeper is able 
> to successfully failover, which it does rather quickly.
> Reproduction steps are pretty straightforward:
>  # Spin up a 3 node cluster
>  # Ensure that the Kafka Controller and Zookeeper Leader are on the same node.
>  # xfs_freeze the filesystem on the node that the controller is running on
> This reproduces 100% of the time for me.  I’ve left it running for well over 
> an hour without any Kafka failover happening.  Unfreezing the node will allow 
> the cluster to heal itself.
> I’ve attached a thread dump from an environment running 2.5.0.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] bdbyrne commented on pull request #8615: KAFKA-9954: Config command didn't validate the unsupported user config change

2020-05-05 Thread GitBox


bdbyrne commented on pull request #8615:
URL: https://github.com/apache/kafka/pull/8615#issuecomment-624115331


   Hi Cheng - what error are you seeing when trying to modify the "users" quotas? 
This should be supported, and if not, there's likely another bug that's also 
affecting "clients".



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bdbyrne removed a comment on pull request #8615: KAFKA-9954: Config command didn't validate the unsupported user config change

2020-05-05 Thread GitBox


bdbyrne removed a comment on pull request #8615:
URL: https://github.com/apache/kafka/pull/8615#issuecomment-624115331


   Hi Cheng - what error are you seeing when trying to modify the "users" quotas? 
This should be supported, and if not, there's likely another bug that's also 
affecting "clients".



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-9954) Config command didn't validate the unsupported user config change

2020-05-05 Thread Brian Byrne (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9954?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1702#comment-1702
 ] 

Brian Byrne commented on KAFKA-9954:


Hi Cheng - what error are you seeing when trying to modify the "users" quotas? 
This should be supported, and if not, there's likely another bug that's also 
affecting "clients".

> Config command didn't validate the unsupported user config change
> -
>
> Key: KAFKA-9954
> URL: https://issues.apache.org/jira/browse/KAFKA-9954
> Project: Kafka
>  Issue Type: Bug
>Reporter: Cheng Tan
>Assignee: Cheng Tan
>Priority: Major
>
> {quote}bin/kafka-configs.sh --bootstrap-server localhost:9092 --alter 
> --add-config producer_byte_rate=4 --entity-type users --entity-default
> {quote}
>  
> will say that the alteration is complete. However, we don't support the 
> alteration yet.
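
For comparison, a minimal Java sketch of what the same default-user quota change looks like through the Admin client's client-quota API (which arrived in a later release under KIP-546); the bootstrap address and the rate value are illustrative only:

```java
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.quota.ClientQuotaAlteration;
import org.apache.kafka.common.quota.ClientQuotaEntity;

public class DefaultUserQuotaExample {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // A null entity name selects the default user entity, mirroring --entity-default.
            ClientQuotaEntity defaultUser =
                new ClientQuotaEntity(Collections.singletonMap(ClientQuotaEntity.USER, null));
            ClientQuotaAlteration alteration = new ClientQuotaAlteration(
                defaultUser,
                Collections.singleton(new ClientQuotaAlteration.Op("producer_byte_rate", 4.0)));
            // Unlike the CLI behavior described above, the future completes exceptionally
            // if the broker rejects or cannot apply the change.
            admin.alterClientQuotas(Collections.singleton(alteration)).all().get();
        }
    }
}
```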



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] lbradstreet commented on pull request #8609: KAFKA-9946; StopReplicaRequest deletePartition changes may cause premature topic deletion handling in controller

2020-05-05 Thread GitBox


lbradstreet commented on pull request #8609:
URL: https://github.com/apache/kafka/pull/8609#issuecomment-624122999


   @dajac thanks! Looks good to me.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-9957) Kafka Controller doesn't failover during hardware failure

2020-05-05 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9957?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17100018#comment-17100018
 ] 

Ismael Juma commented on KAFKA-9957:


Can you configure a timeout at the OS level? For example, see the following 
documentation from Amazon:
{quote}Most operating systems specify a timeout for I/O operations submitted to 
NVMe devices. The default timeout is 30 seconds and can be changed using the 
{{nvme_core.io_timeout}} boot parameter. With Linux kernels earlier than 
version 4.6, this parameter is {{nvme.io_timeout}}.
{quote}
[https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/nvme-ebs-volumes.html]

> Kafka Controller doesn't failover during hardware failure
> -
>
> Key: KAFKA-9957
> URL: https://issues.apache.org/jira/browse/KAFKA-9957
> Project: Kafka
>  Issue Type: Bug
>  Components: controller
>Affects Versions: 2.2.0, 2.5.0
>Reporter: Eric Ward
>Priority: Critical
> Attachments: kafka-threaddump.out
>
>
> On a couple different production environments we've run into an issue where a 
> hardware failure has hung up the controller and prevented controller and 
> topic leadership from changing to a healthy broker.  When the issue happens 
> we see this repeated in the logs at regular intervals for the other brokers 
> (the affected broker can’t write to disk, so no logging occurs there):
> {noformat}
> [2020-04-26 01:12:30,613] WARN [ReplicaFetcher replicaId=0, leaderId=2, 
> fetcherId=0] Error in response for fetch request (type=FetchRequest, 
> replicaId=0, maxWait=500, minBytes=1, maxBytes=10485760, fetchData={*snip*}, 
> isolationLevel=READ_UNCOMMITTED, toForget=, metadata=(sessionId=1962806970, 
> epoch=INITIAL)) (kafka.server.ReplicaFetcherThread)
> java.io.IOException: Connection to 2 was disconnected before the response was 
> read
>   at 
> org.apache.kafka.clients.NetworkClientUtils.sendAndReceive(NetworkClientUtils.java:100)
>   at 
> kafka.server.ReplicaFetcherBlockingSend.sendRequest(ReplicaFetcherBlockingSend.scala:100)
>   at 
> kafka.server.ReplicaFetcherThread.fetchFromLeader(ReplicaFetcherThread.scala:193)
>   at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:280)
>   at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3(AbstractFetcherThread.scala:132)
>   at 
> kafka.server.AbstractFetcherThread.$anonfun$maybeFetch$3$adapted(AbstractFetcherThread.scala:131)
>   at scala.Option.foreach(Option.scala:274)
>   at 
> kafka.server.AbstractFetcherThread.maybeFetch(AbstractFetcherThread.scala:131)
>   at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
>   at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:82)
> {noformat}
> This issue appears to be similar to KAFKA-7870, though that issue was 
> purportedly fixed by KAFKA-7697.
> Once we encounter this error any partitions whose leadership is on the 
> affected node are unavailable until we force that broker out of the cluster – 
> that is to say, kill the node.
> When we initially hit the issue we were running on version 2.2.0, though I've 
> been able to reproduce this in an environment running 2.5.0 as well. To 
> simulate the hardware failure I'm using the xfs_freeze utility to suspend 
> access to the filesystem.  Zookeeper failover is also part of the mix.  In 
> all instances where we’ve seen this the ZK leader and Kafka Controller were 
> on the same node and both affected by the hardware issue.  Zookeeper is able 
> to successfully failover, which it does rather quickly.
> Reproduction steps are pretty straightforward:
>  # Spin up a 3 node cluster
>  # Ensure that the Kafka Controller and Zookeeper Leader are on the same node.
>  # xfs_freeze the filesystem on the node that the controller is running on
> This reproduces 100% of the time for me.  I’ve left it running for well over 
> an hour without any Kafka failover happening.  Unfreezing the node will allow 
> the cluster to heal itself.
> I’ve attached a thread dump from an environment running 2.5.0.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9891) Invalid state store content after task migration with exactly_once and standby replicas

2020-05-05 Thread Boyang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17100020#comment-17100020
 ] 

Boyang Chen commented on KAFKA-9891:


We had some offline discussion, and plan to revise the integration test PR as 
well. We want to test it out on trunk first and then backport to 2.4 to ensure 
the regression doesn't carry over.

> Invalid state store content after task migration with exactly_once and 
> standby replicas
> ---
>
> Key: KAFKA-9891
> URL: https://issues.apache.org/jira/browse/KAFKA-9891
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.3.1, 2.4.1
>Reporter: Mateusz Jadczyk
>Assignee: Boyang Chen
>Priority: Blocker
>
> We have a simple command id deduplication mechanism (very similar to the one 
> from Kafka Streams examples) based on Kafka Streams State Stores. It stores 
> command ids from the past hour in _persistentWindowStore_. We encountered a 
> problem with the store if there's an exception thrown later in that topology.
>  We run 3 nodes using docker, each with multiple threads set for this 
> particular Streams Application.
> The business flow is as follows (performed within a single subtopology):
>  *  a valid command is sent with command id 
> (_mnl_cmd_31bad7e5-35e7-490e-89f0-616fe967111f_). NODE 1 is running an active 
> task 1_2. First node in the topology analyses if this is a duplicate by 
> checking in the state store (_COMMAND_ID_STORE_), if not puts the command id 
> in the state store and processes the command properly.
>  * an invalid command is sent with the same key but new command id 
> (_mnl_cmd_9f1752da-45b7-4ef7-9ef8-209d826530bc_). Again, check for the 
> duplicated command id is performed, it's not a duplicate, command id is put 
> into the state store. Next node in the topology throws an exception which 
> causes an error on NODE 1 for task 1_2. As a result, transaction is aborted, 
> offsets are not committed. I double checked for the changelog topic - 
> relevant messages are not committed. Therefore, the changelog topic contains 
> only the first command id _mnl_cmd_31bad7e5-35e7-490e-89f0-616fe967111f,_ and 
> not the one which caused a failure.
>  * in the meantime a standby task 1_2 running on NODE 3 replicated 
> _mnl_cmd_31bad7e5-35e7-490e-89f0-616fe967111f_ command id into a local 
> _COMMAND_ID_STORE_
>  * standby task 1_2 on NODE 3 Thread-2 takes over the task as an active one. 
> It checks if this command id is a duplicate - no, it isn't - tries to process 
> the faulty command and throws an exception. Again, transaction aborted, all 
> looks fine.
>  * NODE 3 Thread-1 takes over. It checks for the duplicate. To our surprise, 
> *it is a duplicate!* Even though the transaction has been aborted and the 
> changelog doesn't contain this command id: 
> _mnl_cmd_9f1752da-45b7-4ef7-9ef8-209d826530bc._
>  
> After digging into the Streams logs and some discussion on ([Stack 
> Overflow|https://stackoverflow.com/questions/61247789/invalid-state-store-content-after-aborted-transaction-with-exactly-once-and-stan])
>  we concluded it has something to do with checkpoint files. Here are the 
> detailed logs relevant to checkpoint files.
>  
> {code:java}
> NODE_3 2020-04-15 21:06:14.470 TRACE 1 --- [-StreamThread-2] 
> o.a.k.s.p.i.ProcessorStateManager : stream-thread 
> [Processor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] task 
> [1_2] Checkpointable offsets read from checkpoint: {}
> NODE_3 2020-04-15 21:06:19.413 TRACE 1 --- [-StreamThread-2] 
> o.a.k.s.p.i.ProcessorStateManager : stream-thread 
> [Processor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] task 
> [1_2] Restoring state store COMMAND_ID_STORE from changelog topic 
> Processor-COMMAND_ID_STORE-changelog at checkpoint null
> NODE_3 2020-04-15 21:06:28.470 TRACE 1 --- [-StreamThread-1] 
> o.a.k.s.p.i.ProcessorStateManager : stream-thread 
> [Processor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] 
> standby-task [1_2] Checkpointable offsets read from checkpoint: {}
> NODE_3 2020-04-15 21:06:29.634 TRACE 1 --- [-StreamThread-2] 
> o.a.k.s.s.internals.OffsetCheckpoint : Writing tmp checkpoint file 
> /tmp/kafka-streams/Processor/1_2/.checkpoint.tmp
> NODE_3 2020-04-15 21:06:29.640 TRACE 1 --- [-StreamThread-2] 
> o.a.k.s.s.internals.OffsetCheckpoint : Swapping tmp checkpoint file 
> /tmp/kafka-streams/Processor/1_2/.checkpoint.tmp 
> /tmp/kafka-streams/Processor/1_2/.checkpoint
> NODE_3 2020-04-15 21:11:15.909 TRACE 1 --- [-StreamThread-1] 
> o.a.k.s.s.internals.OffsetCheckpoint : Writing tmp checkpoint file 
> /tmp/kafka-streams/Processor/1_2/.checkpoint.tmp
> NODE_3 2020-04-15 21:11:15.912 TRACE 1 --- [-StreamThread-1] 
> o

[jira] [Commented] (KAFKA-9891) Invalid state store content after task migration with exactly_once and standby replicas

2020-05-05 Thread Boyang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17100021#comment-17100021
 ] 

Boyang Chen commented on KAFKA-9891:


[~mateuszjadczyk] on the other hand, if you are willing to work on this ticket, 
feel free to take over, as I realize that you have better context of this 
problem in 2.4 than I do.

> Invalid state store content after task migration with exactly_once and 
> standby replicas
> ---
>
> Key: KAFKA-9891
> URL: https://issues.apache.org/jira/browse/KAFKA-9891
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.3.1, 2.4.1
>Reporter: Mateusz Jadczyk
>Assignee: Boyang Chen
>Priority: Blocker
>
> We have a simple command id deduplication mechanism (very similar to the one 
> from Kafka Streams examples) based on Kafka Streams State Stores. It stores 
> command ids from the past hour in _persistentWindowStore_. We encountered a 
> problem with the store if there's an exception thrown later in that topology.
>  We run 3 nodes using docker, each with multiple threads set for this 
> particular Streams Application.
> The business flow is as follows (performed within a single subtopology):
>  *  a valid command is sent with command id 
> (_mnl_cmd_31bad7e5-35e7-490e-89f0-616fe967111f_). NODE 1 is running an active 
> task 1_2. First node in the topology analyses if this is a duplicate by 
> checking in the state store (_COMMAND_ID_STORE_), if not puts the command id 
> in the state store and processes the command properly.
>  * an invalid command is sent with the same key but new command id 
> (_mnl_cmd_9f1752da-45b7-4ef7-9ef8-209d826530bc_). Again, check for the 
> duplicated command id is performed, it's not a duplicate, command id is put 
> into the state store. Next node in the topology throws an exception which 
> causes an error on NODE 1 for task 1_2. As a result, transaction is aborted, 
> offsets are not committed. I double checked for the changelog topic - 
> relevant messages are not committed. Therefore, the changelog topic contains 
> only the first command id _mnl_cmd_31bad7e5-35e7-490e-89f0-616fe967111f,_ and 
> not the one which caused a failure.
>  * in the meantime a standby task 1_2 running on NODE 3 replicated 
> _mnl_cmd_31bad7e5-35e7-490e-89f0-616fe967111f_ command id into a local 
> _COMMAND_ID_STORE_
>  * standby task 1_2 on NODE 3 Thread-2 takes over the task as an active one. 
> It checks if this command id is a duplicate - no, it isn't - tries to process 
> the faulty command and throws an exception. Again, transaction aborted, all 
> looks fine.
>  * NODE 3 Thread-1 takes over. It checks for the duplicate. To our surprise, 
> *it is a duplicate!* Even though the transaction has been aborted and the 
> changelog doesn't contain this command id: 
> _mnl_cmd_9f1752da-45b7-4ef7-9ef8-209d826530bc._
>  
> After digging into the Streams logs and some discussion on ([Stack 
> Overflow|https://stackoverflow.com/questions/61247789/invalid-state-store-content-after-aborted-transaction-with-exactly-once-and-stan])
>  we concluded it has something to do with checkpoint files. Here are the 
> detailed logs relevant to checkpoint files.
>  
> {code:java}
> NODE_3 2020-04-15 21:06:14.470 TRACE 1 --- [-StreamThread-2] 
> o.a.k.s.p.i.ProcessorStateManager : stream-thread 
> [Processor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] task 
> [1_2] Checkpointable offsets read from checkpoint: {}
> NODE_3 2020-04-15 21:06:19.413 TRACE 1 --- [-StreamThread-2] 
> o.a.k.s.p.i.ProcessorStateManager : stream-thread 
> [Processor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] task 
> [1_2] Restoring state store COMMAND_ID_STORE from changelog topic 
> Processor-COMMAND_ID_STORE-changelog at checkpoint null
> NODE_3 2020-04-15 21:06:28.470 TRACE 1 --- [-StreamThread-1] 
> o.a.k.s.p.i.ProcessorStateManager : stream-thread 
> [Processor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] 
> standby-task [1_2] Checkpointable offsets read from checkpoint: {}
> NODE_3 2020-04-15 21:06:29.634 TRACE 1 --- [-StreamThread-2] 
> o.a.k.s.s.internals.OffsetCheckpoint : Writing tmp checkpoint file 
> /tmp/kafka-streams/Processor/1_2/.checkpoint.tmp
> NODE_3 2020-04-15 21:06:29.640 TRACE 1 --- [-StreamThread-2] 
> o.a.k.s.s.internals.OffsetCheckpoint : Swapping tmp checkpoint file 
> /tmp/kafka-streams/Processor/1_2/.checkpoint.tmp 
> /tmp/kafka-streams/Processor/1_2/.checkpoint
> NODE_3 2020-04-15 21:11:15.909 TRACE 1 --- [-StreamThread-1] 
> o.a.k.s.s.internals.OffsetCheckpoint : Writing tmp checkpoint file 
> /tmp/kafka-streams/Processor/1_2/.checkpoint.tmp
> NODE_3 2020-04-15 21:11:15.912 TRACE 1 --- [-StreamThread-1] 
> o.a.k.s.s.i

[GitHub] [kafka] dhruvilshah3 commented on a change in pull request #8609: KAFKA-9946; StopReplicaRequest deletePartition changes may cause premature topic deletion handling in controller

2020-05-05 Thread GitBox


dhruvilshah3 commented on a change in pull request #8609:
URL: https://github.com/apache/kafka/pull/8609#discussion_r420238891



##
File path: core/src/main/scala/kafka/controller/ControllerChannelManager.scala
##
@@ -550,6 +550,22 @@ abstract class 
AbstractControllerBrokerRequestBatch(config: KafkaConfig,
   else if (config.interBrokerProtocolVersion >= KAFKA_2_2_IV0) 1
   else 0
 
+def responseCallback(brokerId: Int, isPartitionDeleted: TopicPartition => 
Boolean)
+(response: AbstractResponse): Unit = {
+  val stopReplicaResponse = response.asInstanceOf[StopReplicaResponse]
+  val partitionErrorsForDeletingTopics = mutable.Map.empty[TopicPartition, 
Errors]
+  stopReplicaResponse.partitionErrors.asScala.foreach { pe =>
+val tp = new TopicPartition(pe.topicName, pe.partitionIndex)
+if (controllerContext.isTopicDeletionInProgress(pe.topicName) &&
+isPartitionDeleted(tp)) {
+  partitionErrorsForDeletingTopics += tp -> 
Errors.forCode(pe.errorCode)

Review comment:
   nit: `partitionErrorsForDeletingTopics` seems a bit ambiguous and makes 
it sound like it only includes partitions for which StopReplicaRequest failed. 
Perhaps something like `partitionToError` is better?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-6468) Replication high watermark checkpoint file read for every LeaderAndIsrRequest

2020-05-05 Thread Kyle Ambroff-Kao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6468?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Kyle Ambroff-Kao resolved KAFKA-6468.
-
Resolution: Fixed

> Replication high watermark checkpoint file read for every LeaderAndIsrRequest
> -
>
> Key: KAFKA-6468
> URL: https://issues.apache.org/jira/browse/KAFKA-6468
> Project: Kafka
>  Issue Type: Bug
>Reporter: Kyle Ambroff-Kao
>Assignee: Kyle Ambroff-Kao
>Priority: Major
>
> The high watermark for each partition in a given log directory is written to 
> disk every _replica.high.watermark.checkpoint.interval.ms_ milliseconds. This 
> checkpoint file is used to create replicas when joining the cluster.
> [https://github.com/apache/kafka/blob/b73c765d7e172de4742a3aa023d5a0a4b7387247/core/src/main/scala/kafka/cluster/Partition.scala#L180]
> Unfortunately this file is read every time 
> kafka.cluster.Partition#getOrCreateReplica is invoked. For most clusters this 
> isn't a big deal, but for a small cluster with lots of partitions all of the 
> reads of this file really add up.
> On my local test cluster of three brokers with around 40k partitions, the 
> initial LeaderAndIsrRequest refers to every partition in the cluster, and it 
> can take 20 to 30 minutes to create all of the replicas because the 
> _replication-offset-checkpoint_ is nearly 2MB.
> Changing this code so that we only read this file once on startup reduces the 
> time to create all replicas to around one minute.
> Credit to [~onurkaraman] for finding this one.
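
A minimal Java sketch (illustrative only; the broker code is Scala, and the actual fix landed separately) of the read-once idea described above: parse the checkpoint file a single time at startup and let replica creation consult an in-memory map instead of re-reading the file for every partition. It assumes the usual "topic partition offset" data-line layout of the checkpoint file.

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.HashMap;
import java.util.Map;

public class HighWatermarkCheckpointCache {
    private final Map<String, Long> highWatermarks = new HashMap<>();

    // Parse the checkpoint file once; version/count header lines are skipped because
    // they do not have three whitespace-separated fields.
    public HighWatermarkCheckpointCache(String checkpointPath) throws IOException {
        for (String line : Files.readAllLines(Paths.get(checkpointPath))) {
            String[] fields = line.trim().split("\\s+");
            if (fields.length == 3) {
                highWatermarks.put(fields[0] + "-" + fields[1], Long.parseLong(fields[2]));
            }
        }
    }

    // Replica creation looks up the cached value rather than re-reading the file.
    public long highWatermarkFor(String topic, int partition) {
        return highWatermarks.getOrDefault(topic + "-" + partition, 0L);
    }
}
```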



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-6468) Replication high watermark checkpoint file read for every LeaderAndIsrRequest

2020-05-05 Thread Kyle Ambroff-Kao (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6468?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17100047#comment-17100047
 ] 

Kyle Ambroff-Kao commented on KAFKA-6468:
-

Linking as a duplicate of KAFKA-8333, which has a patch that fixes this problem.

> Replication high watermark checkpoint file read for every LeaderAndIsrRequest
> -
>
> Key: KAFKA-6468
> URL: https://issues.apache.org/jira/browse/KAFKA-6468
> Project: Kafka
>  Issue Type: Bug
>Reporter: Kyle Ambroff-Kao
>Assignee: Kyle Ambroff-Kao
>Priority: Major
>
> The high watermark for each partition in a given log directory is written to 
> disk every _replica.high.watermark.checkpoint.interval.ms_ milliseconds. This 
> checkpoint file is used to create replicas when joining the cluster.
> [https://github.com/apache/kafka/blob/b73c765d7e172de4742a3aa023d5a0a4b7387247/core/src/main/scala/kafka/cluster/Partition.scala#L180]
> Unfortunately this file is read every time 
> kafka.cluster.Partition#getOrCreateReplica is invoked. For most clusters this 
> isn't a big deal, but for a small cluster with lots of partitions all of the 
> reads of this file really add up.
> On my local test cluster of three brokers with around 40k partitions, the 
> initial LeaderAndIsrRequest refers to every partition in the cluster, and it 
> can take 20 to 30 minutes to create all of the replicas because the 
> _replication-offset-checkpoint_ is nearly 2MB.
> Changing this code so that we only read this file once on startup reduces the 
> time to create all replicas to around one minute.
> Credit to [~onurkaraman] for finding this one.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ambroff commented on pull request #4468: KAFKA-6468 Read replication-offset-checkpoint once

2020-05-05 Thread GitBox


ambroff commented on pull request #4468:
URL: https://github.com/apache/kafka/pull/4468#issuecomment-624160139


   Abandoning this PR since it looks like PR #6800 fixed this problem.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bseenu commented on a change in pull request #7577: KAFKA-9076: support consumer offset sync across clusters in MM 2.0

2020-05-05 Thread GitBox


bseenu commented on a change in pull request #7577:
URL: https://github.com/apache/kafka/pull/7577#discussion_r419913034



##
File path: 
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
##
@@ -190,4 +227,103 @@ public void commitRecord(SourceRecord record) {
 Checkpoint.unwrapGroup(record.sourcePartition()),
 System.currentTimeMillis() - record.timestamp());
 }
+
+private void refreshIdleConsumerGroupOffset() {
+Map> consumerGroupsDesc 
= targetAdminClient
+.describeConsumerGroups(consumerGroups).describedGroups();
+
+for (String group : consumerGroups) {
+try {
+ConsumerGroupDescription consumerGroupDesc = 
consumerGroupsDesc.get(group).get();
+ConsumerGroupState consumerGroupState = 
consumerGroupDesc.state();
+// sync offset to the target cluster only if the state of 
current consumer group is:
+// (1) idle: because the consumer at target is not actively 
consuming the mirrored topic
+// (2) dead: the new consumer that is recently created at 
source and never exist at target
+if (consumerGroupState.equals(ConsumerGroupState.EMPTY)) {
+idleConsumerGroupsOffset.put(group, 
targetAdminClient.listConsumerGroupOffsets(group)
+.partitionsToOffsetAndMetadata().get().entrySet());
+} else if (consumerGroupState.equals(ConsumerGroupState.DEAD)) 
{
+newConsumerGroup.add(group);
+}
+} catch (InterruptedException | ExecutionException e) {
+log.error("Error querying for consumer group {} on cluster 
{}.", group, targetClusterAlias, e);
+}
+}
+}
+
+Map> syncGroupOffset() {
+Map> offsetToSyncAll = 
new HashMap<>();
+
+// first, sync offsets for the idle consumers at target
+for (Map.Entry>> group : idleConsumerGroupsOffset.entrySet()) {
+String consumerGroupId = group.getKey();
+// for each idle consumer at target, read the checkpoints 
(converted upstream offset)
+// from the pre-populated map
+Map convertedUpstreamOffset = 
getConvertedUpstreamOffset(consumerGroupId);
+
+if (convertedUpstreamOffset == null) continue;
+
+Map offsetToSync = new 
HashMap<>();
+for (Entry entry : 
group.getValue()) {
+long latestDownstreamOffset = entry.getValue().offset();
+TopicPartition topicPartition = entry.getKey();
+if (!convertedUpstreamOffset.containsKey(topicPartition)) {
+log.trace("convertedUpstreamOffset does not contain 
TopicPartition: {}", topicPartition.toString());
+continue;
+}
+
+// if translated offset from upstream is smaller than the 
current consumer offset
+// in the target, skip updating the offset for that partition
+long convertedOffset = 
convertedUpstreamOffset.get(topicPartition).offset();
+if (latestDownstreamOffset >= convertedOffset) {
+log.trace("latestDownstreamOffset {} is larger than 
convertedUpstreamOffset {} for "
++ "TopicPartition {}", latestDownstreamOffset, 
convertedOffset, topicPartition);
+continue;
+}

Review comment:
   I would like to propose the following changes to sync the consumer group changes on the source side 
   
   ```suggestion
            for (Map.Entry<TopicPartition, OffsetAndMetadata> convertedEntry : convertedUpstreamOffset.entrySet()) {
                TopicPartition topicPartition = convertedEntry.getKey();
                for (Entry<TopicPartition, OffsetAndMetadata> idleEntry : group.getValue()) {
                    if (idleEntry.getKey() == topicPartition) {
                        long latestDownstreamOffset = idleEntry.getValue().offset();
                        // if translated offset from upstream is smaller than the current consumer offset
                        // in the target, skip updating the offset for that partition
                        long convertedOffset = convertedUpstreamOffset.get(topicPartition).offset();
                        if (latestDownstreamOffset >= convertedOffset) {
                            log.trace("latestDownstreamOffset {} is larger than convertedUpstreamOffset {} for "
                                + "TopicPartition {}", latestDownstreamOffset, convertedOffset, topicPartition);
                            continue;
                        }
                    }
                }
   ```





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For querie

[GitHub] [kafka] C0urante commented on a change in pull request #8618: KAFKA-9955: Prevent SinkTask::close from shadowing other exceptions

2020-05-05 Thread GitBox


C0urante commented on a change in pull request #8618:
URL: https://github.com/apache/kafka/pull/8618#discussion_r420248205



##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
##
@@ -856,6 +857,49 @@ public void run() {
 PowerMock.verifyAll();
 }
 
+@Test
+public void testSinkTasksHandleCloseErrors() throws Exception {
+createTask(initialState);
+expectInitializeTask();
+expectTaskGetTopic(true);
+
+// Put one message through the task to get some offsets to commit
+expectConsumerPoll(1);
+expectConversionAndTransformation(1);
+sinkTask.put(EasyMock.anyObject());
+PowerMock.expectLastCall().andVoid();
+
+// Throw an exception on the next put to trigger shutdown behavior
+// This exception is the true "cause" of the failure
+expectConsumerPoll(1);
+expectConversionAndTransformation(1);
+Throwable a = new RuntimeException();

Review comment:
   If we're going to refer to this later while making assertions, a more 
descriptive name might help readability. Something like `putFailure` here and 
`closeFailure` below, maybe?

##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
##
@@ -856,6 +857,49 @@ public void run() {
 PowerMock.verifyAll();
 }
 
+@Test
+public void testSinkTasksHandleCloseErrors() throws Exception {
+createTask(initialState);
+expectInitializeTask();
+expectTaskGetTopic(true);
+
+// Put one message through the task to get some offsets to commit
+expectConsumerPoll(1);
+expectConversionAndTransformation(1);
+sinkTask.put(EasyMock.anyObject());
+PowerMock.expectLastCall().andVoid();
+
+// Throw an exception on the next put to trigger shutdown behavior
+// This exception is the true "cause" of the failure
+expectConsumerPoll(1);
+expectConversionAndTransformation(1);
+Throwable a = new RuntimeException();
+sinkTask.put(EasyMock.anyObject());
+PowerMock.expectLastCall().andThrow(a);
+
+// Throw another exception while closing the task's assignment
+EasyMock.expect(sinkTask.preCommit(EasyMock.anyObject()))
+.andStubReturn(Collections.emptyMap());
+Throwable b = new RuntimeException();
+sinkTask.close(EasyMock.anyObject());
+PowerMock.expectLastCall().andThrow(b);
+
+PowerMock.replayAll();
+
+workerTask.initialize(TASK_CONFIG);
+try {
+workerTask.execute();
+fail();
+} catch (Throwable t) {
+PowerMock.verifyAll();
+// The exception from close should not shadow the exception from 
put.

Review comment:
   Might be better to use this text as the message for the assertions 
instead of putting it here as a comment?

##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/WorkerSinkTaskTest.java
##
@@ -856,6 +857,49 @@ public void run() {
 PowerMock.verifyAll();
 }
 
+@Test
+public void testSinkTasksHandleCloseErrors() throws Exception {
+createTask(initialState);
+expectInitializeTask();
+expectTaskGetTopic(true);
+
+// Put one message through the task to get some offsets to commit
+expectConsumerPoll(1);
+expectConversionAndTransformation(1);
+sinkTask.put(EasyMock.anyObject());
+PowerMock.expectLastCall().andVoid();
+
+// Throw an exception on the next put to trigger shutdown behavior
+// This exception is the true "cause" of the failure
+expectConsumerPoll(1);
+expectConversionAndTransformation(1);
+Throwable a = new RuntimeException();
+sinkTask.put(EasyMock.anyObject());
+PowerMock.expectLastCall().andThrow(a);
+
+// Throw another exception while closing the task's assignment
+EasyMock.expect(sinkTask.preCommit(EasyMock.anyObject()))
+.andStubReturn(Collections.emptyMap());
+Throwable b = new RuntimeException();
+sinkTask.close(EasyMock.anyObject());
+PowerMock.expectLastCall().andThrow(b);
+
+PowerMock.replayAll();
+
+workerTask.initialize(TASK_CONFIG);
+try {
+workerTask.execute();
+fail();
+} catch (Throwable t) {

Review comment:
   The call to `fail()` above is going to get caught here, isn't it? Think 
that might make this difficult to debug if a future change causes this to fail 
for some reason.
   
   Based on the `deliverMessages` method where `SinkTask::put` is invoked, it 
looks like we might be able to narrow this down to a `ConnectException`.
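
A rough sketch of how those suggestions could fit together; the names, the assertion style, and the assumption that the put() failure surfaces as the cause of the `ConnectException` thrown by `execute()` are illustrative rather than the final test:

```java
Throwable putFailure = new RuntimeException("failure from put");
Throwable closeFailure = new RuntimeException("failure from close");
// ... expectations configured so put() throws putFailure and close() throws closeFailure ...

workerTask.initialize(TASK_CONFIG);
try {
    workerTask.execute();
    fail("workerTask.execute() should have propagated the failure from put()");
} catch (ConnectException wrapped) {
    // Catching only ConnectException keeps the fail() above from being swallowed here.
    assertSame("the exception from close should not shadow the exception from put",
            putFailure, wrapped.getCause());
}
PowerMock.verifyAll();
```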





This is an automated message from the Apache Git Service.
To r

[GitHub] [kafka] bseenu commented on a change in pull request #7577: KAFKA-9076: support consumer offset sync across clusters in MM 2.0

2020-05-05 Thread GitBox


bseenu commented on a change in pull request #7577:
URL: https://github.com/apache/kafka/pull/7577#discussion_r420261901



##
File path: 
connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorCheckpointTask.java
##
@@ -190,4 +227,103 @@ public void commitRecord(SourceRecord record) {
 Checkpoint.unwrapGroup(record.sourcePartition()),
 System.currentTimeMillis() - record.timestamp());
 }
+
+private void refreshIdleConsumerGroupOffset() {
+Map> consumerGroupsDesc 
= targetAdminClient
+.describeConsumerGroups(consumerGroups).describedGroups();
+
+for (String group : consumerGroups) {
+try {
+ConsumerGroupDescription consumerGroupDesc = 
consumerGroupsDesc.get(group).get();
+ConsumerGroupState consumerGroupState = 
consumerGroupDesc.state();
+// sync offset to the target cluster only if the state of 
current consumer group is:
+// (1) idle: because the consumer at target is not actively 
consuming the mirrored topic
+// (2) dead: the new consumer that is recently created at 
source and never exist at target
+if (consumerGroupState.equals(ConsumerGroupState.EMPTY)) {
+idleConsumerGroupsOffset.put(group, 
targetAdminClient.listConsumerGroupOffsets(group)
+.partitionsToOffsetAndMetadata().get().entrySet());
+} else if (consumerGroupState.equals(ConsumerGroupState.DEAD)) 
{
+newConsumerGroup.add(group);
+}
+} catch (InterruptedException | ExecutionException e) {
+log.error("Error querying for consumer group {} on cluster 
{}.", group, targetClusterAlias, e);
+}
+}
+}
+
+Map> syncGroupOffset() {
+Map> offsetToSyncAll = 
new HashMap<>();
+
+// first, sync offsets for the idle consumers at target
+for (Map.Entry>> group : idleConsumerGroupsOffset.entrySet()) {
+String consumerGroupId = group.getKey();
+// for each idle consumer at target, read the checkpoints 
(converted upstream offset)
+// from the pre-populated map
+Map convertedUpstreamOffset = 
getConvertedUpstreamOffset(consumerGroupId);
+
+if (convertedUpstreamOffset == null) continue;
+
+Map offsetToSync = new 
HashMap<>();
+for (Entry entry : 
group.getValue()) {
+long latestDownstreamOffset = entry.getValue().offset();
+TopicPartition topicPartition = entry.getKey();
+if (!convertedUpstreamOffset.containsKey(topicPartition)) {
+log.trace("convertedUpstreamOffset does not contain 
TopicPartition: {}", topicPartition.toString());
+continue;
+}
+
+// if translated offset from upstream is smaller than the 
current consumer offset
+// in the target, skip updating the offset for that partition
+long convertedOffset = 
convertedUpstreamOffset.get(topicPartition).offset();
+if (latestDownstreamOffset >= convertedOffset) {
+log.trace("latestDownstreamOffset {} is larger than 
convertedUpstreamOffset {} for "
++ "TopicPartition {}", latestDownstreamOffset, 
convertedOffset, topicPartition);
+continue;
+}
+offsetToSync.put(entry.getKey(), 
convertedUpstreamOffset.get(topicPartition));

Review comment:
   I would like to propose the following change to take care of the source 
consumer group changes
   ```suggestion
            for (Map.Entry<TopicPartition, OffsetAndMetadata> convertedEntry : convertedUpstreamOffset.entrySet()) {
                TopicPartition topicPartition = convertedEntry.getKey();
                for (Entry<TopicPartition, OffsetAndMetadata> idleEntry : group.getValue()) {
                    if (idleEntry.getKey() == topicPartition) {
                        long latestDownstreamOffset = idleEntry.getValue().offset();
                        // if translated offset from upstream is smaller than the current consumer offset
                        // in the target, skip updating the offset for that partition
                        long convertedOffset = convertedUpstreamOffset.get(topicPartition).offset();
                        if (latestDownstreamOffset >= convertedOffset) {
                            log.trace("latestDownstreamOffset {} is larger than convertedUpstreamOffset {} for "
                                + "TopicPartition {}", latestDownstreamOffset, convertedOffset, topicPartition);
                            continue;
                        }
                    }
                }
                offsetToSync.put(convertedEntry.getKey(), convertedUpstreamOffset.get(topicPartition));
   ```




---

[GitHub] [kafka] ableegoldman commented on pull request #8496: KAFKA-9748: Add Streams eos-beta integration test

2020-05-05 Thread GitBox


ableegoldman commented on pull request #8496:
URL: https://github.com/apache/kafka/pull/8496#issuecomment-624181714


   > Java 11: 
`org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]`
   @mjsax Should we use @ignore rather than merge known flaky tests while 
they're still under investigation? That's what we did for some of the KIP-441 
tests (granted they were failing at almost 100%, but still 🙂)
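
For illustration, a JUnit 4 sketch of the @Ignore approach (the class wrapper and reason text are hypothetical): the test is reported as skipped with its tracking note instead of failing PR builds while the investigation continues.

```java
import org.junit.Ignore;
import org.junit.Test;

public class EosBetaUpgradeIntegrationTestSketch {
    @Ignore("Known flaky; under investigation, see the tracking JIRA before re-enabling")
    @Test
    public void shouldUpgradeFromEosAlphaToEosBeta() {
        // test body omitted in this sketch
    }
}
```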



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman edited a comment on pull request #8496: KAFKA-9748: Add Streams eos-beta integration test

2020-05-05 Thread GitBox


ableegoldman edited a comment on pull request #8496:
URL: https://github.com/apache/kafka/pull/8496#issuecomment-624181714


   > Java 11: 
`org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta[true]`
   
   @mjsax Should we use @ignore rather than merge known flaky tests while 
they're still under investigation? That's what we did for some of the KIP-441 
tests (granted they were failing at almost 100%, but still 🙂)



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #8609: KAFKA-9946; StopReplicaRequest deletePartition changes may cause premature topic deletion handling in controller

2020-05-05 Thread GitBox


ijuma commented on pull request #8609:
URL: https://github.com/apache/kafka/pull/8609#issuecomment-624186102


   ok to test



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] tombentley commented on pull request #8311: KAFKA-9434: automated protocol for alterReplicaLogDirs

2020-05-05 Thread GitBox


tombentley commented on pull request #8311:
URL: https://github.com/apache/kafka/pull/8311#issuecomment-624187386


   @dajac sure. I added tests for the create and delete topics cases too.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] gharris1727 commented on pull request #8618: KAFKA-9955: Prevent SinkTask::close from shadowing other exceptions

2020-05-05 Thread GitBox


gharris1727 commented on pull request #8618:
URL: https://github.com/apache/kafka/pull/8618#issuecomment-624196656


   @rhauch @kkonstantine This is ready for committer review. Thanks for taking 
a look!



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-9798) Flaky test: org.apache.kafka.streams.integration.QueryableStateIntegrationTest.shouldAllowConcurrentAccesses

2020-05-05 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17100107#comment-17100107
 ] 

Matthias J. Sax commented on KAFKA-9798:


Failed again: 
[https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/2127/testReport/junit/org.apache.kafka.streams.integration/QueryableStateIntegrationTest/shouldAllowConcurrentAccesses/]

> Flaky test: 
> org.apache.kafka.streams.integration.QueryableStateIntegrationTest.shouldAllowConcurrentAccesses
> 
>
> Key: KAFKA-9798
> URL: https://issues.apache.org/jira/browse/KAFKA-9798
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Bill Bejeck
>Assignee: Guozhang Wang
>Priority: Major
>  Labels: flaky-test, test
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-9798) Flaky test: org.apache.kafka.streams.integration.QueryableStateIntegrationTest.shouldAllowConcurrentAccesses

2020-05-05 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17100107#comment-17100107
 ] 

Matthias J. Sax edited comment on KAFKA-9798 at 5/5/20, 5:31 PM:
-

Failed again: 
[https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/2127/testReport/junit/org.apache.kafka.streams.integration/QueryableStateIntegrationTest/shouldAllowConcurrentAccesses/]
{quote}java.lang.AssertionError: Did not receive all 1 records from topic 
output-concurrent-QueryableStateIntegrationTestshouldAllowConcurrentAccesses 
within 12 ms Expected: is a value equal to or greater than <1> but: <0> was 
less than <1> at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) 
at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinValuesRecordsReceived$6(IntegrationTestUtils.java:747)
 at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429) 
at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397) 
at 
org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinValuesRecordsReceived(IntegrationTestUtils.java:743)
 at 
org.apache.kafka.streams.integration.QueryableStateIntegrationTest.waitUntilAtLeastNumRecordProcessed(QueryableStateIntegrationTest.java:1159)
 at 
org.apache.kafka.streams.integration.QueryableStateIntegrationTest.shouldAllowConcurrentAccesses(QueryableStateIntegrationTest.java:650){quote}


was (Author: mjsax):
Failed again: 
[https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/2127/testReport/junit/org.apache.kafka.streams.integration/QueryableStateIntegrationTest/shouldAllowConcurrentAccesses/]

> Flaky test: 
> org.apache.kafka.streams.integration.QueryableStateIntegrationTest.shouldAllowConcurrentAccesses
> 
>
> Key: KAFKA-9798
> URL: https://issues.apache.org/jira/browse/KAFKA-9798
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Bill Bejeck
>Assignee: Guozhang Wang
>Priority: Major
>  Labels: flaky-test, test
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9831) Failing test: EosIntegrationTest.shouldNotViolateEosIfOneTaskFailsWithState[exactly_once_beta]

2020-05-05 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9831?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17100108#comment-17100108
 ] 

Matthias J. Sax commented on KAFKA-9831:


[https://builds.apache.org/job/kafka-pr-jdk8-scala2.12/2127/testReport/junit/org.apache.kafka.streams.integration/EosIntegrationTest/shouldNotViolateEosIfOneTaskFailsWithState_exactly_once_/]
{quote}java.lang.AssertionError: Expected: <[KeyValue(0, 0), KeyValue(0, 1), 
KeyValue(0, 3), KeyValue(0, 6), KeyValue(0, 10), KeyValue(0, 15), KeyValue(0, 
21), KeyValue(0, 28), KeyValue(0, 36), KeyValue(0, 45), KeyValue(0, 55), 
KeyValue(0, 66), KeyValue(0, 78), KeyValue(0, 91), KeyValue(0, 105)]> but: was 
<[KeyValue(0, 0), KeyValue(0, 1), KeyValue(0, 3), KeyValue(0, 6), KeyValue(0, 
10), KeyValue(0, 15), KeyValue(0, 21), KeyValue(0, 28), KeyValue(0, 36), 
KeyValue(0, 45), KeyValue(0, 55), KeyValue(0, 66), KeyValue(0, 78), KeyValue(0, 
91), KeyValue(0, 105), KeyValue(0, 55), KeyValue(0, 66)]> at 
org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at 
org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:6) at 
org.apache.kafka.streams.integration.EosIntegrationTest.checkResultPerKey(EosIntegrationTest.java:280)
 at 
org.apache.kafka.streams.integration.EosIntegrationTest.shouldNotViolateEosIfOneTaskFailsWithState(EosIntegrationTest.java:481){quote}

> Failing test: 
> EosIntegrationTest.shouldNotViolateEosIfOneTaskFailsWithState[exactly_once_beta]
> --
>
> Key: KAFKA-9831
> URL: https://issues.apache.org/jira/browse/KAFKA-9831
> Project: Kafka
>  Issue Type: Bug
>  Components: streams, unit tests
>Reporter: John Roesler
>Assignee: Matthias J. Sax
>Priority: Major
> Attachments: one.stdout.txt, two.stdout.txt
>
>
> I've seen this fail twice in a row on the same build, but with different 
> errors. Stacktraces follow; stdout is attached.
> One:
> {noformat}
> java.lang.AssertionError: Did not receive all 40 records from topic 
> singlePartitionOutputTopic within 6 ms
> Expected: is a value equal to or greater than <40>
>  but: <39> was less than <40>
>   at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20)
>   at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:517)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:415)
>   at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:383)
>   at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:513)
>   at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:491)
>   at 
> org.apache.kafka.streams.integration.EosIntegrationTest.readResult(EosIntegrationTest.java:766)
>   at 
> org.apache.kafka.streams.integration.EosIntegrationTest.shouldNotViolateEosIfOneTaskFailsWithState(EosIntegrationTest.java:473)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>   at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>   at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
> 

[jira] [Commented] (KAFKA-9949) Flaky Test GlobalKTableIntegrationTest#shouldKStreamGlobalKTableLeftJoin

2020-05-05 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9949?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17100110#comment-17100110
 ] 

Matthias J. Sax commented on KAFKA-9949:


[https://builds.apache.org/job/kafka-pr-jdk11-scala2.13/6143/testReport/junit/org.apache.kafka.streams.integration/GlobalKTableIntegrationTest/shouldKStreamGlobalKTableLeftJoin/]

> Flaky Test GlobalKTableIntegrationTest#shouldKStreamGlobalKTableLeftJoin
> 
>
> Key: KAFKA-9949
> URL: https://issues.apache.org/jira/browse/KAFKA-9949
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams, unit tests
>Reporter: Matthias J. Sax
>Priority: Critical
>  Labels: flaky-test
>
> [https://builds.apache.org/job/kafka-pr-jdk14-scala2.13/248/testReport/junit/org.apache.kafka.streams.integration/GlobalKTableIntegrationTest/shouldKStreamGlobalKTableLeftJoin/]
> {quote}java.lang.AssertionError: Condition not met within timeout 3. 
> waiting for final values at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26) at 
> org.apache.kafka.test.TestUtils.lambda$waitForCondition$5(TestUtils.java:381) 
> at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397)
>  at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:378) at 
> org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:368) at 
> org.apache.kafka.streams.integration.GlobalKTableIntegrationTest.shouldKStreamGlobalKTableLeftJoin(GlobalKTableIntegrationTest.java:175){quote}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mjsax commented on pull request #8600: KAFKA-9928: Fix flaky GlobalKTableEOSIntegrationTest

2020-05-05 Thread GitBox


mjsax commented on pull request #8600:
URL: https://github.com/apache/kafka/pull/8600#issuecomment-624199715


   Java 14 passed.
   Java 8:
   ```
   
org.apache.kafka.streams.integration.EosIntegrationTest.shouldNotViolateEosIfOneTaskFailsWithState[exactly_once]
   
org.apache.kafka.streams.integration.QueryableStateIntegrationTest.shouldAllowConcurrentAccesses
   ```
   Java 11: 
`org.apache.kafka.streams.integration.GlobalKTableIntegrationTest.shouldKStreamGlobalKTableLeftJoin`



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ncliang commented on a change in pull request #8618: KAFKA-9955: Prevent SinkTask::close from shadowing other exceptions

2020-05-05 Thread GitBox


ncliang commented on a change in pull request #8618:
URL: https://github.com/apache/kafka/pull/8618#discussion_r420288853



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
##
@@ -193,13 +194,11 @@ public void transitionTo(TargetState state) {
 @Override
 public void execute() {
 initializeAndStart();
-try {
+// Make sure any uncommitted data has been committed and the task has
+// a chance to clean up its state
+try (QuietClosable ignored = this::closePartitions) {

Review comment:
   I am not sure I understand how the suppressed exception is logged rather than 
just silently swallowed. Do we need to call `getSuppressed` and log those 
somewhere, or use one of the `closeQuietly` methods on `Utils`?
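
For context, a self-contained Java sketch (plain JDK, not Connect code) of the mechanism in question: when the try body throws, try-with-resources attaches any close() failure to that primary exception as a suppressed exception, so it is reported under "Suppressed:" when the caller logs or prints the stack trace rather than being swallowed.

```java
public class SuppressedDemo {
    public static void main(String[] args) {
        try {
            // The resource's close() throws after the body has already failed.
            try (AutoCloseable ignored = () -> { throw new IllegalStateException("failure in close"); }) {
                throw new RuntimeException("primary failure from the try body");
            }
        } catch (Exception e) {
            // The primary exception propagates; the close() failure rides along as suppressed.
            System.out.println("caught: " + e.getMessage());
            for (Throwable suppressed : e.getSuppressed()) {
                System.out.println("suppressed: " + suppressed.getMessage());
            }
        }
    }
}
```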





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #8616: KAFKA-9127: don't create StreamThreads for global-only topology (2.4)

2020-05-05 Thread GitBox


vvcephei commented on pull request #8616:
URL: https://github.com/apache/kafka/pull/8616#issuecomment-624208471







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-9891) Invalid state store content after task migration with exactly_once and standby replicas

2020-05-05 Thread Mateusz Jadczyk (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17100135#comment-17100135
 ] 

Mateusz Jadczyk commented on KAFKA-9891:


I don't think I have enough knowledge to propose a fix, I'm afraid. One thing 
is debugging; another is making it work :) 

> Invalid state store content after task migration with exactly_once and 
> standby replicas
> ---
>
> Key: KAFKA-9891
> URL: https://issues.apache.org/jira/browse/KAFKA-9891
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.3.1, 2.4.1
>Reporter: Mateusz Jadczyk
>Assignee: Boyang Chen
>Priority: Blocker
>
> We have a simple command id deduplication mechanism (very similar to the one 
> from Kafka Streams examples) based on Kafka Streams State Stores. It stores 
> command ids from the past hour in _persistentWindowStore_. We encountered a 
> problem with the store if there's an exception thrown later in that topology.
>  We run 3 nodes using docker, each with multiple threads set for this 
> particular Streams Application.
> The business flow is as follows (performed within a single subtopology):
>  *  a valid command is sent with command id 
> (_mnl_cmd_31bad7e5-35e7-490e-89f0-616fe967111f_). NODE 1 is running an active 
> task 1_2. First node in the topology analyses if this is a duplicate by 
> checking in the state store (_COMMAND_ID_STORE_), if not puts the command id 
> in the state store and processes the command properly.
>  * an invalid command is sent with the same key but new command id 
> (_mnl_cmd_9f1752da-45b7-4ef7-9ef8-209d826530bc_). Again, check for the 
> duplicated command id is performed, it's not a duplicate, command id is put 
> into the state store. Next node in the topology throws an exception which 
> causes an error on NODE 1 for task 1_2. As a result, transaction is aborted, 
> offsets are not committed. I double checked for the changelog topic - 
> relevant messages are not committed. Therefore, the changelog topic contains 
> only the first command id _mnl_cmd_31bad7e5-35e7-490e-89f0-616fe967111f,_ and 
> not the one which caused a failure.
>  * in the meantime a standby task 1_2 running on NODE 3 replicated 
> _mnl_cmd_31bad7e5-35e7-490e-89f0-616fe967111f_ command id into a local 
> _COMMAND_ID_STORE_
>  * standby task 1_2 on NODE 3 Thread-2 takes over the task as an active one. 
> It checks if this command id is a duplicate - no, it isn't - tries to process 
> the faulty command and throws an exception. Again, transaction aborted, all 
> looks fine.
>  * NODE 3 Thread-1 takes over. It checks for the duplicate. To our surprise, 
> *it is a duplicate!* Even though the transaction has been aborted and the 
> changelog doesn't contain this command id: 
> _mnl_cmd_9f1752da-45b7-4ef7-9ef8-209d826530bc._
>  
> After digging into the Streams logs and some discussion on ([Stack 
> Overflow|https://stackoverflow.com/questions/61247789/invalid-state-store-content-after-aborted-transaction-with-exactly-once-and-stan])
>  we concluded it has something to do with checkpoint files. Here are the 
> detailed logs relevant to checkpoint files.
>  
> {code:java}
> NODE_3 2020-04-15 21:06:14.470 TRACE 1 --- [-StreamThread-2] 
> o.a.k.s.p.i.ProcessorStateManager : stream-thread 
> [Processor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] task 
> [1_2] Checkpointable offsets read from checkpoint: {}
> NODE_3 2020-04-15 21:06:19.413 TRACE 1 --- [-StreamThread-2] 
> o.a.k.s.p.i.ProcessorStateManager : stream-thread 
> [Processor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-2] task 
> [1_2] Restoring state store COMMAND_ID_STORE from changelog topic 
> Processor-COMMAND_ID_STORE-changelog at checkpoint null
> NODE_3 2020-04-15 21:06:28.470 TRACE 1 --- [-StreamThread-1] 
> o.a.k.s.p.i.ProcessorStateManager : stream-thread 
> [Processor-94f7be8e-beec-411f-b4ec-9031527bccdf-StreamThread-1] 
> standby-task [1_2] Checkpointable offsets read from checkpoint: {}
> NODE_3 2020-04-15 21:06:29.634 TRACE 1 --- [-StreamThread-2] 
> o.a.k.s.s.internals.OffsetCheckpoint : Writing tmp checkpoint file 
> /tmp/kafka-streams/Processor/1_2/.checkpoint.tmp
> NODE_3 2020-04-15 21:06:29.640 TRACE 1 --- [-StreamThread-2] 
> o.a.k.s.s.internals.OffsetCheckpoint : Swapping tmp checkpoint file 
> /tmp/kafka-streams/Processor/1_2/.checkpoint.tmp 
> /tmp/kafka-streams/Processor/1_2/.checkpoint
> NODE_3 2020-04-15 21:11:15.909 TRACE 1 --- [-StreamThread-1] 
> o.a.k.s.s.internals.OffsetCheckpoint : Writing tmp checkpoint file 
> /tmp/kafka-streams/Processor/1_2/.checkpoint.tmp
> NODE_3 2020-04-15 21:11:15.912 TRACE 1 --- [-StreamThread-1] 
> o.a.k.s.s.internals.OffsetCheckpoint : Swapping tmp checkpoin

[GitHub] [kafka] gharris1727 commented on a change in pull request #8618: KAFKA-9955: Prevent SinkTask::close from shadowing other exceptions

2020-05-05 Thread GitBox


gharris1727 commented on a change in pull request #8618:
URL: https://github.com/apache/kafka/pull/8618#discussion_r420298081



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSinkTask.java
##
@@ -193,13 +194,11 @@ public void transitionTo(TargetState state) {
 @Override
 public void execute() {
 initializeAndStart();
-try {
+// Make sure any uncommitted data has been committed and the task has
+// a chance to clean up its state
+try (QuietClosable ignored = this::closePartitions) {

Review comment:
   This PR is not changing any of the printing logic, and that's still 
handled by the caller, `WorkerTask::doRun`. This is roughly what a suppressed 
exception looks like when it gets logged (from the test setup, not a live 
connector):
   
   ```
   org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due 
to unrecoverable exception.
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:569)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:327)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:229)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)

Suppressed: java.lang.RuntimeException
at 
org.easymock.internal.MockInvocationHandler.invoke(MockInvocationHandler.java:46)
at 
org.easymock.internal.ObjectMethodsFilter.invoke(ObjectMethodsFilter.java:101)
at 
org.easymock.internal.ClassProxyFactory$MockMethodInterceptor.intercept(ClassProxyFactory.java:97)
at 
org.apache.kafka.connect.sink.SinkTask$$EnhancerByCGLIB$$713f645b.close()
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.commitOffsets(WorkerSinkTask.java:402)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.closePartitions(WorkerSinkTask.java:599)
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:202)
... 57 more
   Caused by: java.lang.RuntimeException
at 
org.easymock.internal.MockInvocationHandler.invoke(MockInvocationHandler.java:46)
at 
org.easymock.internal.ObjectMethodsFilter.invoke(ObjectMethodsFilter.java:101)
at 
org.easymock.internal.ClassProxyFactory$MockMethodInterceptor.intercept(ClassProxyFactory.java:68)
at 
org.apache.kafka.connect.sink.SinkTask$$EnhancerByCGLIB$$713f645b.put()
at 
org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:547)
... 60 more
   ```
   Suppressed exceptions are a native Java feature, and log4j supports printing 
their stacktraces.
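   
   For anyone unfamiliar with the mechanics, here is a tiny self-contained JDK 
example (not Connect code) showing how try-with-resources attaches a `close()` 
failure as a suppressed exception instead of shadowing the body's exception:
   
   ```java
public class SuppressedDemo {
    public static void main(String[] args) {
        try {
            // the resource's close() throws after the body has already failed
            try (AutoCloseable resource = () -> { throw new IllegalStateException("close failed"); }) {
                throw new RuntimeException("body failed");
            }
        } catch (Exception e) {
            System.out.println(e); // java.lang.RuntimeException: body failed
            for (Throwable suppressed : e.getSuppressed()) {
                System.out.println("  suppressed: " + suppressed); // IllegalStateException: close failed
            }
        }
    }
}
   ```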





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-4696) Streams standby task assignment should be state-store aware

2020-05-05 Thread Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4696?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sophie Blee-Goldman reassigned KAFKA-4696:
--

Fix Version/s: 2.6.0
 Assignee: Sophie Blee-Goldman  (was: Richard Yu)

Forgot about this ticket when submitting the PR, but this ended up being fixed 
via [https://github.com/apache/kafka/pull/8147]

> Streams standby task assignment should be state-store aware
> ---
>
> Key: KAFKA-4696
> URL: https://issues.apache.org/jira/browse/KAFKA-4696
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Affects Versions: 0.11.0.0, 0.10.2.0
>Reporter: Damian Guy
>Assignee: Sophie Blee-Goldman
>Priority: Major
> Fix For: 2.6.0
>
>
> Task Assignment is currently not aware of which tasks have State Stores. This 
> can result in uneven balance of standby task assignment as all tasks are 
> assigned, but only those tasks with state-stores are ever created by 
> {{StreamThread}}. So what seems like an optimal strategy during assignment 
> time could be sub-optimal post-assignment.
> For example, lets say we have 4 tasks (2 with state-stores), 2 clients, 
> numStandbyReplicas = 1. Each client would get 2 active and 2 standby tasks.  
> One of the clients may end up with both state-store tasks, while the other 
> has none.
> Further to this, standby task configuration is currently "all or nothing". It 
> might make sense to allow more fine grained configuration, i.e., the ability 
> to specify the number of standby replicas individually for each stateful 
> operator.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] vvcephei commented on a change in pull request #8613: KAFKA-6145: Set HighAvailabilityTaskAssignor as default in streams_upgrade_test.py

2020-05-05 Thread GitBox


vvcephei commented on a change in pull request #8613:
URL: https://github.com/apache/kafka/pull/8613#discussion_r420306701



##
File path: tests/kafkatest/tests/streams/streams_upgrade_test.py
##
@@ -532,20 +532,22 @@ def do_rolling_bounce(self, processor, counter, 
current_generation):
 log_monitor.wait_until("Sent a version 8 subscription 
and got version 7 assignment back (successful version probing). Downgrade 
subscription metadata to commonly supported version 7 and trigger new 
rebalance.",
timeout_sec=60,
err_msg="Could not detect 
'successful version probing' at upgrading node " + str(node.account))
-else:
-log_monitor.wait_until("Sent a version 8 subscription 
and got version 7 assignment back (successful version probing). Downgrade 
subscription metadata to commonly supported version 8 and trigger new 
rebalance.",
+log_monitor.wait_until("Detected that the assignor 
requested a rebalance. Rejoining the consumer group to trigger a new 
rebalance.",

Review comment:
   Thanks, all. This doesn't seem like the best way to verify what we're 
trying to verify, but it also seems about the same as before. I'm happy to 
leave this here for now.
   
   If/when the test breaks again, I'd prefer for us to put in a more reliable 
and direct mechanism.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #8613: KAFKA-6145: Set HighAvailabilityTaskAssignor as default in streams_upgrade_test.py

2020-05-05 Thread GitBox


vvcephei commented on pull request #8613:
URL: https://github.com/apache/kafka/pull/8613#issuecomment-624218014


   test this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #8616: KAFKA-9127: don't create StreamThreads for global-only topology (2.4)

2020-05-05 Thread GitBox


vvcephei commented on pull request #8616:
URL: https://github.com/apache/kafka/pull/8616#issuecomment-624218222


   test this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #8616: KAFKA-9127: don't create StreamThreads for global-only topology (2.4)

2020-05-05 Thread GitBox


vvcephei commented on pull request #8616:
URL: https://github.com/apache/kafka/pull/8616#issuecomment-624218152


   test this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-9891) Invalid state store content after task migration with exactly_once and standby replicas

2020-05-05 Thread Boyang Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9891?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17100156#comment-17100156
 ] 

Boyang Chen commented on KAFKA-9891:


[~mateuszjadczyk] Lol, no worries. Thanks a lot for the insights you have provided 
so far; I will revise the integration test accordingly.


[GitHub] [kafka] hachikuji commented on pull request #8609: KAFKA-9946; StopReplicaRequest deletePartition changes may cause premature topic deletion handling in controller

2020-05-05 Thread GitBox


hachikuji commented on pull request #8609:
URL: https://github.com/apache/kafka/pull/8609#issuecomment-624221761







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on pull request #8609: KAFKA-9946; StopReplicaRequest deletePartition changes may cause premature topic deletion handling in controller

2020-05-05 Thread GitBox


hachikuji commented on pull request #8609:
URL: https://github.com/apache/kafka/pull/8609#issuecomment-62436


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bdbyrne commented on pull request #8610: KAKFA-9942: --entity-default flag is not working for alternating / describing configs in AdminClient

2020-05-05 Thread GitBox


bdbyrne commented on pull request #8610:
URL: https://github.com/apache/kafka/pull/8610#issuecomment-624225725


   So I believe the error to be here:
 
https://github.com/apache/kafka/pull/8610/files#diff-faf8cea6a3a0fab5e056ad5fee22ff3eR369-R375
   
   It should be translating the default entity type into `null`, but it's 
actually passing the name along as-is. The server-side logic should be correct.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bdbyrne edited a comment on pull request #8610: KAKFA-9942: --entity-default flag is not working for alternating / describing configs in AdminClient

2020-05-05 Thread GitBox


bdbyrne edited a comment on pull request #8610:
URL: https://github.com/apache/kafka/pull/8610#issuecomment-624225725


   So I believe the error to be here:
 
https://github.com/apache/kafka/pull/8610/files#diff-faf8cea6a3a0fab5e056ad5fee22ff3eR369-R375
   
   (In case the link doesn't work, it's where the ConfigCommand constructs the 
ClientQuotaEntity.)
   
   It should be translating the default entity type into `null`, but it's 
actually passing the name along as-is. The server-side logic should be correct.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #8609: KAFKA-9946; StopReplicaRequest deletePartition changes may cause premature topic deletion handling in controller

2020-05-05 Thread GitBox


hachikuji commented on a change in pull request #8609:
URL: https://github.com/apache/kafka/pull/8609#discussion_r420319459



##
File path: core/src/main/scala/kafka/controller/ControllerChannelManager.scala
##
@@ -550,6 +550,22 @@ abstract class 
AbstractControllerBrokerRequestBatch(config: KafkaConfig,
   else if (config.interBrokerProtocolVersion >= KAFKA_2_2_IV0) 1
   else 0
 
+def responseCallback(brokerId: Int, isPartitionDeleted: TopicPartition => 
Boolean)
+(response: AbstractResponse): Unit = {
+  val stopReplicaResponse = response.asInstanceOf[StopReplicaResponse]
+  val partitionErrorsForDeletingTopics = mutable.Map.empty[TopicPartition, 
Errors]
+  stopReplicaResponse.partitionErrors.asScala.foreach { pe =>
+val tp = new TopicPartition(pe.topicName, pe.partitionIndex)
+if (controllerContext.isTopicDeletionInProgress(pe.topicName) &&
+isPartitionDeleted(tp)) {
+  partitionErrorsForDeletingTopics += tp -> 
Errors.forCode(pe.errorCode)

Review comment:
   I think the name seems ok. To me it means that the map includes the 
errors of all topics being deleted. It might be nice if it could reflect that 
this only covers partitions that were also requested to be deleted in the 
StopReplica request, but that name would probably become unwieldy.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #8434: KAFKA-7599: Don't throttle in Trogdor when targetMessagesPerSec is 0

2020-05-05 Thread GitBox


ijuma commented on pull request #8434:
URL: https://github.com/apache/kafka/pull/8434#issuecomment-624230380


   `0` to mean `infinite` is a bit confusing. Maybe we can use `-1` instead?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-9798) Flaky test: org.apache.kafka.streams.integration.QueryableStateIntegrationTest.shouldAllowConcurrentAccesses

2020-05-05 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9798?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17100179#comment-17100179
 ] 

Guozhang Wang commented on KAFKA-9798:
--

This is due to a known issue which is fixed in a hotfix --- let's see if it 
fails again from now on.

> Flaky test: 
> org.apache.kafka.streams.integration.QueryableStateIntegrationTest.shouldAllowConcurrentAccesses
> 
>
> Key: KAFKA-9798
> URL: https://issues.apache.org/jira/browse/KAFKA-9798
> Project: Kafka
>  Issue Type: Test
>  Components: streams, unit tests
>Reporter: Bill Bejeck
>Assignee: Guozhang Wang
>Priority: Major
>  Labels: flaky-test, test
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9909) Kafka Streams : offset control to Streams API

2020-05-05 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9909?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17100186#comment-17100186
 ] 

Guozhang Wang commented on KAFKA-9909:
--

I think the reason is that you catch the exception and continue. Say you have 
three records {{a,b,c}} with offsets {{1,2,3}}, where {{b}} is ill-formatted:

1. {{a}} is processed normally and then committed.
2. {{b}} is found to be ill-formatted and is skipped.
3. {{c}} is still processed normally and then committed. We would now commit 
up to offset {{3}}, which includes {{b}}'s offset {{2}}.

Even in the plain consumer we do not support something like "offsets 1 and 3 
are committed, but offset 2 is skipped". In other words, if you do not want to 
commit offset {{2}}, you have to either send the record to a queue to bookkeep 
it, or stop the app immediately and not continue to process and commit {{c}}. 
You can read this section 
https://docs.confluent.io/current/streams/faq.html#failure-and-exception-handling
 for some more suggestions.
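
For reference, here is a minimal sketch of the "send the bad record to a queue 
and keep going" option; the topic name {{commands-dlq}} and the 
{{isIllFormatted()}} / {{process()}} helpers are only placeholders for your own 
validation and business logic:

{code:java}
import java.time.Duration;

import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

class SkipViaDeadLetter {
    // Route ill-formatted records to a dead-letter topic so the regular offset
    // commit can safely cover their offsets as well.
    static void pollAndProcess(Consumer<String, String> consumer,
                               Producer<String, String> producer) {
        for (ConsumerRecord<String, String> record : consumer.poll(Duration.ofSeconds(1))) {
            if (isIllFormatted(record)) {
                producer.send(new ProducerRecord<>("commands-dlq", record.key(), record.value()));
            } else {
                process(record);
            }
        }
        // commits up to the latest polled offsets; the bad record is not lost,
        // it is preserved in the dead-letter topic
        consumer.commitSync();
    }

    // placeholders for the application's own validation and business logic
    static boolean isIllFormatted(ConsumerRecord<String, String> record) { return false; }
    static void process(ConsumerRecord<String, String> record) { }
}
{code}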



> Kafka Streams : offset control to Streams API
> -
>
> Key: KAFKA-9909
> URL: https://issues.apache.org/jira/browse/KAFKA-9909
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.5.0
> Environment: All
>Reporter: Gopikrishna
>Priority: Minor
>  Labels: Offset, commit
>
> Hello team, I am really inspired by the way the Streams API runs today. I 
> would like a feature that gives more flexibility regarding offsets. When we 
> write a Processor API processor, the ProcessorContext object can be used to 
> request a commit, but this is not effective because Streams controls the 
> offsets: the moment the process method has executed or a scheduled window has 
> completed, the offset is committed automatically by Streams internally. 
> As with a traditional Kafka consumer, it would be better if the context object 
> had complete control over whether or not to commit the offset. This would give 
> the API more control to handle failovers; in particular, when a message cannot 
> be processed, the context should not commit the offset. I would appreciate it 
> if this could be implemented. 
>  
> h4. enable.auto.commit is false by default, yet Streams commits the offset 
> automatically. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] scott-hendricks commented on pull request #8434: KAFKA-7599: Don't throttle in Trogdor when targetMessagesPerSec is 0

2020-05-05 Thread GitBox


scott-hendricks commented on pull request #8434:
URL: https://github.com/apache/kafka/pull/8434#issuecomment-624246201


   > `0` to mean `infinite` is a bit confusing. Maybe we can use `-1` instead?
   
   This is anything 0 or less though.  I would prefer 0 because our current 
clients do calculations before submitting the values and -1 would make this 
incredibly difficult.
   
   0 is not a valid input for producers in the current code, which is why we 
figured 0 would be OK here.
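   
   To make the intent concrete, the check we have in mind boils down to 
something like this (just a sketch, not the actual Trogdor throttling code):
   
   ```java
class ThrottleGuard {
    // any non-positive target rate means "do not throttle at all"
    static boolean shouldThrottle(int targetMessagesPerSec) {
        return targetMessagesPerSec > 0;
    }
}
   ```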



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on a change in pull request #8589: KAFKA-9146: KIP-571 Add option to force delete active members in StreamsResetter

2020-05-05 Thread GitBox


abbccdda commented on a change in pull request #8589:
URL: https://github.com/apache/kafka/pull/8589#discussion_r418108095



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -3621,24 +3641,37 @@ public RemoveMembersFromConsumerGroupResult 
removeMembersFromConsumerGroup(Strin
 KafkaFutureImpl> future = new 
KafkaFutureImpl<>();
 
 ConsumerGroupOperationContext, 
RemoveMembersFromConsumerGroupOptions> context =
-new ConsumerGroupOperationContext<>(groupId, options, deadline, 
future);
+new ConsumerGroupOperationContext<>(groupId, options, 
deadline, future);
 
-Call findCoordinatorCall = getFindCoordinatorCall(context,
-() -> getRemoveMembersFromGroupCall(context));
+Call findCoordinatorCall;
+if (options.removeAll()) {
+List members = getMembersFromGroup(groupId);
+findCoordinatorCall = getFindCoordinatorCall(context,
+() -> getRemoveMembersFromGroupCall(context, members));

Review comment:
   could we pass the members into the context?

##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/RemoveMembersFromConsumerGroupOptions.java
##
@@ -32,12 +32,23 @@
 public class RemoveMembersFromConsumerGroupOptions extends 
AbstractOptions {
 
 private Set members;

Review comment:
   Could we just make `members` an `Optional<Set<MemberToRemove>>` so that 
we don't need a separate removeAll parameter?
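   
   Just a sketch of the shape I have in mind (not the final API), reusing the 
existing `MemberToRemove` type; an absent member set would mean "remove every 
member of the group":
   
   ```java
import java.util.Collection;
import java.util.HashSet;
import java.util.Optional;
import java.util.Set;

import org.apache.kafka.clients.admin.MemberToRemove;

class RemoveAllCapableOptions {
    private final Optional<Set<MemberToRemove>> members;

    RemoveAllCapableOptions(Collection<MemberToRemove> members) {
        this.members = Optional.of(new HashSet<>(members));
    }

    RemoveAllCapableOptions() {
        this.members = Optional.empty(); // empty => remove all members of the group
    }

    boolean removeAll() {
        return !members.isPresent();
    }
}
   ```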

##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -3612,6 +3611,27 @@ private boolean dependsOnSpecificNode(ConfigResource 
resource) {
 || resource.type() == ConfigResource.Type.BROKER_LOGGER;
 }
 
+private List getMembersFromGroup(String groupId) {
+Collection members = new ArrayList<>();
+try {
+members = 
describeConsumerGroups(Collections.singleton(groupId)).describedGroups().get(groupId).get().members();
+} catch (Throwable ex) {
+System.out.println("Encounter exception when trying to get members 
from group: " + groupId);
+ex.printStackTrace();
+}
+
+List memberToRemove = new ArrayList<>();
+for (MemberDescription member: members) {

Review comment:
   style error here.
   
   I would recommend doing a self style check first, e.g.:
   `./gradlew checkstyleMain checkstyleTest spotbugsMain spotbugsTest 
spotbugsScoverage compileTestJava`; otherwise we will still need to fix those 
failures after the Jenkins build runs.

##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -3612,6 +3611,27 @@ private boolean dependsOnSpecificNode(ConfigResource 
resource) {
 || resource.type() == ConfigResource.Type.BROKER_LOGGER;
 }
 
+private List getMembersFromGroup(String groupId) {
+Collection members = new ArrayList<>();
+try {
+members = 
describeConsumerGroups(Collections.singleton(groupId)).describedGroups().get(groupId).get().members();
+} catch (Throwable ex) {
+System.out.println("Encounter exception when trying to get members 
from group: " + groupId);

Review comment:
   Remove print statements

##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -3612,6 +3611,27 @@ private boolean dependsOnSpecificNode(ConfigResource 
resource) {
 || resource.type() == ConfigResource.Type.BROKER_LOGGER;
 }
 
+private List getMembersFromGroup(String groupId) {
+Collection members = new ArrayList<>();
+try {
+members = 
describeConsumerGroups(Collections.singleton(groupId)).describedGroups().get(groupId).get().members();
+} catch (Throwable ex) {
+System.out.println("Encounter exception when trying to get members 
from group: " + groupId);
+ex.printStackTrace();

Review comment:
   Curious why we are still continuing in this case, as the member lookup 
already fails.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax opened a new pull request #8619: MINOR: Improve TopologyTestDriver JavaDocs

2020-05-05 Thread GitBox


mjsax opened a new pull request #8619:
URL: https://github.com/apache/kafka/pull/8619


   Call for review @bbejeck 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on pull request #8613: KAFKA-6145: Set HighAvailabilityTaskAssignor as default in streams_upgrade_test.py

2020-05-05 Thread GitBox


cadonna commented on pull request #8613:
URL: https://github.com/apache/kafka/pull/8613#issuecomment-624258204


   Just in case, I re-ran the system tests: 
https://jenkins.confluent.io/job/system-test-kafka-branch-builder/3929/



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #8613: KAFKA-6145: Set HighAvailabilityTaskAssignor as default in streams_upgrade_test.py

2020-05-05 Thread GitBox


vvcephei commented on pull request #8613:
URL: https://github.com/apache/kafka/pull/8613#issuecomment-624270635


   Thanks @cadonna, let's see how those tests play out.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #8616: KAFKA-9127: don't create StreamThreads for global-only topology (2.4)

2020-05-05 Thread GitBox


vvcephei commented on pull request #8616:
URL: https://github.com/apache/kafka/pull/8616#issuecomment-624270869


   test this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on a change in pull request #8603: MINOR: Fix ProcessorContext JavaDocs

2020-05-05 Thread GitBox


vvcephei commented on a change in pull request #8603:
URL: https://github.com/apache/kafka/pull/8603#discussion_r420377361



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
##
@@ -209,49 +211,52 @@ Cancellable schedule(final Duration interval,
  void forward(final K1 key, final V1 value, 
final String childName);
 
 /**
- * Requests a commit
+ * Requests a commit.
  */
 void commit();
 
 /**
  * Returns the topic name of the current input record; could be null if it 
is not
- * available (for example, if this method is invoked from the punctuate 
call)
+ * available (for example, if this method is invoked from the punctuate 
call).
  *
  * @return the topic name
  */
 String topic();
 
 /**
  * Returns the partition id of the current input record; could be -1 if it 
is not
- * available (for example, if this method is invoked from the punctuate 
call)
+ * available (for example, if this method is invoked from the punctuate 
call).
  *
  * @return the partition id
  */
 int partition();
 
 /**
  * Returns the offset of the current input record; could be -1 if it is not
- * available (for example, if this method is invoked from the punctuate 
call)
+ * available (for example, if this method is invoked from the punctuate 
call).
  *
  * @return the offset
  */
 long offset();
 
 /**
- * Returns the headers of the current input record; could be null if it is 
not available
+ * Returns the headers of the current input record; could be null if it is 
not
+ * available (for example, if this method is invoked from the punctuate 
call).
+ *
  * @return the headers
  */
 Headers headers();
 
 /**
  * Returns the current timestamp.
  *
- * If it is triggered while processing a record streamed from the source 
processor, timestamp is defined as the timestamp of the current input record; 
the timestamp is extracted from
+ *  If it is triggered while processing a record streamed from the 
source processor,
+ * timestamp is defined as the timestamp of the current input record; the 
timestamp is extracted from
  * {@link org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord} 
by {@link TimestampExtractor}.
  *
- * If it is triggered while processing a record generated not from the 
source processor (for example,
+ *  If it is triggered while processing a record generated not from the 
source processor (for example,
  * if this method is invoked from the punctuate call), timestamp is 
defined as the current
- * task's stream time, which is defined as the smallest among all its 
input stream partition timestamps.
+ * task's stream time, which is defined as the largest among all its input 
stream partition timestamps.

Review comment:
   I just took another look at the definition of streamTime, and it 
actually looks like it might be computed wrongly.
   
   The way it works is that the "stream time" for a task is computed most of 
the time in 
`org.apache.kafka.streams.processor.internals.PartitionGroup#nextRecord`, i.e., 
it's the max timestamp of any record _polled from the PartitionGroup_.
   
   However, when we commit, we commit the "partition time" for each 
TopicPartition, which is set when we move a record into the head position for 
that queue. During restoration, we read these committed timestamps for each 
TopicPartition, and we (incorrectly) set the "stream time" to be the maximum 
over the "partition time" of each partition in the PartitionGroup (aka Task).
   
   This is incorrect in two ways:
   1. it should be the minimum, not the maximum (since we would choose the 
record with the minimum timestamp to process next)
   2. the timestamp of the _head enqueued_ record (partition time) is not the 
timestamp of the _last dequeued_ record (stream time).
   
   I'll file a Jira ticket capturing all this. In the meantime, I'd suggest 
that we just update the docs to reflect the correct definition of "stream 
time": `which is defined as the largest timestamp of any record processed by 
the task`. Then, we can fix the code to make this true all the time. Currently, 
it's only true in steady state, not immediately after restoration.
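   
   To pin down the definition itself, independent of the `PartitionGroup` 
implementation, a minimal sketch of what "stream time" should track is simply:
   
   ```java
class StreamTimeTracker {
    // sentinel meaning "no record processed yet"
    private long streamTime = Long.MIN_VALUE;

    void onRecordProcessed(long recordTimestamp) {
        // stream time never moves backwards, even for out-of-order records
        streamTime = Math.max(streamTime, recordTimestamp);
    }

    long streamTime() {
        return streamTime;
    }
}
   ```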





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-9958) Add a new method to AuthorizerServerInfo Interface

2020-05-05 Thread Jeff Huang (Jira)
Jeff Huang created KAFKA-9958:
-

 Summary: Add a new method to AuthorizerServerInfo Interface
 Key: KAFKA-9958
 URL: https://issues.apache.org/jira/browse/KAFKA-9958
 Project: Kafka
  Issue Type: Improvement
  Components: core
Reporter: Jeff Huang
 Fix For: 2.6.0


More details here:

[https://cwiki.apache.org/confluence/display/KAFKA/KIP-608+-+Add+a+new+method+to+AuthorizerServerInfo+Interface]

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] guozhangwang commented on pull request #8600: KAFKA-9928: Fix flaky GlobalKTableEOSIntegrationTest

2020-05-05 Thread GitBox


guozhangwang commented on pull request #8600:
URL: https://github.com/apache/kafka/pull/8600#issuecomment-624284868


   I still see the following issue locally:
   
   ```
   java.lang.AssertionError: Condition not met within timeout 3. waiting 
for final values
 expected: {a=1+F, b=2+G, c=3+H, d=4+I, e=5+J}
 received: {a=1+A, b=2+G, c=3+H, d=4+I, e=5+J}
   
at org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:26)
at 
org.apache.kafka.test.TestUtils.lambda$waitForCondition$17(TestUtils.java:381)
at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429)
at 
org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397)
at org.apache.kafka.test.TestUtils.waitForCondition(TestUtils.java:378)
at 
org.apache.kafka.streams.integration.GlobalKTableEOSIntegrationTest.shouldKStreamGlobalKTableLeftJoin(GlobalKTableEOSIntegrationTest.java:205)
   ```
   
   In addition, sometimes the test will hang as well (i.e. the above 
verification would not fail, the test just runs forever); I tried using 
different assignor via `INTERNAL_TASK_ASSIGNOR_CLASS` but the same hanging 
issue still exists.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-7472) Implement KIP-145 transformations

2020-05-05 Thread Loic DIVAD (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7472?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17100228#comment-17100228
 ] 

Loic DIVAD commented on KAFKA-7472:
---

Thank you [~rhauch]! :)

> Implement KIP-145 transformations 
> --
>
> Key: KAFKA-7472
> URL: https://issues.apache.org/jira/browse/KAFKA-7472
> Project: Kafka
>  Issue Type: New Feature
>  Components: KafkaConnect
>Affects Versions: 1.1.0
>Reporter: Randall Hauch
>Assignee: Loic DIVAD
>Priority: Critical
>
> As part of 
> [KIP-145|https://cwiki.apache.org/confluence/display/KAFKA/KIP-145+-+Expose+Record+Headers+in+Kafka+Connect],
>  several SMTs were described and approved. However, they were never 
> implemented.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] d8tltanc commented on pull request #8610: KAKFA-9942: --entity-default flag is not working for alternating / describing configs in AdminClient

2020-05-05 Thread GitBox


d8tltanc commented on pull request #8610:
URL: https://github.com/apache/kafka/pull/8610#issuecomment-624308593


   Hi @bdbyrne , thanks for the comment. The link seems not working. I guess 
you mean this part we should replace the empty string "" by null?
   `private[admin] def entityNames(): List[String] = {
 val namesIterator = options.valuesOf(entityName).iterator
 options.specs.asScala
   .filter(spec => spec.options.contains("entity-name") || 
spec.options.contains("entity-default"))
   .map(spec => if (spec.options.contains("entity-name")) 
namesIterator.next else "").toList ++
 entityFlags
   .filter(entity => options.has(entity._1))
   .map(entity => options.valueOf(entity._1)) ++
 entityDefaultsFlags
   .filter(entity => options.has(entity._1))
   .map(_ => "")
   }`



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] d8tltanc edited a comment on pull request #8610: KAKFA-9942: --entity-default flag is not working for alternating / describing configs in AdminClient

2020-05-05 Thread GitBox


d8tltanc edited a comment on pull request #8610:
URL: https://github.com/apache/kafka/pull/8610#issuecomment-624308593


   Hi @bdbyrne , thanks for the comment. The link seems not working. I guess 
you mean this part we should replace the empty string "" by null?
   private[admin] def entityNames(): List[String] = {
 val namesIterator = options.valuesOf(entityName).iterator
 options.specs.asScala
   .filter(spec => spec.options.contains("entity-name") || 
spec.options.contains("entity-default"))
   .map(spec => if (spec.options.contains("entity-name")) 
namesIterator.next else "").toList ++
 entityFlags
   .filter(entity => options.has(entity._1))
   .map(entity => options.valueOf(entity._1)) ++
 entityDefaultsFlags
   .filter(entity => options.has(entity._1))
   .map(_ => "")
   }



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] d8tltanc edited a comment on pull request #8610: KAKFA-9942: --entity-default flag is not working for alternating / describing configs in AdminClient

2020-05-05 Thread GitBox


d8tltanc edited a comment on pull request #8610:
URL: https://github.com/apache/kafka/pull/8610#issuecomment-624308593


   Hi @bdbyrne , thanks for the comment. The link seems not working. I guess 
you mean this part we should replace the empty string "" by null?
   
   > private[admin] def entityNames(): List[String] = {
   >   val namesIterator = options.valuesOf(entityName).iterator
   >   options.specs.asScala
   > .filter(spec => spec.options.contains("entity-name") || 
spec.options.contains("entity-default"))
   > .map(spec => if (spec.options.contains("entity-name")) 
namesIterator.next else "").toList ++
   >   entityFlags
   > .filter(entity => options.has(entity._1))
   > .map(entity => options.valueOf(entity._1)) ++
   >   entityDefaultsFlags
   > .filter(entity => options.has(entity._1))
   > .map(_ => "")
   > }



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on pull request #8294: KAFKA-9718; Don't log passwords for AlterConfigs in request logs

2020-05-05 Thread GitBox


cmccabe commented on pull request #8294:
URL: https://github.com/apache/kafka/pull/8294#issuecomment-624309696


   backported to 2.5.x



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on pull request #8260: KAFKA-9625: Fixing IncrementalAlterConfigs with respect to Broker Configs

2020-05-05 Thread GitBox


cmccabe commented on pull request #8260:
URL: https://github.com/apache/kafka/pull/8260#issuecomment-624312027


   backported to 2.5.x



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-9718) Don't log passwords for AlterConfigs requests in request logs

2020-05-05 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9718?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe updated KAFKA-9718:

Fix Version/s: 2.5.1

> Don't log passwords for AlterConfigs requests in request logs
> -
>
> Key: KAFKA-9718
> URL: https://issues.apache.org/jira/browse/KAFKA-9718
> Project: Kafka
>  Issue Type: Bug
>Reporter: Rajini Sivaram
>Assignee: Rajini Sivaram
>Priority: Major
> Fix For: 2.6.0, 2.5.1
>
>
> We currently avoid logging passwords in log files by logging only parsed 
> values where passwords are logged as `[hidden]`. But for AlterConfigs requests 
> in request logs, we log all entries since they just appear as string entries. 
> Since we allow altering password configs like SSL key passwords and JAAS 
> config, we shouldn't include these in log files.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9625) Unable to Describe broker configurations that have been set via IncrementalAlterConfigs

2020-05-05 Thread Colin McCabe (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9625?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Colin McCabe updated KAFKA-9625:

Fix Version/s: 2.5.1

> Unable to Describe broker configurations that have been set via 
> IncrementalAlterConfigs
> ---
>
> Key: KAFKA-9625
> URL: https://issues.apache.org/jira/browse/KAFKA-9625
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Reporter: Colin McCabe
>Assignee: Sanjana Kaundinya
>Priority: Critical
> Fix For: 2.6.0, 2.5.1
>
>
> There seem to be at least two bugs in the broker configuration APIs and/or 
> logic:
> 1. Broker throttles are incorrectly marked as sensitive configurations.  This 
> includes leader.replication.throttled.rate, 
> follower.replication.throttled.rate, 
> replica.alter.log.dirs.io.max.bytes.per.second.  This means that their values 
> cannot be read back by DescribeConfigs after they are set.
> 2. When we clear the broker throttles via incrementalAlterConfigs, 
> DescribeConfigs continues returning the old throttles indefinitely.  In other 
> words, the clearing is not reflected in the Describe API.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] bdbyrne commented on pull request #8610: KAKFA-9942: --entity-default flag is not working for alternating / describing configs in AdminClient

2020-05-05 Thread GitBox


bdbyrne commented on pull request #8610:
URL: https://github.com/apache/kafka/pull/8610#issuecomment-624318293


   > Hi @bdbyrne , thanks for the comment. The link seems not working. I guess 
you mean this part we should replace the empty string "" by null?
   > 
   > > ```
   > > private[admin] def entityNames(): List[String] = {
   > >   val namesIterator = options.valuesOf(entityName).iterator
   > >   options.specs.asScala
   > > .filter(spec => spec.options.contains("entity-name") || 
spec.options.contains("entity-default"))
   > > .map(spec => if (spec.options.contains("entity-name")) 
namesIterator.next else "").toList ++
   > >   entityFlags
   > > .filter(entity => options.has(entity._1))
   > > .map(entity => options.valueOf(entity._1)) ++
   > >   entityDefaultsFlags
   > > .filter(entity => options.has(entity._1))
   > > .map(_ => "")
   > > }
   > > ```
   
   That's correct, yes. You could do it in the provided code, however keep in 
mind that the ZK path currently uses it as well and would expect the empty 
string. The alternative is to do the substitution (`"" -> null`) when creating 
the `ClientQuotaEntity`, which is exclusive to the `AdminClient` path.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bdbyrne commented on pull request #8610: KAKFA-9942: --entity-default flag is not working for alternating / describing configs in AdminClient

2020-05-05 Thread GitBox


bdbyrne commented on pull request #8610:
URL: https://github.com/apache/kafka/pull/8610#issuecomment-624318905


   > > Hi @bdbyrne , thanks for the comment. The link seems not working. I 
guess you mean this part we should replace the empty string "" by null?
   > > > ```
   > > > private[admin] def entityNames(): List[String] = {
   > > >   val namesIterator = options.valuesOf(entityName).iterator
   > > >   options.specs.asScala
   > > > .filter(spec => spec.options.contains("entity-name") || 
spec.options.contains("entity-default"))
   > > > .map(spec => if (spec.options.contains("entity-name")) 
namesIterator.next else "").toList ++
   > > >   entityFlags
   > > > .filter(entity => options.has(entity._1))
   > > > .map(entity => options.valueOf(entity._1)) ++
   > > >   entityDefaultsFlags
   > > > .filter(entity => options.has(entity._1))
   > > > .map(_ => "")
   > > > }
   > > > ```
   > 
   > That's correct, yes. You could do it in the provided code, however keep in 
mind that the ZK path currently uses it as well and would expect the empty 
string. The alternative is to do the substitution (`"" -> null`) when creating 
the `ClientQuotaEntity`, which is exclusive to the `AdminClient` path.
   
   Oh wait, there are also other places where the default is used and expected to 
be the empty string (broker config), so it may be best to make the change only 
at the `ClientQuotaEntity`.
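   
   In other words, the substitution could live right where the entity is built; 
a rough sketch (assuming the usual `ClientQuotaEntity` convention that a null 
name denotes the default entity):
   
   ```java
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.common.quota.ClientQuotaEntity;

class EntityNameTranslation {
    // sanitizedName stands in for whatever ConfigCommand currently carries
    // around ("" when --entity-default was given); the substitution happens
    // only here, on the AdminClient path, so the ZK path keeps the empty string.
    static ClientQuotaEntity toQuotaEntity(String entityType, String sanitizedName) {
        Map<String, String> entries = new HashMap<>();
        entries.put(entityType, sanitizedName.isEmpty() ? null : sanitizedName);
        return new ClientQuotaEntity(entries);
    }
}
   ```
   
   For example, `toQuotaEntity(ClientQuotaEntity.USER, "")` would then target 
the default user quota.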



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ambroff commented on a change in pull request #4540: KAFKA-6469 Batch ISR change notifications

2020-05-05 Thread GitBox


ambroff commented on a change in pull request #4540:
URL: https://github.com/apache/kafka/pull/4540#discussion_r420431031



##
File path: core/src/test/scala/integration/kafka/server/ReplicaManagerTest.scala
##
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package integration.kafka.server
+
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import kafka.zk.ZooKeeperTestHarness
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.utils.MockTime
+import org.junit.Test
+import org.junit.Assert._
+
+class ReplicaManagerTest extends ZooKeeperTestHarness {

Review comment:
   Good point. I moved the test to the existing `ReplicaManagerTest`.
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ambroff commented on a change in pull request #4540: KAFKA-6469 Batch ISR change notifications

2020-05-05 Thread GitBox


ambroff commented on a change in pull request #4540:
URL: https://github.com/apache/kafka/pull/4540#discussion_r420430926



##
File path: core/src/test/scala/integration/kafka/server/ReplicaManagerTest.scala
##
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package integration.kafka.server
+
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import kafka.zk.ZooKeeperTestHarness
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.utils.MockTime
+import org.junit.Test
+import org.junit.Assert._
+
+class ReplicaManagerTest extends ZooKeeperTestHarness {
+  @Test
+  def testMaybePropagateIsrChanges(): Unit = {
+val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, 
zkConnect))
+val mockTime = new MockTime(1)
+val server = TestUtils.createServer(config, mockTime)
+
+// Topic name which is the maximum length of 249.
+val largeTopicName = ("topic-" padTo(249, "x")).mkString
+
+// 5000 partitions. Large partition numbers chosen to make the serialized 
values as long as possible.
+(1 to 15000)
+  .map(n => new TopicPartition(largeTopicName, n))
+  .foreach(server.replicaManager.recordIsrChange)
+
+server.replicaManager.maybePropagateIsrChanges()
+
+val isrChangeNotificationQueuePath = "/isr_change_notification"
+val node0 = "isr_change_00"
+val node1 = "isr_change_01"
+
+assertEquals(
+  zkClient.getChildren(isrChangeNotificationQueuePath).toSet,
+  Set(node0, node1))
+
+val (_, stat0) = zkClient.getDataAndStat(isrChangeNotificationQueuePath + 
"/" + node0)
+val (_, stat1) = zkClient.getDataAndStat(isrChangeNotificationQueuePath + 
"/" + node1)
+
+assertEquals(840028, stat0.getDataLength)
+assertEquals(560308, stat1.getDataLength)

Review comment:
   Good point. I moved the test to the existing `ReplicaManagerTest`.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ambroff commented on a change in pull request #4540: KAFKA-6469 Batch ISR change notifications

2020-05-05 Thread GitBox


ambroff commented on a change in pull request #4540:
URL: https://github.com/apache/kafka/pull/4540#discussion_r420430926



##
File path: core/src/test/scala/integration/kafka/server/ReplicaManagerTest.scala
##
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package integration.kafka.server
+
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import kafka.zk.ZooKeeperTestHarness
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.utils.MockTime
+import org.junit.Test
+import org.junit.Assert._
+
+class ReplicaManagerTest extends ZooKeeperTestHarness {
+  @Test
+  def testMaybePropagateIsrChanges(): Unit = {
+val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, 
zkConnect))
+val mockTime = new MockTime(1)
+val server = TestUtils.createServer(config, mockTime)
+
+// Topic name which is the maximum length of 249.
+val largeTopicName = ("topic-" padTo(249, "x")).mkString
+
+// 5000 partitions. Large partition numbers chosen to make the serialized 
values as long as possible.
+(1 to 15000)
+  .map(n => new TopicPartition(largeTopicName, n))
+  .foreach(server.replicaManager.recordIsrChange)
+
+server.replicaManager.maybePropagateIsrChanges()
+
+val isrChangeNotificationQueuePath = "/isr_change_notification"
+val node0 = "isr_change_00"
+val node1 = "isr_change_01"
+
+assertEquals(
+  zkClient.getChildren(isrChangeNotificationQueuePath).toSet,
+  Set(node0, node1))
+
+val (_, stat0) = zkClient.getDataAndStat(isrChangeNotificationQueuePath + 
"/" + node0)
+val (_, stat1) = zkClient.getDataAndStat(isrChangeNotificationQueuePath + 
"/" + node1)
+
+assertEquals(840028, stat0.getDataLength)
+assertEquals(560308, stat1.getDataLength)

Review comment:
   I just made it a threshold instead. Assert the serialized size is always 
< 900KiB.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ambroff commented on a change in pull request #4540: KAFKA-6469 Batch ISR change notifications

2020-05-05 Thread GitBox


ambroff commented on a change in pull request #4540:
URL: https://github.com/apache/kafka/pull/4540#discussion_r420434068



##
File path: core/src/test/scala/integration/kafka/server/ReplicaManagerTest.scala
##
@@ -0,0 +1,61 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package integration.kafka.server
+
+import kafka.server.KafkaConfig
+import kafka.utils.TestUtils
+import kafka.zk.ZooKeeperTestHarness
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.utils.MockTime
+import org.junit.Test
+import org.junit.Assert._
+
+class ReplicaManagerTest extends ZooKeeperTestHarness {
+  @Test
+  def testMaybePropagateIsrChanges(): Unit = {
+val config = KafkaConfig.fromProps(TestUtils.createBrokerConfig(1, 
zkConnect))
+val mockTime = new MockTime(1)

Review comment:
   maybePropagateIsrChanges() won't do anything unless 
ReplicaManager.IsrChangePropagationInterval milliseconds have elapsed since the 
last call to recordIsrChange.
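   (A minimal sketch of exercising that gate, assuming the `mockTime` and `server` values from the diff above and the `IsrChangePropagationInterval` constant on the `kafka.server.ReplicaManager` companion object; not necessarily the final test code.)
   
   ```scala
   // Sketch: advance the mock clock past the propagation interval so the next
   // maybePropagateIsrChanges() call actually writes the ZooKeeper notification nodes.
   mockTime.sleep(ReplicaManager.IsrChangePropagationInterval + 1)
   server.replicaManager.maybePropagateIsrChanges()
   ```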





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Assigned] (KAFKA-9959) leastLoadedNode() does not provide a node fairly

2020-05-05 Thread Cheng Tan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9959?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cheng Tan reassigned KAFKA-9959:


Assignee: Cheng Tan

> leastLoadedNode() does not provide a node fairly
> 
>
> Key: KAFKA-9959
> URL: https://issues.apache.org/jira/browse/KAFKA-9959
> Project: Kafka
>  Issue Type: Bug
>Reporter: Cheng Tan
>Assignee: Cheng Tan
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9959) leastLoadedNode() does not provide a node fairly

2020-05-05 Thread Cheng Tan (Jira)
Cheng Tan created KAFKA-9959:


 Summary: leastLoadedNode() does not provide a node fairly
 Key: KAFKA-9959
 URL: https://issues.apache.org/jira/browse/KAFKA-9959
 Project: Kafka
  Issue Type: Bug
Reporter: Cheng Tan






--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9929) Support reverse iterator on WindowStore

2020-05-05 Thread Jorge Esteban Quilcate Otoya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9929?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17100291#comment-17100291
 ] 

Jorge Esteban Quilcate Otoya commented on KAFKA-9929:
-

Thanks [~guozhang].

Looking into o.a.k.s.state.internals, I've found iterators based on 
RocksIterator (e.g. SingleColumnFamilyAccessor) and TreeMap (e.g. 
MemoryNavigableLRUCache). After a quick review, both APIs support reverse queries:

 
{code:java}
 final RocksIterator iter = db.newIterator();
 iter.seekToFirst();
 iter.next();

 final RocksIterator reverse = db.newIterator();
 reverse.seekToLast();
 reverse.prev();
{code}
 

and TreeMap:
{code:java}
final TreeMap map = new TreeMap<>();
final NavigableSet nav = map.navigableKeySet();
final NavigableSet rev = map.descendingKeySet();
{code}
 

I haven't found any performance notes on these APIs to worry about.

 

 

> Support reverse iterator on WindowStore
> ---
>
> Key: KAFKA-9929
> URL: https://issues.apache.org/jira/browse/KAFKA-9929
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Jorge Esteban Quilcate Otoya
>Priority: Major
>
> Currently, WindowStore fetch operations return an iterator sorted from 
> earliest to latest result:
> ```
> * For each key, the iterator guarantees ordering of windows, starting from 
> the oldest/earliest
> * available window to the newest/latest window.
> ```
>  
> We have a use case where traces are stored in a WindowStore 
> and Kafka Streams is used to create a materialized view of traces. A query 
> request comes with a time range (e.g. now-1h, now) and wants to return the 
> most recent results, i.e. fetch from this period of time, iterate and pattern 
> match the latest/most recent traces, and, if there are enough results, reply 
> without moving further on the iterator.
> The same store is used to search for previous traces. In this case, it searches a 
> key over the last day; if traces are found, we would also like to iterate from the 
> most recent.
> RocksDb seems to support iterating backward and forward: 
> [https://github.com/facebook/rocksdb/wiki/Iterator#iterating-upper-bound-and-lower-bound]
>  
> For reference: This in some way extracts some bits from this previous issue: 
> https://issues.apache.org/jira/browse/KAFKA-4212:
>  
> > The {{RocksDBWindowsStore}}, a {{WindowsStore}}, can expire items via 
> > segment dropping, but it stores multiple items per key, based on their 
> > timestamp. But this store can be repurposed as a cache by fetching the 
> > items in reverse chronological order and returning the first item found.
>  
> I would like to know if there is any impediment in RocksDB or WindowStore to 
> supporting this.
> Adding an argument to reverse the iteration in the current fetch methods would be great:
> ```
> WindowStore.fetch(from,to,Direction.BACKWARD|FORWARD)
> ```



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] bbejeck commented on pull request #8619: MINOR: Improve TopologyTestDriver JavaDocs

2020-05-05 Thread GitBox


bbejeck commented on pull request #8619:
URL: https://github.com/apache/kafka/pull/8619#issuecomment-624338625


   Only javadoc is updated, so merging this.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-9959) leastLoadedNode() does not provide a node fairly

2020-05-05 Thread Cheng Tan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9959?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cheng Tan updated KAFKA-9959:
-
Issue Type: Improvement  (was: Bug)

> leastLoadedNode() does not provide a node fairly
> 
>
> Key: KAFKA-9959
> URL: https://issues.apache.org/jira/browse/KAFKA-9959
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Cheng Tan
>Assignee: Cheng Tan
>Priority: Major
>
> Currently, leastLoadedNode() provides a node using the following criteria:
>  # Provide the connected node with the least number of in-flight requests.
>  # If no connected node exists, provide the connecting node with the largest 
> index in the cached list of nodes.
>  # If no connected or connecting node exists, provide the disconnected node 
> that respects the reconnect backoff and has the largest index in the cached 
> list of nodes.
> However, criteria 2 and 3 may cause issues.
>  
> Criterion 2: Since timeoutCallsToSend() does not change the connection 
> status, the node remains in a connecting state after the request times out. 
> If no connected node exists, leastLoadedNode() will keep providing this same 
> node until the socket timeout is reached. It would be better to skip a 
> connecting node if any request has timed out on it.
>  
> Criterion 3: If the time interval between two invocations of leastLoadedNode() 
> is greater than reconnect.backoff.ms, the same disconnected node may be 
> provided again. We also want to pick the node that has failed the fewest times.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9959) leastLoadedNode() does not provide a node fairly

2020-05-05 Thread Cheng Tan (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9959?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Cheng Tan updated KAFKA-9959:
-
Description: 
Currently, leastLoadedNode() provides a node using the following criteria:
 # Provide the connected node with the least number of in-flight requests.
 # If no connected node exists, provide the connecting node with the largest 
index in the cached list of nodes.
 # If no connected or connecting node exists, provide the disconnected node 
that respects the reconnect backoff and has the largest index in the cached 
list of nodes.

However, criteria 2 and 3 may cause issues.

 

Criterion 2: Since timeoutCallsToSend() does not change the connection 
status, the node remains in a connecting state after the request times out. 
If no connected node exists, leastLoadedNode() will keep providing this same 
node until the socket timeout is reached. It would be better to skip a 
connecting node if any request has timed out on it.

 

Criterion 3: If the time interval between two invocations of leastLoadedNode() 
is greater than reconnect.backoff.ms, the same disconnected node may be 
provided again. We also want to pick the node that has failed the fewest times.
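
For readers of this thread, a rough stand-alone illustration of the selection order 
described above (the NodeState type and its fields are hypothetical; this is not the 
actual NetworkClient implementation):
{code:java}
import java.util.List;

// Illustration only: a connected node with the fewest in-flight requests wins;
// otherwise fall back to a connecting node; otherwise to a disconnected node
// whose reconnect backoff has elapsed.
public class LeastLoadedSketch {
    record NodeState(String id, boolean connected, boolean connecting,
                     int inFlightRequests, boolean backoffElapsed) { }

    static NodeState leastLoaded(List<NodeState> nodes) {
        NodeState bestConnected = null;
        NodeState lastConnecting = null;
        NodeState lastDisconnected = null;
        for (NodeState n : nodes) {
            if (n.connected()) {
                if (bestConnected == null || n.inFlightRequests() < bestConnected.inFlightRequests())
                    bestConnected = n;                      // criterion 1
            } else if (n.connecting()) {
                lastConnecting = n;                         // criterion 2: largest index wins
            } else if (n.backoffElapsed()) {
                lastDisconnected = n;                       // criterion 3: largest index wins
            }
        }
        if (bestConnected != null) return bestConnected;
        return lastConnecting != null ? lastConnecting : lastDisconnected;
    }

    public static void main(String[] args) {
        // "a" stays in the connecting state even if its request timed out,
        // so it keeps being picked over "b" (the criterion 2 issue above).
        NodeState picked = leastLoaded(List.of(
                new NodeState("a", false, true, 0, false),
                new NodeState("b", false, false, 0, true)));
        System.out.println(picked.id()); // prints "a"
    }
}
{code}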

> leastLoadedNode() does not provide a node fairly
> 
>
> Key: KAFKA-9959
> URL: https://issues.apache.org/jira/browse/KAFKA-9959
> Project: Kafka
>  Issue Type: Bug
>Reporter: Cheng Tan
>Assignee: Cheng Tan
>Priority: Major
>
> Currently, leastLoadedNode() provides a node using the following criteria:
>  # Provide the connected node with the least number of in-flight requests.
>  # If no connected node exists, provide the connecting node with the largest 
> index in the cached list of nodes.
>  # If no connected or connecting node exists, provide the disconnected node 
> that respects the reconnect backoff and has the largest index in the cached 
> list of nodes.
> However, criteria 2 and 3 may cause issues.
>  
> Criterion 2: Since timeoutCallsToSend() does not change the connection 
> status, the node remains in a connecting state after the request times out. 
> If no connected node exists, leastLoadedNode() will keep providing this same 
> node until the socket timeout is reached. It would be better to skip a 
> connecting node if any request has timed out on it.
>  
> Criterion 3: If the time interval between two invocations of leastLoadedNode() 
> is greater than reconnect.backoff.ms, the same disconnected node may be 
> provided again. We also want to pick the node that has failed the fewest times.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jeffhuang26 opened a new pull request #8620: KAFKA-9944: Added supporting customized HTTP response headers for Kafka Connect.

2020-05-05 Thread GitBox


jeffhuang26 opened a new pull request #8620:
URL: https://github.com/apache/kafka/pull/8620


   Added support for customized HTTP response headers for the Kafka Connect REST 
server.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #8603: MINOR: Fix ProcessorContext JavaDocs

2020-05-05 Thread GitBox


mjsax commented on a change in pull request #8603:
URL: https://github.com/apache/kafka/pull/8603#discussion_r420463508



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/ProcessorContext.java
##
@@ -209,49 +211,52 @@ Cancellable schedule(final Duration interval,
  void forward(final K1 key, final V1 value, 
final String childName);
 
 /**
- * Requests a commit
+ * Requests a commit.
  */
 void commit();
 
 /**
  * Returns the topic name of the current input record; could be null if it 
is not
- * available (for example, if this method is invoked from the punctuate 
call)
+ * available (for example, if this method is invoked from the punctuate 
call).
  *
  * @return the topic name
  */
 String topic();
 
 /**
  * Returns the partition id of the current input record; could be -1 if it 
is not
- * available (for example, if this method is invoked from the punctuate 
call)
+ * available (for example, if this method is invoked from the punctuate 
call).
  *
  * @return the partition id
  */
 int partition();
 
 /**
  * Returns the offset of the current input record; could be -1 if it is not
- * available (for example, if this method is invoked from the punctuate 
call)
+ * available (for example, if this method is invoked from the punctuate 
call).
  *
  * @return the offset
  */
 long offset();
 
 /**
- * Returns the headers of the current input record; could be null if it is 
not available
+ * Returns the headers of the current input record; could be null if it is 
not
+ * available (for example, if this method is invoked from the punctuate 
call).
+ *
  * @return the headers
  */
 Headers headers();
 
 /**
  * Returns the current timestamp.
  *
- * If it is triggered while processing a record streamed from the source 
processor, timestamp is defined as the timestamp of the current input record; 
the timestamp is extracted from
+ *  If it is triggered while processing a record streamed from the 
source processor,
+ * timestamp is defined as the timestamp of the current input record; the 
timestamp is extracted from
  * {@link org.apache.kafka.clients.consumer.ConsumerRecord ConsumerRecord} 
by {@link TimestampExtractor}.
  *
- * If it is triggered while processing a record generated not from the 
source processor (for example,
+ *  If it is triggered while processing a record generated not from the 
source processor (for example,
  * if this method is invoked from the punctuate call), timestamp is 
defined as the current
- * task's stream time, which is defined as the smallest among all its 
input stream partition timestamps.
+ * task's stream time, which is defined as the largest among all its input 
stream partition timestamps.

Review comment:
   Sounds like a bug :)
   
   But it seems to be a one-line fix that I can piggyback on this PR. We 
advance the "partition time" too early. If we advance it when we return the 
record for processing, all should be fixed?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rhauch commented on a change in pull request #8620: KAFKA-9944: Added supporting customized HTTP response headers for Kafka Connect.

2020-05-05 Thread GitBox


rhauch commented on a change in pull request #8620:
URL: https://github.com/apache/kafka/pull/8620#discussion_r420464820



##
File path: checkstyle/checkstyle.xml
##
@@ -132,7 +132,7 @@
 
 
   
-  
+  

Review comment:
   Why change the setting instead of modifying `suppressions.xml` to 
exclude certain classes from this rule?

##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
##
@@ -244,6 +244,14 @@
 + "user requests to reset the set of active topics per connector.";
 protected static final boolean TOPIC_TRACKING_ALLOW_RESET_DEFAULT = true;
 
+/**
+ * @link 
"https://www.eclipse.org/jetty/documentation/current/header-filter.html";
+ * @link 
"https://www.eclipse.org/jetty/javadoc/9.4.28.v20200408/org/eclipse/jetty/servlets/HeaderFilter.html";
+ **/
+public static final String RESPONSE_HTTP_HEADERS_CONFIG = 
"response.http.headers.config";
+public static final String RESPONSE_HTTP_HEADERS_DOC = "Set values for 
Jetty HTTP response headers";

Review comment:
   I don't think we should expose `Jetty` here. Yes, we're following the 
Jetty grammar and format for these, but let's not unnecessarily expose the 
internals.
   ```suggestion
   public static final String RESPONSE_HTTP_HEADERS_DOC = "Rules for REST 
API HTTP response headers";
   ```

##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/rest/RestServer.java
##
@@ -461,4 +469,18 @@ public static String urlJoin(String base, String path) {
 return base + path;
 }
 
+/**
+ * Registers the header filter with the ServletContextHandler.
+ * @param context The servlet context handler
+ */
+protected void configureHttpResponsHeaderFilter(ServletContextHandler 
context) {
+String headerConfig = 
config.getString(WorkerConfig.RESPONSE_HTTP_HEADERS_CONFIG);
+log.debug("headerConfig : " + headerConfig);

Review comment:
   Is this line really necessary? Isn't the `response.http.headers.config` 
property already logged at INFO level when the worker starts up, via the 
WorkerConfig (or rather DistributedConfig or StandaloneConfig) constructor?

##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerConfig.java
##
@@ -400,6 +410,52 @@ public WorkerConfig(ConfigDef definition, Map props) {
 logInternalConverterDeprecationWarnings(props);
 }
 
+public static void validateHttpResponseHeaderConfig(String config) {

Review comment:
   Why not implement these as a `ConfigDef.Validator` implementation, 
similar to the existing `AdminListenersValidator` below?
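   (For reference, a minimal sketch of that shape, assuming it delegates to the `validateHttpResponseHeaderConfig` logic already in this PR; the class name and doc string are placeholders.)
   
   ```java
   import org.apache.kafka.common.config.ConfigDef;
   
   // Hypothetical validator sketch mirroring the shape of AdminListenersValidator.
   public static class ResponseHttpHeadersValidator implements ConfigDef.Validator {
       @Override
       public void ensureValid(String name, Object value) {
           if (value == null) {
               return;
           }
           String config = ((String) value).trim();
           if (!config.isEmpty()) {
               validateHttpResponseHeaderConfig(config); // reuse the PR's existing checks
           }
       }
   
       @Override
       public String toString() {
           return "Comma-separated header rules, each optionally enclosed in double quotes";
       }
   }
   ```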

##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
##
@@ -392,6 +393,106 @@ public void testDisableAdminEndpoint() throws IOException 
{
 Assert.assertEquals(404, response.getStatusLine().getStatusCode());
 }
 
+@Test
+public void TestValidCustomizedHttpResponseHeaders() throws IOException  {
+String headerConfig =
+"add X-XSS-Protection: 1; mode=block, \"add Cache-Control: 
no-cache, no-store, must-revalidate\"";
+Map expectedHeaders = new HashMap<>();
+expectedHeaders.put("X-XSS-Protection", "1; mode=block");
+expectedHeaders.put("Cache-Control", "no-cache, no-store, 
must-revalidate");
+checkCustomizedHttpResponseHeaders(headerConfig, expectedHeaders);
+}
+
+@Test
+public void TestDefaultCustomizedHttpResponseHeaders() throws IOException  
{

Review comment:
   Nit:
   ```suggestion
   public void testDefaultCustomizedHttpResponseHeaders() throws 
IOException  {
   ```

##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
##
@@ -392,6 +393,106 @@ public void testDisableAdminEndpoint() throws IOException 
{
 Assert.assertEquals(404, response.getStatusLine().getStatusCode());
 }
 
+@Test
+public void TestValidCustomizedHttpResponseHeaders() throws IOException  {

Review comment:
   Nit:
   ```suggestion
   public void testValidCustomizedHttpResponseHeaders() throws IOException  
{
   ```

##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/rest/RestServerTest.java
##
@@ -392,6 +393,106 @@ public void testDisableAdminEndpoint() throws IOException 
{
 Assert.assertEquals(404, response.getStatusLine().getStatusCode());
 }
 
+@Test
+public void TestValidCustomizedHttpResponseHeaders() throws IOException  {
+String headerConfig =
+"add X-XSS-Protection: 1; mode=block, \"add Cache-Control: 
no-cache, no-store, must-revalidate\"";
+Map expectedHeaders = new HashMap<>();
+expectedHeaders.put("X-XSS-Protection", "1; mode=block");
+expectedHeaders.put("Cache-Contro
