[jira] [Commented] (KAFKA-12308) ConfigDef.parseType deadlock

2021-02-24 Thread Konstantine Karantasis (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12308?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17290756#comment-17290756
 ] 

Konstantine Karantasis commented on KAFKA-12308:


[~tombentley] I actually think that the initial suggestion in 
https://issues.apache.org/jira/browse/KAFKA-7421 regarding the removal of the 
method lock is correct. 

The `DelegatingClassLoader` doesn't seem to need to be parallel capable itself, 
because it delegates loading either to `PluginClassLoader` instances that are 
parallel capable, or to the parent, which normally is the system classloader and 
should also be parallel capable. 

Note that the loading sequence you mention above is inverted on purpose 
to implement classloading isolation. First we attempt to load the 
class from the "child" `PluginClassLoader` of the designated plugin, and if it is 
not found there, the parent classloader of the `DelegatingClassLoader` is consulted. 
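
For illustration only, a minimal sketch of that child-first delegation together 
with the parallel-capable registration (the class and constructor below are 
hypothetical simplifications, not the actual Connect implementation):

    import java.net.URL;
    import java.net.URLClassLoader;

    // Simplified child-first classloader: try the plugin's loader before
    // falling back to the parent. Not the real DelegatingClassLoader.
    class ChildFirstClassLoader extends URLClassLoader {
        static {
            // Registration avoids the single coarse lock per classloader
            // that non-parallel-capable loaders use during loadClass.
            ClassLoader.registerAsParallelCapable();
        }

        private final ClassLoader pluginLoader; // stands in for a PluginClassLoader

        ChildFirstClassLoader(URL[] urls, ClassLoader parent, ClassLoader pluginLoader) {
            super(urls, parent);
            this.pluginLoader = pluginLoader;
        }

        @Override
        protected Class<?> loadClass(String name, boolean resolve) throws ClassNotFoundException {
            Class<?> loaded = findLoadedClass(name);
            if (loaded != null) {
                return loaded;
            }
            try {
                // First the "child" plugin classloader (isolation) ...
                return pluginLoader.loadClass(name);
            } catch (ClassNotFoundException e) {
                // ... and only then the parent (normally the system classloader).
                return super.loadClass(name, resolve);
            }
        }
    }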

I have updated the PR that had added a test for this type of deadlock 
originally submitted by [~gharris1727] in: 
[https://github.com/apache/kafka/pull/8259]

cc [~rhauch]

> ConfigDef.parseType deadlock
> 
>
> Key: KAFKA-12308
> URL: https://issues.apache.org/jira/browse/KAFKA-12308
> Project: Kafka
>  Issue Type: Bug
>  Components: config
>Affects Versions: 2.5.0
> Environment: kafka 2.5.0
> centos7
> java version "1.8.0_231"
>Reporter: cosmozhu
>Priority: Major
> Attachments: deadlock.log
>
>
> hi,
>  the problem was found when I restarted *ConnectDistributed*.
> I restarted ConnectDistributed on a single node for the test, without deleting 
> connectors.
>  sometimes the process stopped while creating connectors.
> I added some logging and found a deadlock in `ConfigDef.parseType`. My 
> connectors always have the same transforms. I guess that when connectors start 
> up (in startAndStopExecutor, which defaults to 8 threads) and load the same 
> class file, something goes wrong.
> I attached the jstack log file.
> thanks for any help.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] kkonstantine commented on a change in pull request #8259: KAFKA-7421: Ensure Connect's PluginClassLoader is truly parallel capable and resolve deadlock occurrences

2021-02-24 Thread GitBox


kkonstantine commented on a change in pull request #8259:
URL: https://github.com/apache/kafka/pull/8259#discussion_r582614443



##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/runtime/isolation/SynchronizationTest.java
##
@@ -0,0 +1,468 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.connect.runtime.isolation;
+
+import static org.junit.Assert.fail;
+
+import java.lang.management.LockInfo;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MonitorInfo;
+import java.lang.management.ThreadInfo;
+import java.net.URL;
+import java.security.AccessController;
+import java.security.PrivilegedAction;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.BrokenBarrierException;
+import java.util.concurrent.CyclicBarrier;
+import java.util.concurrent.LinkedBlockingDeque;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.Predicate;
+import java.util.stream.Collectors;
+import org.apache.kafka.common.config.AbstractConfig;
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigDef.Importance;
+import org.apache.kafka.common.config.ConfigDef.Type;
+import org.apache.kafka.connect.runtime.WorkerConfig;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TestName;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class SynchronizationTest {
+
+public static final Logger log = 
LoggerFactory.getLogger(SynchronizationTest.class);
+
+@Rule
+public final TestName testName = new TestName();
+
+private String threadPrefix;
+private Plugins plugins;
+private ThreadPoolExecutor exec;
+private Breakpoint dclBreakpoint;
+private Breakpoint pclBreakpoint;
+
+@Before
+public void setup() {
+TestPlugins.assertAvailable();
+Map<String, String> pluginProps = Collections.singletonMap(
+WorkerConfig.PLUGIN_PATH_CONFIG,
+String.join(",", TestPlugins.pluginPath())
+);
+threadPrefix = SynchronizationTest.class.getSimpleName()
++ "." + testName.getMethodName() + "-";
+dclBreakpoint = new Breakpoint<>();
+pclBreakpoint = new Breakpoint<>();
+plugins = new Plugins(pluginProps) {
+@Override
+protected DelegatingClassLoader 
newDelegatingClassLoader(List<String> paths) {
+return AccessController.doPrivileged(
+(PrivilegedAction<DelegatingClassLoader>) () ->
+new SynchronizedDelegatingClassLoader(paths)
+);
+}
+};
+exec = new ThreadPoolExecutor(
+2,
+2,
+1000L,
+TimeUnit.MILLISECONDS,
+new LinkedBlockingDeque<>(),
+threadFactoryWithNamedThreads(threadPrefix)
+);
+
+}
+
+@After
+public void tearDown() throws InterruptedException {
+dclBreakpoint.clear();
+pclBreakpoint.clear();
+exec.shutdown();
+exec.awaitTermination(1L, TimeUnit.SECONDS);
+}
+
+private static class Breakpoint<T> {
+
+private Predicate<T> predicate;
+private CyclicBarrier barrier;
+
+public synchronized void clear() {
+if (barrier != null) {
+barrier.reset();
+}
+predicate = null;
+barrier = null;
+}
+
+public synchronized void set(Predicate<T> predicate) {
+clear();
+this.predicate = predicate;
+// As soon as the barrier is tripped, the barrier will be reset 
for the next round.
+barrier = new CyclicBarrier(2);
+}
+
+/**
+ * From a thread under test, await for the test orchestrator to 
continue execution
+ * @param obj Object to test with the breakpoint's current predicate
+ */

[GitHub] [kafka] kkonstantine commented on pull request #8259: KAFKA-7421: Reproduce Plugin/Delegating ClassLoader deadlock

2021-02-24 Thread GitBox


kkonstantine commented on pull request #8259:
URL: https://github.com/apache/kafka/pull/8259#issuecomment-785695890


   I posted a change that hopefully fixes the issue and is also mentioned in 
https://issues.apache.org/jira/browse/KAFKA-7421
   The method lock indeed seems to be there as an oversight. 
   
   A change in the test was also required to make the test classloader capable of 
parallel loading. Without this registration the test fails. It also fails if 
the method lock is retained. 
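
   For reference, a sketch of what such a registration looks like on a test 
classloader (assuming a plain URLClassLoader subclass; the names here are 
hypothetical, not the actual test code):

       import java.net.URL;
       import java.net.URLClassLoader;

       class ParallelCapableTestClassLoader extends URLClassLoader {
           static {
               // Without this registration, loads through this loader are
               // serialized on a single lock (the loader instance itself).
               ClassLoader.registerAsParallelCapable();
           }

           ParallelCapableTestClassLoader(URL[] urls, ClassLoader parent) {
               super(urls, parent);
           }
       }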
   
   @gharris1727 please take another look. Also, cc @rhauch 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dpoldrugo commented on pull request #10059: KAFKA-8562: SaslChannelBuilder - avoid (reverse) DNS lookup while building underlying SslTransportLayer

2021-02-24 Thread GitBox


dpoldrugo commented on pull request #10059:
URL: https://github.com/apache/kafka/pull/10059#issuecomment-785690250


   Thanks @hachikuji and @omkreddy for your comments. 
   Changed the tests as you proposed:
   
https://github.com/apache/kafka/pull/10059/commits/c605ea41fb0b342e9bb17595941c0e658bb63360



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-9203) kafka-client 2.3.1 fails to consume lz4 compressed topic

2021-02-24 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-9203?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17290739#comment-17290739
 ] 

Xavier Léauté commented on KAFKA-9203:
--

I submitted a PR that will help detect if we have an older buggy lz4 version on 
the classpath. This should help prevent these situations and provide a more 
helpful error message. https://github.com/apache/kafka/pull/10196
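
As a rough illustration of that kind of check (not necessarily what the PR does), 
one could inspect the lz4-java version that actually got loaded and fail fast with 
a descriptive message; the class name below is hypothetical:

    import net.jpountz.lz4.LZ4Factory;

    public class Lz4VersionCheck {
        public static void main(String[] args) {
            // Implementation-Version comes from the manifest of whichever
            // lz4-java jar won on the classpath; it may be null if absent.
            Package lz4Package = LZ4Factory.class.getPackage();
            String version = (lz4Package == null) ? null : lz4Package.getImplementationVersion();
            System.out.println("lz4-java on classpath: " + version);
            // A real check would compare this against a known-good minimum
            // version and raise a helpful error instead of printing.
        }
    }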

> kafka-client 2.3.1 fails to consume lz4 compressed topic
> 
>
> Key: KAFKA-9203
> URL: https://issues.apache.org/jira/browse/KAFKA-9203
> Project: Kafka
>  Issue Type: Bug
>  Components: compression, consumer
>Affects Versions: 2.3.0, 2.3.1
>Reporter: David Watzke
>Assignee: Ismael Juma
>Priority: Blocker
> Fix For: 2.4.0, 2.3.2
>
> Attachments: kafka-clients-2.3.2-SNAPSHOT.jar
>
>
> I run kafka cluster 2.1.1
> when I upgraded the consumer app to use kafka-client 2.3.0 (or 2.3.1) instead 
> of 2.2.0, I immediately started getting the following exceptions in a loop 
> when consuming a topic with LZ4-compressed messages:
> {noformat}
> 2019-11-18 11:59:16.888 ERROR [afka-consumer-thread] 
> com.avast.utils2.kafka.consumer.ConsumerThread: Unexpected exception occurred 
> while polling and processing messages: org.apache.kafka.common.KafkaExce
> ption: Received exception when fetching the next record from 
> FILEREP_LONER_REQUEST-4. If needed, please seek past the record to continue 
> consumption. 
> org.apache.kafka.common.KafkaException: Received exception when fetching the 
> next record from FILEREP_LONER_REQUEST-4. If needed, please seek past the 
> record to continue consumption. 
>     at 
> org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.fetchRecords(Fetcher.java:1473)
>  
>     at 
> org.apache.kafka.clients.consumer.internals.Fetcher$PartitionRecords.access$1600(Fetcher.java:1332)
>  
>     at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchRecords(Fetcher.java:645)
>  
>     at 
> org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:606)
>  
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1263)
>  
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1225) 
>     at 
> org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1201) 
>     at 
> com.avast.utils2.kafka.consumer.ConsumerThread.pollAndProcessMessagesFromKafka(ConsumerThread.java:180)
>  
>     at 
> com.avast.utils2.kafka.consumer.ConsumerThread.run(ConsumerThread.java:149) 
>     at 
> com.avast.filerep.saver.RequestSaver$.$anonfun$main$8(RequestSaver.scala:28) 
>     at 
> com.avast.filerep.saver.RequestSaver$.$anonfun$main$8$adapted(RequestSaver.scala:19)
>  
>     at 
> resource.AbstractManagedResource.$anonfun$acquireFor$1(AbstractManagedResource.scala:88)
>  
>     at 
> scala.util.control.Exception$Catch.$anonfun$either$1(Exception.scala:252) 
>     at scala.util.control.Exception$Catch.apply(Exception.scala:228) 
>     at scala.util.control.Exception$Catch.either(Exception.scala:252) 
>     at 
> resource.AbstractManagedResource.acquireFor(AbstractManagedResource.scala:88) 
>     at 
> resource.ManagedResourceOperations.apply(ManagedResourceOperations.scala:26) 
>     at 
> resource.ManagedResourceOperations.apply$(ManagedResourceOperations.scala:26) 
>     at 
> resource.AbstractManagedResource.apply(AbstractManagedResource.scala:50) 
>     at 
> resource.ManagedResourceOperations.acquireAndGet(ManagedResourceOperations.scala:25)
>  
>     at 
> resource.ManagedResourceOperations.acquireAndGet$(ManagedResourceOperations.scala:25)
>  
>     at 
> resource.AbstractManagedResource.acquireAndGet(AbstractManagedResource.scala:50)
>  
>     at 
> resource.ManagedResourceOperations.foreach(ManagedResourceOperations.scala:53)
>  
>     at 
> resource.ManagedResourceOperations.foreach$(ManagedResourceOperations.scala:53)
>  
>     at 
> resource.AbstractManagedResource.foreach(AbstractManagedResource.scala:50) 
>     at 
> com.avast.filerep.saver.RequestSaver$.$anonfun$main$6(RequestSaver.scala:19) 
>     at 
> com.avast.filerep.saver.RequestSaver$.$anonfun$main$6$adapted(RequestSaver.scala:18)
>  
>     at 
> resource.AbstractManagedResource.$anonfun$acquireFor$1(AbstractManagedResource.scala:88)
>  
>     at 
> scala.util.control.Exception$Catch.$anonfun$either$1(Exception.scala:252) 
>     at scala.util.control.Exception$Catch.apply(Exception.scala:228) 
>     at scala.util.control.Exception$Catch.either(Exception.scala:252) 
>     at 
> resource.AbstractManagedResource.acquireFor(AbstractManagedResource.scala

[GitHub] [kafka] chia7712 commented on a change in pull request #10206: KAFKA-12369; Implement `ListTransactions` API

2021-02-24 Thread GitBox


chia7712 commented on a change in pull request #10206:
URL: https://github.com/apache/kafka/pull/10206#discussion_r582592473



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -3303,6 +3304,28 @@ class KafkaApis(val requestChannel: RequestChannel,
   new 
DescribeTransactionsResponse(response.setThrottleTimeMs(requestThrottleMs)))
   }
 
+  def handleListTransactionsRequest(request: RequestChannel.Request): Unit = {
+val listTransactionsRequest = request.body[ListTransactionsRequest]
+val response = new ListTransactionsResponseData()
+
+val filteredProducerIds = 
listTransactionsRequest.data.producerIdFilter.asScala.map(Long.unbox).toSet
+val filteredStates = 
listTransactionsRequest.data.statesFilter.asScala.toSet
+
+txnCoordinator.handleListTransactions(filteredProducerIds, filteredStates) 
match {
+  case Left(error) =>
+response.setErrorCode(error.code)
+  case Right(transactions) =>
+val authorizedTransactions = transactions.filter { state =>
+  authHelper.authorize(request.context, DESCRIBE, TRANSACTIONAL_ID, 
state.transactionalId)

Review comment:
   Why doesn't `TransactionState` include an error field (similar to 
`DescribeTransactionsResponseData.TransactionState`)?

##
File path: 
core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
##
@@ -223,6 +224,49 @@ class TransactionStateManager(brokerId: Int,
   throw new IllegalStateException(s"Unexpected empty transaction metadata 
returned while putting $txnMetadata")))
   }
 
+  def listTransactionStates(
+filterProducerIds: Set[Long],
+filterStateNames: Set[String]
+  ): Either[Errors, List[ListTransactionsResponseData.TransactionState]] = {
+inReadLock(stateLock) {
+  if (loadingPartitions.nonEmpty) {
+Left(Errors.COORDINATOR_LOAD_IN_PROGRESS)
+  } else {
+val filterStates = filterStateNames.flatMap(TransactionState.fromName)
+val states = 
mutable.ListBuffer.empty[ListTransactionsResponseData.TransactionState]
+
+def shouldInclude(txnMetadata: TransactionMetadata): Boolean = {
+  if (txnMetadata.state == Dead) {
+// We filter the `Dead` state since it is a transient state which
+// indicates that the transactionalId and its metadata are  in the

Review comment:
   redundant "space"





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #10162: DOCS: Update protocol doc for missing data type

2021-02-24 Thread GitBox


chia7712 commented on a change in pull request #10162:
URL: https://github.com/apache/kafka/pull/10162#discussion_r582587709



##
File path: clients/src/main/resources/common/message/README.md
##
@@ -75,16 +75,24 @@ There are several primitive field types available.
 
 * "int16": a 16-bit integer.
 
+* "uint16": a 16-bit unsigned integer.
+
 * "int32": a 32-bit integer.
 
+* "uint32": a 32-bit unsigned integer.
+
 * "int64": a 64-bit integer.
 
 * "float64": is a double-precision floating point number (IEEE 754).
 
 * "string": a UTF-8 string.
 
+* "uuid": a type 4 immutable universally unique identifier.
+
 * "bytes": binary data.
 
+* "records": memory record set or file record set.

Review comment:
   That LGTM :)





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dengziming commented on a change in pull request #10021: KAFKA-12205: Delete snapshots less than the snapshot at the log start

2021-02-24 Thread GitBox


dengziming commented on a change in pull request #10021:
URL: https://github.com/apache/kafka/pull/10021#discussion_r582587511



##
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##
@@ -299,7 +290,16 @@ final class KafkaMetadataLog private (
   // If snapshotIds contains a snapshot id, the KafkaRaftClient and 
Listener can expect that the snapshot exists
   // on the file system, so we should first remove snapshotId and then 
delete snapshot file.
   expiredSnapshotIdsIter.remove()
-  Snapshots.deleteSnapshotIfExists(log.dir.toPath, snapshotId)
+
+  val path = Snapshots.snapshotPath(log.dir.toPath, snapshotId)
+  val destination = Snapshots.deleteRename(path, snapshotId)
+  try {
+Utils.atomicMoveWithFallback(path, destination)
+  } catch {
+case e: IOException =>
+  warn("Error renaming snapshot file: " + path + " to :" + destination 
+ ", " + e.getMessage)
+  }
+  scheduler.schedule("delete-snapshot-file", () => 
Snapshots.deleteSnapshotIfExists(log.dir.toPath, snapshotId))

Review comment:
   Done!





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dengziming commented on a change in pull request #10021: KAFKA-12205: Delete snapshots less than the snapshot at the log start

2021-02-24 Thread GitBox


dengziming commented on a change in pull request #10021:
URL: https://github.com/apache/kafka/pull/10021#discussion_r582587332



##
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##
@@ -341,7 +342,7 @@ object KafkaMetadataLog {
 }
   }
 
-val replicatedLog = new KafkaMetadataLog(log, snapshotIds, topicPartition, 
maxFetchSizeInBytes)
+val replicatedLog = new KafkaMetadataLog(log, scheduler, snapshotIds, 
topicPartition, maxFetchSizeInBytes)

Review comment:
   I also find it awkward to represent snapshot files using 
`OffsetAndEpoch` while we need to rely on `Snapshots` to remove and delete 
snapshots. I will file a ticket to discuss consolidating file management; the 
first thought is to add a `MetadataLogSnapshot` class to represent a snapshot 
file.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #10206: KAFKA-12369; Implement `ListTransactions` API

2021-02-24 Thread GitBox


chia7712 commented on a change in pull request #10206:
URL: https://github.com/apache/kafka/pull/10206#discussion_r582586860



##
File path: 
clients/src/main/resources/common/message/ListTransactionsRequest.json
##
@@ -0,0 +1,31 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 66,
+  "type": "request",
+  "listeners": ["zkBroker", "broker"],
+  "name": "ListTransactionsRequest",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+{ "name": "StatesFilter", "type": "[]string", "versions": "0+",
+  "about": "The transaction states to filter by: if empty, all 
transactions are returned; if non-empty, then only transactions matching one of 
the filtered states will be returned"
+},
+{ "name": "ProducerIdFilter", "type": "[]int64", "versions": "0+",
+  "about": "The producerIds to filter by: if empty, no transactions will 
be returned; if non-empty, only transactions which match one of the filtered 
producerIds will be returned"

Review comment:
   > On a side note, do you think it would be clearer if we used null to 
indicate the absence of the filter?
   
   it should be fine in this case. However, the `empty` collection then gets weird, 
as it would match absolutely nothing :(





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vamossagar12 commented on pull request #10052: KAFKA-12289: Adding test cases for prefix scan in InMemoryKeyValueStore

2021-02-24 Thread GitBox


vamossagar12 commented on pull request #10052:
URL: https://github.com/apache/kafka/pull/10052#issuecomment-785663227


   Thanks @cadonna , i have committed your suggestions.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #10206: KAFKA-12369; Implement `ListTransactions` API

2021-02-24 Thread GitBox


chia7712 commented on a change in pull request #10206:
URL: https://github.com/apache/kafka/pull/10206#discussion_r582583372



##
File path: 
core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
##
@@ -223,6 +224,46 @@ class TransactionStateManager(brokerId: Int,
   throw new IllegalStateException(s"Unexpected empty transaction metadata 
returned while putting $txnMetadata")))
   }
 
+  def listTransactionStates(
+filterProducerIds: Set[Long],
+filterStateNames: Set[String]
+  ): Either[Errors, List[ListTransactionsResponseData.TransactionState]] = {
+inReadLock(stateLock) {
+  if (loadingPartitions.nonEmpty) {
+Left(Errors.COORDINATOR_LOAD_IN_PROGRESS)
+  } else {
+val filterStates = filterStateNames.flatMap(TransactionState.fromName)

Review comment:
   Probably the response should have a field showing the invalid filters, 
so the callers would know which filters are NOT working.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on pull request #10206: KAFKA-12369; Implement `ListTransactions` API

2021-02-24 Thread GitBox


hachikuji commented on pull request #10206:
URL: https://github.com/apache/kafka/pull/10206#issuecomment-785640598


   @chia7712 Thanks, I really appreciate your help reviewing these patches. I 
pushed a couple comments and left a few responses to your questions. 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #10206: KAFKA-12369; Implement `ListTransactions` API

2021-02-24 Thread GitBox


hachikuji commented on a change in pull request #10206:
URL: https://github.com/apache/kafka/pull/10206#discussion_r582568344



##
File path: 
core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
##
@@ -25,8 +25,50 @@ import org.apache.kafka.common.record.RecordBatch
 
 import scala.collection.{immutable, mutable}
 
+
+object TransactionState {
+  val AllStates = Set(
+Empty,
+Ongoing,
+PrepareCommit,
+PrepareAbort,
+CompleteCommit,
+CompleteAbort,
+Dead,
+PrepareEpochFence
+  )
+
+  def fromName(name: String): Option[TransactionState] = {
+name match {
+  case "Empty" => Some(Empty)
+  case "Ongoing" => Some(Ongoing)
+  case "PrepareCommit" => Some(PrepareCommit)
+  case "PrepareAbort" => Some(PrepareAbort)
+  case "CompleteCommit" => Some(CompleteCommit)
+  case "CompleteAbort" => Some(CompleteAbort)
+  case "PrepareEpochFence" => Some(PrepareEpochFence)
+  case "Dead" => Some(Dead)
+  case _ => None
+}
+  }
+
+  def fromId(id: Byte): TransactionState = {

Review comment:
   I left this one as it was since I am not 100% sure whether it has any 
impact on the performance of transaction loading.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #10206: KAFKA-12369; Implement `ListTransactions` API

2021-02-24 Thread GitBox


hachikuji commented on a change in pull request #10206:
URL: https://github.com/apache/kafka/pull/10206#discussion_r582567640



##
File path: 
clients/src/main/resources/common/message/ListTransactionsRequest.json
##
@@ -0,0 +1,31 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 66,
+  "type": "request",
+  "listeners": ["zkBroker", "broker"],
+  "name": "ListTransactionsRequest",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+{ "name": "StatesFilter", "type": "[]string", "versions": "0+",
+  "about": "The transaction states to filter by: if empty, all 
transactions are returned; if non-empty, then only transactions matching one of 
the filtered states will be returned"
+},
+{ "name": "ProducerIdFilter", "type": "[]int64", "versions": "0+",
+  "about": "The producerIds to filter by: if empty, no transactions will 
be returned; if non-empty, only transactions which match one of the filtered 
producerIds will be returned"

Review comment:
   Yeah, my bad. On a side note, do you think it would be clearer if we 
used `null` to indicate the absence of the filter?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #10206: KAFKA-12369; Implement `ListTransactions` API

2021-02-24 Thread GitBox


hachikuji commented on a change in pull request #10206:
URL: https://github.com/apache/kafka/pull/10206#discussion_r582566067



##
File path: 
core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
##
@@ -223,6 +224,46 @@ class TransactionStateManager(brokerId: Int,
   throw new IllegalStateException(s"Unexpected empty transaction metadata 
returned while putting $txnMetadata")))
   }
 
+  def listTransactionStates(
+filterProducerIds: Set[Long],
+filterStateNames: Set[String]
+  ): Either[Errors, List[ListTransactionsResponseData.TransactionState]] = {
+inReadLock(stateLock) {
+  if (loadingPartitions.nonEmpty) {

Review comment:
   The problem is that we don't have a way to map from the producerId to 
the partition. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #10206: KAFKA-12369; Implement `ListTransactions` API

2021-02-24 Thread GitBox


hachikuji commented on a change in pull request #10206:
URL: https://github.com/apache/kafka/pull/10206#discussion_r582565318



##
File path: 
core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
##
@@ -223,6 +224,46 @@ class TransactionStateManager(brokerId: Int,
   throw new IllegalStateException(s"Unexpected empty transaction metadata 
returned while putting $txnMetadata")))
   }
 
+  def listTransactionStates(
+filterProducerIds: Set[Long],
+filterStateNames: Set[String]
+  ): Either[Errors, List[ListTransactionsResponseData.TransactionState]] = {
+inReadLock(stateLock) {
+  if (loadingPartitions.nonEmpty) {
+Left(Errors.COORDINATOR_LOAD_IN_PROGRESS)
+  } else {
+val filterStates = filterStateNames.flatMap(TransactionState.fromName)

Review comment:
   I could be persuaded probably. Suppose we add a new state in the future. 
If a user tried to query that state on an old broker, definitely no 
transactions would exist in that state. From that perspective, the 
implementation would return an accurate result. On the other hand, I can see 
how that might be misleading. If we decide to return an error code, then I 
think we'll have to treat the state filter a little more carefully from a 
versioning perspective. A new state would probably demand a new version. I 
think we tried to view the states a little bit more loosely in the `ListGroups` 
API (which this is modeled after) because we considered the state machine more 
of an internal implementation detail. This, by the way, is why we went with the 
string representation rather than numeric ids. This became more of a grey area 
though when the state filter was added to the request...
   
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dengziming commented on a change in pull request #10162: DOCS: Update protocol doc for missing data type

2021-02-24 Thread GitBox


dengziming commented on a change in pull request #10162:
URL: https://github.com/apache/kafka/pull/10162#discussion_r582564994



##
File path: clients/src/main/resources/common/message/README.md
##
@@ -75,16 +75,24 @@ There are several primitive field types available.
 
 * "int16": a 16-bit integer.
 
+* "uint16": a 16-bit unsigned integer.
+
 * "int32": a 32-bit integer.
 
+* "uint32": a 32-bit unsigned integer.
+
 * "int64": a 64-bit integer.
 
 * "float64": is a double-precision floating point number (IEEE 754).
 
 * "string": a UTF-8 string.
 
+* "uuid": a type 4 immutable universally unique identifier.
+
 * "bytes": binary data.
 
+* "records": memory record set or file record set.

Review comment:
   I think we should avoid listing subclasses, since we would have to update the 
docs every time we add a new subclass. How about rewording this to "recordset 
such as memory recordset"?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10847) Avoid spurious left/outer join results in stream-stream join

2021-02-24 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17290685#comment-17290685
 ] 

Guozhang Wang commented on KAFKA-10847:
---

I agree that we should delete upon emitting expired records' joined results. 
Currently, since we do a range query + deletion per input record, I guess in 
practice each time we would only expire very few records.

If the range query + deletion turns out to be an overhead in practice, we can 
consider 1) doing the range query + deletion less frequently, so that each time 
we get a reasonable number of records to expire, and 2) using range deletion 
(https://rocksdb.org/blog/2018/11/21/delete-range.html), which would be 
efficient especially if we have more records to expire in one call.
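
For reference, a minimal standalone sketch of the RocksDB range-deletion API 
mentioned above (plain RocksDB Java API, not wired into the Streams window store; 
the path and keys are arbitrary examples):

    import org.rocksdb.Options;
    import org.rocksdb.RocksDB;
    import org.rocksdb.RocksDBException;

    public class RangeDeleteExample {
        public static void main(String[] args) throws RocksDBException {
            RocksDB.loadLibrary();
            try (Options options = new Options().setCreateIfMissing(true);
                 RocksDB db = RocksDB.open(options, "/tmp/range-delete-demo")) {
                db.put("k1".getBytes(), "v1".getBytes());
                db.put("k2".getBytes(), "v2".getBytes());
                db.put("k9".getBytes(), "v9".getBytes());
                // Removes every key in [beginKey, endKey) in one call instead
                // of iterating and deleting record by record.
                db.deleteRange("k1".getBytes(), "k5".getBytes());
            }
        }
    }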

bq. I do a single-lookup in the store to check if the key is there, if not, 
then it continues; otherwise it calls the put(key, null) to delete it.

Just as syntactic sugar: you can call `putIfAbsent(key, null)` instead.

> Avoid spurious left/outer join results in stream-stream join 
> -
>
> Key: KAFKA-10847
> URL: https://issues.apache.org/jira/browse/KAFKA-10847
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Sergio Peña
>Priority: Major
>
> KafkaStreams follows an eager execution model, i.e., it never buffers input 
> records but processes them right away. For left/outer stream-stream join, 
> this implies that a left/outer join result might be emitted before the window 
> end (or window close) time is reached. Thus, a record that will be an 
> inner-join result might produce an eager (and spurious) left/outer join 
> result.
> We should change the implementation of the join to not emit eager left/outer 
> join results, but instead delay the emission of such results until after the 
> window grace period has passed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ivanyu commented on a change in pull request #9990: KAFKA-12235: Fix ZkAdminManager.describeConfigs on 2+ config keys

2021-02-24 Thread GitBox


ivanyu commented on a change in pull request #9990:
URL: https://github.com/apache/kafka/pull/9990#discussion_r582559897



##
File path: core/src/main/scala/kafka/server/ConfigHelper.scala
##
@@ -47,11 +47,11 @@ class ConfigHelper(metadataCache: MetadataCache, config: 
KafkaConfig, configRepo
 
   def createResponseConfig(configs: Map[String, Any],
createConfigEntry: (String, Any) => 
DescribeConfigsResponseData.DescribeConfigsResourceResult): 
DescribeConfigsResponseData.DescribeConfigsResult = {
-val filteredConfigPairs = if (resource.configurationKeys == null)
+val filteredConfigPairs = if (resource.configurationKeys == null || 
resource.configurationKeys.isEmpty)

Review comment:
   I agree with this. However, it seems that treating empty as null is the de facto 
behavior at the moment (before the fix). To be more precise, here
   
https://github.com/apache/kafka/blob/e2a0d0c90e1916d77223a420e3595e8aba643001/core/src/main/scala/kafka/server/ConfigHelper.scala#L53-L55
   `resource.configurationKeys` being empty means `true` for each element of 
`configs`, so effectively no filtering, which, in turn, is equivalent to 
`resource.configurationKeys == null`.
   
   For example, `testDescribeConfigsWithDocumentation` and 
`testDescribeConfigsWithEmptyConfigurationKeys` in `ZkAdminManagerTest` break 
after the fix if treating empty as null is not kept.
   
   I guess it originates from the fact that the default constructor of 
`DescribeConfigsResource` makes `configurationKeys` an empty array.
   
   We can either 1) keep the current behavior (the current approach); or 2) 
stick to the spec (also potentially adding `"default": "null"` to the field 
description). I'd prefer 2. What do you think, @cmccabe?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #10206: KAFKA-12369; Implement `ListTransactions` API

2021-02-24 Thread GitBox


hachikuji commented on a change in pull request #10206:
URL: https://github.com/apache/kafka/pull/10206#discussion_r582558759



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/ListTransactionsRequest.java
##
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.message.ListTransactionsRequestData;
+import org.apache.kafka.common.message.ListTransactionsResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.nio.ByteBuffer;
+
+public class ListTransactionsRequest extends AbstractRequest {
+public static class Builder extends 
AbstractRequest.Builder<ListTransactionsRequest> {
+public final ListTransactionsRequestData data;
+
+public Builder(ListTransactionsRequestData data) {
+super(ApiKeys.LIST_TRANSACTIONS);
+this.data = data;
+}
+
+@Override
+public ListTransactionsRequest build(short version) {
+return new ListTransactionsRequest(data, version);
+}
+
+@Override
+public String toString() {
+return data.toString();
+}
+}
+
+private final ListTransactionsRequestData data;
+
+private ListTransactionsRequest(ListTransactionsRequestData data, short 
version) {
+super(ApiKeys.LIST_TRANSACTIONS, version);
+this.data = data;
+}
+
+public ListTransactionsRequestData data() {
+return data;
+}
+
+@Override
+public ListTransactionsResponse getErrorResponse(int throttleTimeMs, 
Throwable e) {
+Errors error = Errors.forException(e);
+ListTransactionsResponseData response = new 
ListTransactionsResponseData()

Review comment:
   Haha, I should really have checked for that after the last PR.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #10206: KAFKA-12369; Implement `ListTransactions` API

2021-02-24 Thread GitBox


chia7712 commented on a change in pull request #10206:
URL: https://github.com/apache/kafka/pull/10206#discussion_r582558391



##
File path: 
clients/src/main/resources/common/message/ListTransactionsRequest.json
##
@@ -0,0 +1,31 @@
+// Licensed to the Apache Software Foundation (ASF) under one or more
+// contributor license agreements.  See the NOTICE file distributed with
+// this work for additional information regarding copyright ownership.
+// The ASF licenses this file to You under the Apache License, Version 2.0
+// (the "License"); you may not use this file except in compliance with
+// the License.  You may obtain a copy of the License at
+//
+//http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+{
+  "apiKey": 66,
+  "type": "request",
+  "listeners": ["zkBroker", "broker"],
+  "name": "ListTransactionsRequest",
+  "validVersions": "0",
+  "flexibleVersions": "0+",
+  "fields": [
+{ "name": "StatesFilter", "type": "[]string", "versions": "0+",
+  "about": "The transaction states to filter by: if empty, all 
transactions are returned; if non-empty, then only transactions matching one of 
the filtered states will be returned"
+},
+{ "name": "ProducerIdFilter", "type": "[]int64", "versions": "0+",
+  "about": "The producerIds to filter by: if empty, no transactions will 
be returned; if non-empty, only transactions which match one of the filtered 
producerIds will be returned"

Review comment:
   `no transactions will be returned` 
   ^^ this document is a bit weird since the code shows 
that all transactions are returned if this filter is empty.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #10206: KAFKA-12369; Implement `ListTransactions` API

2021-02-24 Thread GitBox


chia7712 commented on a change in pull request #10206:
URL: https://github.com/apache/kafka/pull/10206#discussion_r582556446



##
File path: 
core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
##
@@ -223,6 +224,46 @@ class TransactionStateManager(brokerId: Int,
   throw new IllegalStateException(s"Unexpected empty transaction metadata 
returned while putting $txnMetadata")))
   }
 
+  def listTransactionStates(
+filterProducerIds: Set[Long],
+filterStateNames: Set[String]
+  ): Either[Errors, List[ListTransactionsResponseData.TransactionState]] = {
+inReadLock(stateLock) {
+  if (loadingPartitions.nonEmpty) {
+Left(Errors.COORDINATOR_LOAD_IN_PROGRESS)
+  } else {
+val filterStates = filterStateNames.flatMap(TransactionState.fromName)
+val states = 
mutable.ListBuffer.empty[ListTransactionsResponseData.TransactionState]
+
+def shouldInclude(txnMetadata: TransactionMetadata): Boolean = {
+  if (txnMetadata.state == Dead) {

Review comment:
   Could you add comment for this case? why it is excluded?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #10206: KAFKA-12369; Implement `ListTransactions` API

2021-02-24 Thread GitBox


chia7712 commented on a change in pull request #10206:
URL: https://github.com/apache/kafka/pull/10206#discussion_r582549509



##
File path: 
core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
##
@@ -223,6 +224,46 @@ class TransactionStateManager(brokerId: Int,
   throw new IllegalStateException(s"Unexpected empty transaction metadata 
returned while putting $txnMetadata")))
   }
 
+  def listTransactionStates(
+filterProducerIds: Set[Long],
+filterStateNames: Set[String]
+  ): Either[Errors, List[ListTransactionsResponseData.TransactionState]] = {
+inReadLock(stateLock) {
+  if (loadingPartitions.nonEmpty) {
+Left(Errors.COORDINATOR_LOAD_IN_PROGRESS)
+  } else {
+val filterStates = filterStateNames.flatMap(TransactionState.fromName)
+val states = 
mutable.ListBuffer.empty[ListTransactionsResponseData.TransactionState]
+
+def shouldInclude(txnMetadata: TransactionMetadata): Boolean = {
+  if (txnMetadata.state == Dead) {
+false
+  } else if (filterProducerIds.nonEmpty && 
!filterProducerIds.contains(txnMetadata.producerId)) {
+false
+  } else if (filterStateNames.nonEmpty && 
!filterStates.contains(txnMetadata.state)) {
+false
+  } else {
+true
+  }
+}
+
+transactionMetadataCache.foreach { case (_, cache) =>

Review comment:
   Could you use `forKeyValue` instead of `foreach`?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #10206: KAFKA-12369; Implement `ListTransactions` API

2021-02-24 Thread GitBox


chia7712 commented on a change in pull request #10206:
URL: https://github.com/apache/kafka/pull/10206#discussion_r582547244



##
File path: 
core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
##
@@ -25,8 +25,50 @@ import org.apache.kafka.common.record.RecordBatch
 
 import scala.collection.{immutable, mutable}
 
+
+object TransactionState {
+  val AllStates = Set(
+Empty,
+Ongoing,
+PrepareCommit,
+PrepareAbort,
+CompleteCommit,
+CompleteAbort,
+Dead,
+PrepareEpochFence
+  )
+
+  def fromName(name: String): Option[TransactionState] = {
+name match {
+  case "Empty" => Some(Empty)
+  case "Ongoing" => Some(Ongoing)
+  case "PrepareCommit" => Some(PrepareCommit)
+  case "PrepareAbort" => Some(PrepareAbort)
+  case "CompleteCommit" => Some(CompleteCommit)
+  case "CompleteAbort" => Some(CompleteAbort)
+  case "PrepareEpochFence" => Some(PrepareEpochFence)
+  case "Dead" => Some(Dead)
+  case _ => None
+}
+  }
+
+  def fromId(id: Byte): TransactionState = {

Review comment:
   How about `AllStates.find(_.id == id)`?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #10206: KAFKA-12369; Implement `ListTransactions` API

2021-02-24 Thread GitBox


chia7712 commented on a change in pull request #10206:
URL: https://github.com/apache/kafka/pull/10206#discussion_r582547180



##
File path: 
core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
##
@@ -25,8 +25,50 @@ import org.apache.kafka.common.record.RecordBatch
 
 import scala.collection.{immutable, mutable}
 
+
+object TransactionState {
+  val AllStates = Set(
+Empty,
+Ongoing,
+PrepareCommit,
+PrepareAbort,
+CompleteCommit,
+CompleteAbort,
+Dead,
+PrepareEpochFence
+  )
+
+  def fromName(name: String): Option[TransactionState] = {

Review comment:
   How about `AllStates.find(_.name == name)`?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #10206: KAFKA-12369; Implement `ListTransactions` API

2021-02-24 Thread GitBox


chia7712 commented on a change in pull request #10206:
URL: https://github.com/apache/kafka/pull/10206#discussion_r582545612



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/ListTransactionsRequest.java
##
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.requests;
+
+import org.apache.kafka.common.message.ListTransactionsRequestData;
+import org.apache.kafka.common.message.ListTransactionsResponseData;
+import org.apache.kafka.common.protocol.ApiKeys;
+import org.apache.kafka.common.protocol.ByteBufferAccessor;
+import org.apache.kafka.common.protocol.Errors;
+
+import java.nio.ByteBuffer;
+
+public class ListTransactionsRequest extends AbstractRequest {
+public static class Builder extends 
AbstractRequest.Builder<ListTransactionsRequest> {
+public final ListTransactionsRequestData data;
+
+public Builder(ListTransactionsRequestData data) {
+super(ApiKeys.LIST_TRANSACTIONS);
+this.data = data;
+}
+
+@Override
+public ListTransactionsRequest build(short version) {
+return new ListTransactionsRequest(data, version);
+}
+
+@Override
+public String toString() {
+return data.toString();
+}
+}
+
+private final ListTransactionsRequestData data;
+
+private ListTransactionsRequest(ListTransactionsRequestData data, short 
version) {
+super(ApiKeys.LIST_TRANSACTIONS, version);
+this.data = data;
+}
+
+public ListTransactionsRequestData data() {
+return data;
+}
+
+@Override
+public ListTransactionsResponse getErrorResponse(int throttleTimeMs, 
Throwable e) {
+Errors error = Errors.forException(e);
+ListTransactionsResponseData response = new 
ListTransactionsResponseData()

Review comment:
   `throttleTimeMs` is neglected.

##
File path: 
core/src/main/scala/kafka/coordinator/transaction/TransactionStateManager.scala
##
@@ -223,6 +224,46 @@ class TransactionStateManager(brokerId: Int,
   throw new IllegalStateException(s"Unexpected empty transaction metadata 
returned while putting $txnMetadata")))
   }
 
+  def listTransactionStates(
+filterProducerIds: Set[Long],
+filterStateNames: Set[String]
+  ): Either[Errors, List[ListTransactionsResponseData.TransactionState]] = {
+inReadLock(stateLock) {
+  if (loadingPartitions.nonEmpty) {
+Left(Errors.COORDINATOR_LOAD_IN_PROGRESS)
+  } else {
+val filterStates = filterStateNames.flatMap(TransactionState.fromName)
+val states = 
mutable.ListBuffer.empty[ListTransactionsResponseData.TransactionState]
+
+def shouldInclude(txnMetadata: TransactionMetadata): Boolean = {
+  if (txnMetadata.state == Dead) {
+false
+  } else if (filterProducerIds.nonEmpty && 
!filterProducerIds.contains(txnMetadata.producerId)) {
+false
+  } else if (filterStateNames.nonEmpty && 
!filterStates.contains(txnMetadata.state)) {
+false
+  } else {
+true
+  }
+}
+
+transactionMetadataCache.foreach { case (_, cache) =>
+  cache.metadataPerTransactionalId.values.foreach { txnMetadata =>
+txnMetadata.inLock {
+  if (shouldInclude(txnMetadata)) {
+states += new ListTransactionsResponseData.TransactionState()
+  .setTransactionalId(txnMetadata.transactionalId)
+  .setProducerId(txnMetadata.producerId)
+  .setTransactionState(txnMetadata.state.toString)

Review comment:
   `txnMetadata.state.toString` -> `txnMetadata.state.name`

##
File path: 
core/src/main/scala/kafka/coordinator/transaction/TransactionMetadata.scala
##
@@ -25,8 +25,50 @@ import org.apache.kafka.common.record.RecordBatch
 
 import scala.collection.{immutable, mutable}
 
+
+object TransactionState {
+  val AllStates = Set(
+Empty,
+Ongoing,
+PrepareCommit,
+PrepareAbort,
+CompleteCommit,
+CompleteAbort,
+Dead,
+PrepareEpochFence
+  )
+
+  def fromName(name: String): Option[TransactionState] = {

Review comment

[GitHub] [kafka] omkreddy edited a comment on pull request #10059: KAFKA-8562: SaslChannelBuilder - avoid (reverse) DNS lookup while building underlying SslTransportLayer

2021-02-24 Thread GitBox


omkreddy edited a comment on pull request #10059:
URL: https://github.com/apache/kafka/pull/10059#issuecomment-785604829


   @dpoldrugo Thanks for the PR. LGTM. As @hachikuji mentioned, we should 
update the `SaslAuthenticatorFailureNoDelayTest` and `SaslAuthenticatorTest` tests. 
I think we may not need a config, as we are already avoiding the DNS lookup for SSL.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] omkreddy commented on pull request #10059: KAFKA-8562: SaslChannelBuilder - avoid (reverse) DNS lookup while building underlying SslTransportLayer

2021-02-24 Thread GitBox


omkreddy commented on pull request #10059:
URL: https://github.com/apache/kafka/pull/10059#issuecomment-785604829


   @dpoldrugo Thanks for the PR. LGTM. As @hachikuji mentioned, we should 
update the `SaslAuthenticatorFailureNoDelayTest` and `SaslAuthenticatorTest` tests. 
I think we may not need a config, as we are already avoiding the DNS lookup for SSL 
authentication.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #10162: DOCS: Update protocol doc for missing data type

2021-02-24 Thread GitBox


chia7712 commented on a change in pull request #10162:
URL: https://github.com/apache/kafka/pull/10162#discussion_r582529627



##
File path: clients/src/main/resources/common/message/README.md
##
@@ -75,16 +75,24 @@ There are several primitive field types available.
 
 * "int16": a 16-bit integer.
 
+* "uint16": a 16-bit unsigned integer.
+
 * "int32": a 32-bit integer.
 
+* "uint32": a 32-bit unsigned integer.
+
 * "int64": a 64-bit integer.
 
 * "float64": is a double-precision floating point number (IEEE 754).
 
 * "string": a UTF-8 string.
 
+* "uuid": a type 4 immutable universally unique identifier.
+
 * "bytes": binary data.
 
+* "records": memory record set or file record set.

Review comment:
   There are many subclasses for this type, as the Java type used by KRPC is `BaseRecords`. We do let protocol data carry different subclasses while processing data (before serialization). Hence, it seems to me that listing the subclasses in this document is a bit odd.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] twmb commented on pull request #10183: KAFKA-12267; Implement `DescribeTransactions` API

2021-02-24 Thread GitBox


twmb commented on pull request #10183:
URL: https://github.com/apache/kafka/pull/10183#issuecomment-785551804


   Also is there a reason not to include the `LastUpdateTimestamp`? It seems 
pretty valuable to know how long ago the transaction state was modified.
   
   I'm not sure how valuable LastProducerId or LastProducerEpoch would be, but 
I know those are available too, although (I think) those are in-memory only 
values used only for allowing retries of transactional calls.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] twmb edited a comment on pull request #10183: KAFKA-12267; Implement `DescribeTransactions` API

2021-02-24 Thread GitBox


twmb edited a comment on pull request #10183:
URL: https://github.com/apache/kafka/pull/10183#issuecomment-785548579


   Can the KIP be updated to change TransactionState from an int8 to a string 
(as implemented in this PR)?
   
   Alternatively, is there value in using a string vs. an int8? The string is 
more descriptive, but since the states are a defined enum, it'd be smaller to 
serialize/deserialize the int8 and then a client can perform the int8 => string 
conversion.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #10021: KAFKA-12205: Delete snapshots less than the snapshot at the log start

2021-02-24 Thread GitBox


hachikuji commented on a change in pull request #10021:
URL: https://github.com/apache/kafka/pull/10021#discussion_r582491373



##
File path: raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
##
@@ -92,4 +100,22 @@ public static Path createTempFile(Path logDir, 
OffsetAndEpoch snapshotId) throws
 
 return Optional.of(new SnapshotPath(path, new 
OffsetAndEpoch(endOffset, epoch), partial));
 }
+
+/**
+ * Delete this snapshot from the filesystem.
+ */
+public static CompletableFuture deleteSnapshotIfExists(Path 
logDir, OffsetAndEpoch snapshotId) throws IOException {
+Path path = snapshotPath(logDir, snapshotId);
+Path destination = Snapshots.deleteRename(path, snapshotId);
+Utils.atomicMoveWithFallback(path, destination);
+
+return CompletableFuture.supplyAsync(() -> {

Review comment:
   Probably not since the reference would be through the file descriptor. 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #10021: KAFKA-12205: Delete snapshots less than the snapshot at the log start

2021-02-24 Thread GitBox


hachikuji commented on a change in pull request #10021:
URL: https://github.com/apache/kafka/pull/10021#discussion_r582471506



##
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##
@@ -299,7 +302,16 @@ final class KafkaMetadataLog private (
   // If snapshotIds contains a snapshot id, the KafkaRaftClient and 
Listener can expect that the snapshot exists
   // on the file system, so we should first remove snapshotId and then 
delete snapshot file.
   expiredSnapshotIdsIter.remove()
-  Snapshots.deleteSnapshotIfExists(log.dir.toPath, snapshotId)
+
+  val path = Snapshots.snapshotPath(log.dir.toPath, snapshotId)
+  val destination = Snapshots.deleteRename(path, snapshotId)
+  try {
+Utils.atomicMoveWithFallback(path, destination)
+  } catch {
+case e: IOException =>
+  warn("Error renaming snapshot file: " + path + " to :" + destination 
+ ", " + e.getMessage)

Review comment:
   nit: Could this be error level? Can we use `$` substitutions? Also, 
let's include the full exception instead of just the message.
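A hedged sketch of what that nit could look like in place (assuming this lives in a class mixing in `kafka.utils.Logging`, which provides `error(msg, throwable)`; not necessarily the PR's final code):

```scala
import java.io.IOException
import java.nio.file.Path
import kafka.utils.Logging
import org.apache.kafka.common.utils.Utils

// Sketch only: error level, `$` interpolation, and the full exception attached.
class SnapshotRenamer extends Logging {
  def renameForDeletion(path: Path, destination: Path): Unit = {
    try {
      Utils.atomicMoveWithFallback(path, destination)
    } catch {
      case e: IOException =>
        error(s"Error renaming snapshot file $path to $destination", e)
    }
  }
}
```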

##
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##
@@ -341,7 +342,7 @@ object KafkaMetadataLog {
 }
   }
 
-val replicatedLog = new KafkaMetadataLog(log, snapshotIds, topicPartition, 
maxFetchSizeInBytes)
+val replicatedLog = new KafkaMetadataLog(log, scheduler, snapshotIds, 
topicPartition, maxFetchSizeInBytes)

Review comment:
   Not something we need to solve here, but I think we should put some 
thought into consolidating file management. Right now it's a little awkward to 
divide responsibility between `KafkaMetadataLog` and `Log`. For example, I 
think we are trying to say that `KafkaMetadataLog` is responsible for 
snapshots. That is mostly true, but we are relying on `Log.loadSegmentFiles` 
for the deletion of orphaned `.delete` snapshots.

##
File path: core/src/main/scala/kafka/raft/KafkaMetadataLog.scala
##
@@ -299,7 +290,16 @@ final class KafkaMetadataLog private (
   // If snapshotIds contains a snapshot id, the KafkaRaftClient and 
Listener can expect that the snapshot exists
   // on the file system, so we should first remove snapshotId and then 
delete snapshot file.
   expiredSnapshotIdsIter.remove()
-  Snapshots.deleteSnapshotIfExists(log.dir.toPath, snapshotId)
+
+  val path = Snapshots.snapshotPath(log.dir.toPath, snapshotId)
+  val destination = Snapshots.deleteRename(path, snapshotId)
+  try {
+Utils.atomicMoveWithFallback(path, destination)
+  } catch {
+case e: IOException =>
+  warn("Error renaming snapshot file: " + path + " to :" + destination 
+ ", " + e.getMessage)
+  }
+  scheduler.schedule("delete-snapshot-file", () => 
Snapshots.deleteSnapshotIfExists(log.dir.toPath, snapshotId))

Review comment:
   Similar to `Log.deleteSegmentFiles`, we should probably use a delay 
here. I think it would be ok to either hard-code this to the default of 60s, or 
use `file.delete.delay.ms` from the default configuration.
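A hedged sketch of that suggestion (assuming the `kafka.utils.Scheduler.schedule(name, fun, delay, period, unit)` signature; `scheduler`, `log`, and `snapshotId` are the ones from the diff above, and the 60s value is the hard-coded option mentioned):

```scala
import java.util.concurrent.TimeUnit

// Sketch only: defer the physical snapshot deletion, mirroring how
// Log.deleteSegmentFiles defers segment file removal.
scheduler.schedule(
  name = "delete-snapshot-file",
  fun = () => Snapshots.deleteSnapshotIfExists(log.dir.toPath, snapshotId),
  delay = 60 * 1000L,
  unit = TimeUnit.MILLISECONDS
)
```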





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] twmb commented on pull request #10183: KAFKA-12267; Implement `DescribeTransactions` API

2021-02-24 Thread GitBox


twmb commented on pull request #10183:
URL: https://github.com/apache/kafka/pull/10183#issuecomment-785548579


   Can the KIP be updated to change TransactionState from an int8 to a string 
(as implemented in this PR)?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dengziming commented on a change in pull request #10021: KAFKA-12205: Delete snapshots less than the snapshot at the log start

2021-02-24 Thread GitBox


dengziming commented on a change in pull request #10021:
URL: https://github.com/apache/kafka/pull/10021#discussion_r582479381



##
File path: raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
##
@@ -92,4 +100,22 @@ public static Path createTempFile(Path logDir, 
OffsetAndEpoch snapshotId) throws
 
 return Optional.of(new SnapshotPath(path, new 
OffsetAndEpoch(endOffset, epoch), partial));
 }
+
+/**
+ * Delete this snapshot from the filesystem.
+ */
+public static CompletableFuture deleteSnapshotIfExists(Path 
logDir, OffsetAndEpoch snapshotId) throws IOException {
+Path path = snapshotPath(logDir, snapshotId);
+Path destination = Snapshots.deleteRename(path, snapshotId);
+Utils.atomicMoveWithFallback(path, destination);
+
+return CompletableFuture.supplyAsync(() -> {

Review comment:
   If the file is deleted while Kafka is still reading the file, won't the 
reading fail since we have already renamed the log file?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dengziming commented on a change in pull request #10021: KAFKA-12205: Delete snapshots less than the snapshot at the log start

2021-02-24 Thread GitBox


dengziming commented on a change in pull request #10021:
URL: https://github.com/apache/kafka/pull/10021#discussion_r582477733



##
File path: raft/src/main/java/org/apache/kafka/snapshot/Snapshots.java
##
@@ -92,4 +100,22 @@ public static Path createTempFile(Path logDir, 
OffsetAndEpoch snapshotId) throws
 
 return Optional.of(new SnapshotPath(path, new 
OffsetAndEpoch(endOffset, epoch), partial));
 }
+
+/**
+ * Delete this snapshot from the filesystem.
+ */
+public static CompletableFuture deleteSnapshotIfExists(Path 
logDir, OffsetAndEpoch snapshotId) throws IOException {
+Path path = snapshotPath(logDir, snapshotId);
+Path destination = Snapshots.deleteRename(path, snapshotId);
+Utils.atomicMoveWithFallback(path, destination);
+
+return CompletableFuture.supplyAsync(() -> {
+try {
+return Files.deleteIfExists(destination);
+} catch (IOException e) {
+throw new RuntimeException("Error deleting snapshot file " + 
destination + ":" + e.getMessage());
+}
+});

Review comment:
   Added schedule to `KafkaMetadataLog`, PTAL. Do you think we should add a 
`MetadataLogSnapshot` class instead of using `OffsetAndEpoch` to represent a 
snapshot?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji opened a new pull request #10206: KAFKA-12369; Implement `ListTransactions` API

2021-02-24 Thread GitBox


hachikuji opened a new pull request #10206:
URL: https://github.com/apache/kafka/pull/10206


   This patch implements the `ListTransactions` API as documented in KIP-664: 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-664%3A+Provide+tooling+to+detect+and+abort+hanging+transactions.
 This is only the server-side implementation and does not contain the `Admin` 
API.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12371) MirrorMaker 2.0 documentation is incorrect

2021-02-24 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12371?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17290598#comment-17290598
 ] 

Luke Chen commented on KAFKA-12371:
---

[~Scott-kirk], thanks for reporting the issue. I fixed this in KAFKA-12350. You should be able to view the correct version once the kafka-site PR is merged. Thank you.

> MirrorMaker 2.0 documentation is incorrect
> --
>
> Key: KAFKA-12371
> URL: https://issues.apache.org/jira/browse/KAFKA-12371
> Project: Kafka
>  Issue Type: Improvement
>  Components: docs, documentation
>Affects Versions: 2.7.0
>Reporter: Scott Kirkpatrick
>Priority: Minor
>
> There are a few places in the official MirrorMaker 2.0 docs that are either 
> confusing or incorrect. Here are a few examples I've found:
> The documentation for the 'sync.group.offsets.enabled' config states that 
> it's enabled by default 
> [here|https://github.com/apache/kafka-site/blob/61f4707381c369a98a7a77e1a7c3a11d5983909c/27/ops.html#L802],
>  but the actual source code indicates that it's disabled by default 
> [here|https://github.com/apache/kafka/blob/f75efb96fae99a22eb54b5d0ef4e23b28fe8cd2d/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java#L185].
>  I'm unsure if the intent is to have it enabled or disabled by default.
> There are also some numerical typos, 
> [here|https://github.com/apache/kafka-site/blob/61f4707381c369a98a7a77e1a7c3a11d5983909c/27/ops.html#L791]
>  and 
> [here|https://github.com/apache/kafka-site/blob/61f4707381c369a98a7a77e1a7c3a11d5983909c/27/ops.html#L793].
>  These lines state that the default is 6000 seconds (and incorrectly that 
> it's equal to 10 minutes), but the actual default is 600 seconds, shown 
> [here|https://github.com/apache/kafka/blob/f75efb96fae99a22eb54b5d0ef4e23b28fe8cd2d/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java#L145]
>  and 
> [here|https://github.com/apache/kafka/blob/f75efb96fae99a22eb54b5d0ef4e23b28fe8cd2d/connect/mirror/src/main/java/org/apache/kafka/connect/mirror/MirrorConnectorConfig.java#L152]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dengziming commented on a change in pull request #10162: DOCS: Update protocol doc for missing data type

2021-02-24 Thread GitBox


dengziming commented on a change in pull request #10162:
URL: https://github.com/apache/kafka/pull/10162#discussion_r582432795



##
File path: clients/src/main/resources/common/message/README.md
##
@@ -75,16 +75,24 @@ There are several primitive field types available.
 
 * "int16": a 16-bit integer.
 
+* "uint16": a 16-bit unsigned integer.
+
 * "int32": a 32-bit integer.
 
+* "uint32": a 32-bit unsigned integer.
+
 * "int64": a 64-bit integer.
 
 * "float64": is a double-precision floating point number (IEEE 754).
 
 * "string": a UTF-8 string.
 
+* "uuid": a type 4 immutable universally unique identifier.
+
 * "bytes": binary data.
 
+* "records": memory record set or file record set.

Review comment:
   Only `MemoryRecords` is supported when receiving records, but we can also set `FileRecords` when writing records. For example, `KafkaRaftClient.tryCompleteFetchRequest()` will call `partitionData.setRecordSet(log.read(xxx))`.
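For context, a hedged illustration of the shared supertype being discussed (`BaseRecords` is what the generated protocol data uses for "records" fields; `MemoryRecords` and `FileRecords` are the two implementations mentioned above):

```scala
import org.apache.kafka.common.record.{BaseRecords, FileRecords, MemoryRecords}

// Illustration only: both concrete record classes can sit behind the same
// BaseRecords-typed field until serialization.
def describe(records: BaseRecords): String = records match {
  case m: MemoryRecords => s"in-memory record set, ${m.sizeInBytes} bytes"
  case f: FileRecords   => s"file-backed record set, ${f.sizeInBytes} bytes"
  case other            => s"other BaseRecords implementation: ${other.getClass.getSimpleName}"
}
```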





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax merged pull request #10186: MINOR: bump release version to 3.0.0-SNAPSHOT

2021-02-24 Thread GitBox


mjsax merged pull request #10186:
URL: https://github.com/apache/kafka/pull/10186


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #10205: KAFKA-12323 Follow-up: Refactor the unit test a bit

2021-02-24 Thread GitBox


guozhangwang commented on a change in pull request #10205:
URL: https://github.com/apache/kafka/pull/10205#discussion_r582428237



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
##
@@ -1936,7 +1934,7 @@ public void process(final Object key, final Object value) 
{
 clientSupplier.consumer.addRecord(new ConsumerRecord<>(
 topic1,
 1,
-0L,
+100L,

Review comment:
   Ack.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #10205: KAFKA-12323 Follow-up: Refactor the unit test a bit

2021-02-24 Thread GitBox


guozhangwang commented on a change in pull request #10205:
URL: https://github.com/apache/kafka/pull/10205#discussion_r582426740



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
##
@@ -1844,19 +1844,17 @@ public void process(final Object key, final Object 
value) {}
 assertEquals(0, punctuatedWallClockTime.size());
 
 mockTime.sleep(100L);
-for (long i = 0L; i < 10L; i++) {

Review comment:
   Since we do not really need it: a single record is sufficient to trigger the punctuation.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] highluck commented on a change in pull request #9851: KAFKA-10769 Remove JoinGroupRequest#containsValidPattern as it is dup…

2021-02-24 Thread GitBox


highluck commented on a change in pull request #9851:
URL: https://github.com/apache/kafka/pull/9851#discussion_r582425104



##
File path: 
clients/src/main/java/org/apache/kafka/common/requests/JoinGroupRequest.java
##
@@ -59,38 +59,12 @@ public String toString() {
 public static final int UNKNOWN_GENERATION_ID = -1;
 public static final String UNKNOWN_PROTOCOL_NAME = "";
 
-private static final int MAX_GROUP_INSTANCE_ID_LENGTH = 249;
-
 /**
  * Ported from class Topic in {@link org.apache.kafka.common.internals} to 
restrict the charset for
  * static member id.
  */
 public static void validateGroupInstanceId(String id) {
-if (id.equals(""))
-throw new InvalidConfigurationException("Group instance id must be 
non-empty string");

Review comment:
   
   I modified the code, thanks!





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #9590: KAFKA-7556: KafkaConsumer.beginningOffsets does not return actual first offsets

2021-02-24 Thread GitBox


jolshan commented on a change in pull request #9590:
URL: https://github.com/apache/kafka/pull/9590#discussion_r582415502



##
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##
@@ -701,11 +719,17 @@ private[log] class Cleaner(val id: Int,
   // if any messages are to be retained, write them out
   val outputBuffer = result.outputBuffer
   if (outputBuffer.position() > 0) {
+if (destSegment.isEmpty) {
+  // create a new segment with a suffix appended to the name of the 
log and indexes
+  destSegment = Some(LogCleaner.createNewCleanedSegment(log, 
result.minOffset()))
+  transactionMetadata.cleanedIndex = Some(destSegment.get.txnIndex)

Review comment:
   It may be possible to call 
`transactionMetadata.appendTransactionIndex()` here though to append a little 
bit sooner.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #9590: KAFKA-7556: KafkaConsumer.beginningOffsets does not return actual first offsets

2021-02-24 Thread GitBox


jolshan commented on a change in pull request #9590:
URL: https://github.com/apache/kafka/pull/9590#discussion_r582413737



##
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##
@@ -701,11 +719,17 @@ private[log] class Cleaner(val id: Int,
   // if any messages are to be retained, write them out
   val outputBuffer = result.outputBuffer
   if (outputBuffer.position() > 0) {
+if (destSegment.isEmpty) {
+  // create a new segment with a suffix appended to the name of the 
log and indexes
+  destSegment = Some(LogCleaner.createNewCleanedSegment(log, 
result.minOffset()))
+  transactionMetadata.cleanedIndex = Some(destSegment.get.txnIndex)

Review comment:
   Re: changing transaction code
   Since cleanedIndex is not created until here, we need to delay appends. Originally it was created at the start of `cleanSegments`. In that method, we now cannot guarantee the log has been created until `cleanInto` returns with a defined `cleanedSegment`. In `cleanInto` there is a chance to call `onControlBatchRead`, which previously just appended transactions but can no longer do so if `cleanedIndex` is not defined.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #9590: KAFKA-7556: KafkaConsumer.beginningOffsets does not return actual first offsets

2021-02-24 Thread GitBox


jolshan commented on a change in pull request #9590:
URL: https://github.com/apache/kafka/pull/9590#discussion_r582413737



##
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##
@@ -701,11 +719,17 @@ private[log] class Cleaner(val id: Int,
   // if any messages are to be retained, write them out
   val outputBuffer = result.outputBuffer
   if (outputBuffer.position() > 0) {
+if (destSegment.isEmpty) {
+  // create a new segment with a suffix appended to the name of the 
log and indexes
+  destSegment = Some(LogCleaner.createNewCleanedSegment(log, 
result.minOffset()))
+  transactionMetadata.cleanedIndex = Some(destSegment.get.txnIndex)

Review comment:
   Re: changing transaction code
   Since cleanedIndex is not created until here, we need to delay appends. Originally it was created at the start of `cleanSegments`. In that method, we now cannot guarantee the log has been created until `cleanInto` returns with a defined `cleanedSegment`. In `cleanSegments` there is a chance to call `onControlBatchRead`, which previously just appended transactions but can no longer do so if `cleanedIndex` is not defined.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #9590: KAFKA-7556: KafkaConsumer.beginningOffsets does not return actual first offsets

2021-02-24 Thread GitBox


jolshan commented on a change in pull request #9590:
URL: https://github.com/apache/kafka/pull/9590#discussion_r582413737



##
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##
@@ -701,11 +719,17 @@ private[log] class Cleaner(val id: Int,
   // if any messages are to be retained, write them out
   val outputBuffer = result.outputBuffer
   if (outputBuffer.position() > 0) {
+if (destSegment.isEmpty) {
+  // create a new segment with a suffix appended to the name of the 
log and indexes
+  destSegment = Some(LogCleaner.createNewCleanedSegment(log, 
result.minOffset()))
+  transactionMetadata.cleanedIndex = Some(destSegment.get.txnIndex)

Review comment:
   Re: changing transaction code
   Since cleanedIndex is not created until here, we need to delay appends. Originally it was created at the start of `cleanSegments`. In that method, we now cannot guarantee the log has been created until `cleanInto` returns with a defined `cleanedSegment`. In `cleanSegments` there is a chance to call `onControlBatchRead`, which previously just appended transactions but can no longer do so.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #9590: KAFKA-7556: KafkaConsumer.beginningOffsets does not return actual first offsets

2021-02-24 Thread GitBox


jolshan commented on a change in pull request #9590:
URL: https://github.com/apache/kafka/pull/9590#discussion_r582413737



##
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##
@@ -701,11 +719,17 @@ private[log] class Cleaner(val id: Int,
   // if any messages are to be retained, write them out
   val outputBuffer = result.outputBuffer
   if (outputBuffer.position() > 0) {
+if (destSegment.isEmpty) {
+  // create a new segment with a suffix appended to the name of the 
log and indexes
+  destSegment = Some(LogCleaner.createNewCleanedSegment(log, 
result.minOffset()))
+  transactionMetadata.cleanedIndex = Some(destSegment.get.txnIndex)

Review comment:
   Re: changing transaction code
   Since cleanedIndex is not created until here, we need to delay appends. Originally it was created at the start of `cleanSegments`. In that method, we cannot guarantee the log has been created until `cleanInto` returns with a defined `cleanedSegment`. In `cleanSegments` there is a chance to call `onControlBatchRead`, which previously just appended transactions but can no longer do so.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #9590: KAFKA-7556: KafkaConsumer.beginningOffsets does not return actual first offsets

2021-02-24 Thread GitBox


jolshan commented on a change in pull request #9590:
URL: https://github.com/apache/kafka/pull/9590#discussion_r582407274



##
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##
@@ -599,21 +606,30 @@ private[log] class Cleaner(val id: Int,
 }
 currentSegmentOpt = nextSegmentOpt
   }
-
-  cleaned.onBecomeInactiveSegment()
-  // flush new segment to disk before swap
-  cleaned.flush()
-
-  // update the modification date to retain the last modified date of the 
original files
-  val modified = segments.last.lastModified
-  cleaned.lastModified = modified
-
-  // swap in new segment
-  info(s"Swapping in cleaned segment $cleaned for segment(s) $segments in 
log $log")
-  log.replaceSegments(List(cleaned), segments)
+  
+  // Result of cleaning included at least one record.
+  if (cleanedSegment.isDefined) {
+val cleaned = cleanedSegment.get
+cleaned.onBecomeInactiveSegment()
+// flush new segment to disk before swap
+cleaned.flush()
+
+// update the modification date to retain the last modified date of 
the original files
+val modified = segments.last.lastModified
+cleaned.lastModified = modified
+
+// swap in new segment
+info(s"Swapping in cleaned segment $cleaned for segment(s) $segments 
in log $log")
+log.replaceSegments(List(cleaned), segments)

Review comment:
   As for the first part about `replaceSegments`. It seems that there are a 
few other times we call this method. I'm wondering if we would want to update 
the logStartOffset in these cases (probably) and what the reason should be. I'm 
also wondering if we should include the reason as a parameter to 
`replaceSegments`.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #9590: KAFKA-7556: KafkaConsumer.beginningOffsets does not return actual first offsets

2021-02-24 Thread GitBox


jolshan commented on a change in pull request #9590:
URL: https://github.com/apache/kafka/pull/9590#discussion_r582406089



##
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##
@@ -599,21 +606,30 @@ private[log] class Cleaner(val id: Int,
 }
 currentSegmentOpt = nextSegmentOpt
   }
-
-  cleaned.onBecomeInactiveSegment()
-  // flush new segment to disk before swap
-  cleaned.flush()
-
-  // update the modification date to retain the last modified date of the 
original files
-  val modified = segments.last.lastModified
-  cleaned.lastModified = modified
-
-  // swap in new segment
-  info(s"Swapping in cleaned segment $cleaned for segment(s) $segments in 
log $log")
-  log.replaceSegments(List(cleaned), segments)
+  
+  // Result of cleaning included at least one record.
+  if (cleanedSegment.isDefined) {
+val cleaned = cleanedSegment.get
+cleaned.onBecomeInactiveSegment()
+// flush new segment to disk before swap
+cleaned.flush()
+
+// update the modification date to retain the last modified date of 
the original files
+val modified = segments.last.lastModified
+cleaned.lastModified = modified
+
+// swap in new segment
+info(s"Swapping in cleaned segment $cleaned for segment(s) $segments 
in log $log")
+log.replaceSegments(List(cleaned), segments)

Review comment:
   Right. I wasn't sure if it made sense to change the deleteSegments code 
to support the SegmentCompaction reason. The entire segment is being deleted 
here, but I see how this may be confusing when reading in the log.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-12211) NoSuchFileException will be thrown if hasPersistentStores is false when creating stateDir

2021-02-24 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12211?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman resolved KAFKA-12211.

Resolution: Fixed

> NoSuchFileException will be thrown if hasPersistentStores is false when 
> creating stateDir
> -
>
> Key: KAFKA-12211
> URL: https://issues.apache.org/jira/browse/KAFKA-12211
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.7.0, 2.6.1
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Minor
> Fix For: 2.8.0, 2.7.1, 2.6.2
>
>
> We improved the state directory folder/file permission setting in 
> KAFKA-10705. But we forgot to consider one situation: if the user doesn't have 
> persistent stores, we won't need to create the base dir and state dir. And if 
> there's no such dir/file and we try to set permissions on them, we'll get a 
> *NoSuchFileException*
>  
> {code:java}
> ERROR Error changing permissions for the state or base directory 
> /var/folders/4l/393lkmzx3zvftwynjngzdsfwgn/T/kafka-11254487964259813330/appId_AdjustStreamThreadCountTestshouldAddStreamThread
>   (org.apache.kafka.streams.processor.internals.StateDirectory:117)
> 2021-01-15T16:10:44.570+0800 [DEBUG] [TestEventLogger] 
> java.nio.file.NoSuchFileException: 
> /var/folders/4l/393lkmzx3zvftwynjngzdsfwgn/T/kafka-11254487964259813330/appId_AdjustStreamThreadCountTestshouldAddStreamThread
> 2021-01-15T16:10:44.570+0800 [DEBUG] [TestEventLogger]  at 
> java.base/sun.nio.fs.UnixException.translateToIOException(UnixException.java:92)
> 2021-01-15T16:10:44.570+0800 [DEBUG] [TestEventLogger]  at 
> java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:111)
> 2021-01-15T16:10:44.570+0800 [DEBUG] [TestEventLogger]  at 
> java.base/sun.nio.fs.UnixException.rethrowAsIOException(UnixException.java:116)
> 2021-01-15T16:10:44.570+0800 [DEBUG] [TestEventLogger]  at 
> java.base/sun.nio.fs.UnixFileAttributeViews$Posix.setMode(UnixFileAttributeViews.java:254)
> 2021-01-15T16:10:44.570+0800 [DEBUG] [TestEventLogger]  at 
> java.base/sun.nio.fs.UnixFileAttributeViews$Posix.setPermissions(UnixFileAttributeViews.java:276)
> 2021-01-15T16:10:44.570+0800 [DEBUG] [TestEventLogger]  at 
> java.base/java.nio.file.Files.setPosixFilePermissions(Files.java:2079)
> 2021-01-15T16:10:44.570+0800 [DEBUG] [TestEventLogger]  at 
> org.apache.kafka.streams.processor.internals.StateDirectory.(StateDirectory.java:115)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jolshan commented on a change in pull request #9590: KAFKA-7556: KafkaConsumer.beginningOffsets does not return actual first offsets

2021-02-24 Thread GitBox


jolshan commented on a change in pull request #9590:
URL: https://github.com/apache/kafka/pull/9590#discussion_r582402100



##
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##
@@ -582,13 +586,16 @@ private[log] class Cleaner(val id: Int,
 transactionMetadata.addAbortedTransactions(abortedTransactions)
 
 val retainDeletesAndTxnMarkers = currentSegment.lastModified > 
deleteHorizonMs
-info(s"Cleaning $currentSegment in log ${log.name} into 
${cleaned.baseOffset} " +
+info(s"Cleaning $currentSegment in log ${log.name} into 
${currentSegment.baseOffset} " +

Review comment:
   Hmmm I think I didn't really understand this log at first. It does make 
sense to move it to when we know what the new segment is. I think the issue I 
had was what to do with including `retainDeletesAndTxnMarkers`. Maybe it makes 
sense to have a log saying the current segment and the deletion info and a 
second log to say what the new segment's baseOffset is later.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jolshan commented on a change in pull request #9590: KAFKA-7556: KafkaConsumer.beginningOffsets does not return actual first offsets

2021-02-24 Thread GitBox


jolshan commented on a change in pull request #9590:
URL: https://github.com/apache/kafka/pull/9590#discussion_r582402100



##
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##
@@ -582,13 +586,16 @@ private[log] class Cleaner(val id: Int,
 transactionMetadata.addAbortedTransactions(abortedTransactions)
 
 val retainDeletesAndTxnMarkers = currentSegment.lastModified > 
deleteHorizonMs
-info(s"Cleaning $currentSegment in log ${log.name} into 
${cleaned.baseOffset} " +
+info(s"Cleaning $currentSegment in log ${log.name} into 
${currentSegment.baseOffset} " +

Review comment:
   Hmmm I think I didn't really understand this log at first. It does make 
sense to move it to when we know what the new segment is. I think the issue I 
had was what to do with `retainDeletesAndTxnMarkers`. Maybe it makes sense to 
have a log saying the current segment and the deletion info and a second log to 
say what the new segment's baseOffset is later.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #10112: KAFKA-12226: Prevent source task offset failure when producer is overwhelmed

2021-02-24 Thread GitBox


hachikuji commented on a change in pull request #10112:
URL: https://github.com/apache/kafka/pull/10112#discussion_r582378783



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##
@@ -475,11 +476,15 @@ public boolean commitOffsets() {
 synchronized (this) {
 // First we need to make sure we snapshot everything in exactly 
the current state. This
 // means both the current set of messages we're still waiting to 
finish, stored in this
-// class, which setting flushing = true will handle by storing any 
new values into a new
+// class, which setting recordFlushPending = true will handle by 
storing any new values into a new
 // buffer; and the current set of user-specified offsets, stored 
in the
 // OffsetStorageWriter, for which we can use beginFlush() to 
initiate the snapshot.
-flushing = true;
-boolean flushStarted = offsetWriter.beginFlush();
+// No need to begin a new offset flush if we timed out waiting for 
records to be flushed to
+// Kafka in a prior attempt.
+if (!recordFlushPending) {

Review comment:
   > (Please correct me if I'm wrong on this point; my core knowledge is a 
little fuzzy and maybe there are stronger guarantees than I'm aware of) 
Out-of-order acknowledgment of records makes tracking the latest offset for a 
given source partition a little less trivial than it seems initially. For 
example, if a task produces two records with the same source partition that end 
up being delivered to different topic-partitions, the second record may be 
ack'd before the first, and when it comes time for offset commit, the framework 
would have to refrain from committing offsets for that second record until the 
first is also ack'd.
   
   Ok, that rings a bell. I think I see how the logic works now and I don't see 
an obvious way to make it simpler. Doing something finer-grained as you said 
might be the way to go. Anyway, I agree this is something to save for a 
follow-up improvement.
   
   > I think it's a necessary evil, since source task offset commits are 
conducted on a single thread. Without a timeout for offset commits, a single 
task could block indefinitely and disable offset commits for all other tasks on 
the cluster.
   
   Hmm.. This is suspicious. Why do we need to block the executor while we wait 
for the flush? Would it be simpler to let the worker source task finish the 
flush and the offset commit in its own event thread? We end up blocking the 
event thread anyway because of the need to do it under the lock.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Closed] (KAFKA-10810) Add a replace thread option to the streams uncaught exception handler

2021-02-24 Thread Walker Carlson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10810?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Walker Carlson closed KAFKA-10810.
--

> Add a replace thread option to the streams uncaught exception handler  
> ---
>
> Key: KAFKA-10810
> URL: https://issues.apache.org/jira/browse/KAFKA-10810
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Walker Carlson
>Assignee: Walker Carlson
>Priority: Major
>  Labels: streams
> Fix For: 2.8.0
>
>
> Add an option to replace threads that have died.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (KAFKA-9331) Add option to terminate application when StreamThread(s) die

2021-02-24 Thread Walker Carlson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9331?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Walker Carlson closed KAFKA-9331.
-

> Add option to terminate application when StreamThread(s) die
> 
>
> Key: KAFKA-9331
> URL: https://issues.apache.org/jira/browse/KAFKA-9331
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.4.0
>Reporter: Michael Bingham
>Assignee: Walker Carlson
>Priority: Minor
>  Labels: needs-kip
>
> Currently, if a {{StreamThread}} dies due to an unexpected exception, the 
> Streams application continues running. Even if all {{StreamThread}}(s) die, 
> the application will continue running, but will be in an {{ERROR}} state. 
> Many users want or expect the application to terminate in the event of a 
> fatal exception that kills one or more {{StreamThread}}(s). Currently, this 
> requires extra work from the developer to register an uncaught exception 
> handler on the {{KafkaStreams}} object and trigger a shutdown as needed.
> It would be useful to provide a configurable option for the Streams 
> application to have it automatically terminate with an exception if one or 
> more {{StreamThread}}(s) die.
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (KAFKA-4748) Need a way to shutdown all workers in a Streams application at the same time

2021-02-24 Thread Walker Carlson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-4748?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Walker Carlson closed KAFKA-4748.
-

> Need a way to shutdown all workers in a Streams application at the same time
> 
>
> Key: KAFKA-4748
> URL: https://issues.apache.org/jira/browse/KAFKA-4748
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.10.1.1
>Reporter: Elias Levy
>Assignee: Walker Carlson
>Priority: Major
> Fix For: 2.8.0
>
>
> If you have a fleet of Streams workers for an application and attempt to shut 
> them down simultaneously (e.g. via SIGTERM and 
> Runtime.getRuntime().addShutdownHook() and streams.close()), a large number 
> of the workers fail to shut down.
> The problem appears to be a race condition between the shutdown signal and 
> the consumer rebalancing that is triggered by some of the workers exiting 
> before others. Apparently, workers that receive the signal later fail to 
> exit as they are caught in the rebalance.
> Terminating workers in a rolling fashion is not advisable in some situations. 
> The rolling shutdown will result in many unnecessary rebalances and may fail, 
> as the application may have a large amount of local state that a smaller 
> number of nodes may not be able to store.
> It would appear that there is a need for a protocol change to allow the 
> coordinator to signal a consumer group to shut down without leading to 
> rebalancing.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (KAFKA-6943) Have option to shutdown KS cleanly if any threads crashes, or if all threads crash

2021-02-24 Thread Walker Carlson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6943?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Walker Carlson closed KAFKA-6943.
-

> Have option to shutdown KS cleanly if any threads crashes, or if all threads 
> crash
> --
>
> Key: KAFKA-6943
> URL: https://issues.apache.org/jira/browse/KAFKA-6943
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 1.1.0
>Reporter: Antony Stubbs
>Assignee: Walker Carlson
>Priority: Major
>  Labels: user-experience
> Fix For: 2.8.0
>
>
> ATM users have to implement this themselves. Might be nice to have an option 
> to configure that if all threads crash, or if any crash, to initiate clean 
> shutdown.
> This also has a gotcha where, at the moment, if you call KS#close without a 
> timeout from the uncaught exception handler, you deadlock.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jolshan commented on a change in pull request #9590: KAFKA-7556: KafkaConsumer.beginningOffsets does not return actual first offsets

2021-02-24 Thread GitBox


jolshan commented on a change in pull request #9590:
URL: https://github.com/apache/kafka/pull/9590#discussion_r582363641



##
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##
@@ -1094,6 +1119,8 @@ private[log] class CleanedTransactionMetadata {
 
   // Output cleaned index to write retained aborted transactions
   var cleanedIndex: Option[TransactionIndex] = None
+  
+  var toAppend: List[AbortedTxn] = List.empty

Review comment:
   I believe the issue is that this code expects the log to be created 
already but it is not. I think my intention was to delay this operation until 
the log is created. I'll double check this is actually the case.
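A hedged sketch of the buffer-then-flush pattern being described (illustrative only, not the PR's exact code; `TransactionIndex` and `AbortedTxn` are package-private to `kafka.log`):

```scala
package kafka.log

// Sketch only: buffer aborted transactions until the cleaned transaction index
// exists, then flush the buffer into it once the new segment (and its index)
// has been created.
private[log] class CleanedTxnBuffer {
  var cleanedIndex: Option[TransactionIndex] = None
  private var toAppend: List[AbortedTxn] = List.empty

  def onAbortedTxn(txn: AbortedTxn): Unit = cleanedIndex match {
    case Some(index) => index.append(txn)  // index already exists, append directly
    case None        => toAppend :+= txn   // index not created yet, delay the append
  }

  def onCleanedIndexCreated(index: TransactionIndex): Unit = {
    cleanedIndex = Some(index)
    toAppend.foreach(index.append)         // flush anything buffered earlier
    toAppend = List.empty
  }
}
```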





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9590: KAFKA-7556: KafkaConsumer.beginningOffsets does not return actual first offsets

2021-02-24 Thread GitBox


hachikuji commented on a change in pull request #9590:
URL: https://github.com/apache/kafka/pull/9590#discussion_r582343277



##
File path: 
clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
##
@@ -198,7 +198,7 @@ private static FilterResult filterTo(TopicPartition partition, Iterable 

##
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##
 val retainDeletesAndTxnMarkers = currentSegment.lastModified > deleteHorizonMs
-info(s"Cleaning $currentSegment in log ${log.name} into ${cleaned.baseOffset} " +
+info(s"Cleaning $currentSegment in log ${log.name} into ${currentSegment.baseOffset} " +
  s"with deletion horizon $deleteHorizonMs, " +
  s"${if(retainDeletesAndTxnMarkers) "retaining" else "discarding"} deletes.")

 try {
-  cleanInto(log.topicPartition, currentSegment.log, cleaned, map, retainDeletesAndTxnMarkers, log.config.maxMessageSize,
+   cleanedSegment = cleanInto(log, log.topicPartition, currentSegment.log, cleanedSegment, map, retainDeletesAndTxnMarkers, log.config.maxMessageSize,

Review comment:
   nit: it shouldn't be necessary to pass both `log` and 
`log.topicPartition`.

##
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##
@@ -582,13 +586,16 @@ private[log] class Cleaner(val id: Int,
 transactionMetadata.addAbortedTransactions(abortedTransactions)
 
 val retainDeletesAndTxnMarkers = currentSegment.lastModified > 
deleteHorizonMs
-info(s"Cleaning $currentSegment in log ${log.name} into 
${cleaned.baseOffset} " +
+info(s"Cleaning $currentSegment in log ${log.name} into 
${currentSegment.baseOffset} " +

Review comment:
   This log message no longer makes sense. Is the intent to use 
`cleanedSegment` instead of `currentSegment`? Perhaps we should consider moving 
this log line to after the cleaning when we know what the new segment is.

##
File path: core/src/main/scala/kafka/log/LogCleaner.scala
##
@@ -599,21 +606,30 @@ private[log] class Cleaner(val id: Int,
 }
 currentSegmentOpt = nextSegmentOpt
   }
-
-  cleaned.onBecomeInactiveSegment()
-  // flush new segment to disk before swap
-  cleaned.flush()
-
-  // update the modification date to retain the last modified date of the 
original files
-  val modified = segments.last.lastModified
-  cleaned.lastModified = modified
-
-  // swap in new segment
-  info(s"Swapping in cleaned segment $cleaned for segment(s) $segments in 
log $log")
-  log.replaceSegments(List(cleaned), segments)
+  
+  // Result of cleaning included at least one record.
+  if (cleanedSegment.isDefined) {
+val cleaned = cleanedSegment.get
+cleaned.onBecomeInactiveSegment()
+// flush new segment to disk before swap
+cleaned.flush()
+
+// update the modification date to retain the last modified date of 
the original files
+val modified = segments.last.lastModified
+cleaned.lastModified = modified
+
+// swap in new segment
+info(s"Swapping in cleaned segment $cleaned for segment(s) $segments 
in log $log")
+log.replaceSegments(List(cleaned), segments)

Review comment:
   Would it make more sense to update log start offset in `replaceSegments` 
when we are protected by the lock? Otherwise, it seems like the log start 
offset will be out of sync with the segments until all of them have been 
cleaned and `maybeIncrementLogStartOffset` is called. 
   
   Note that `deleteSegments` does update the log start offset. Interestingly, 
`SegmentDeletion` is used for the reason in this case, and not 
`SegmentCompaction`.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10847) Avoid spurious left/outer join results in stream-stream join

2021-02-24 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-10847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17290502#comment-17290502
 ] 

Sergio Peña commented on KAFKA-10847:
-

[~mjsax] Just did a quick test deleting the emitted non-joined record. It didn't affect the performance too much. I will keep this deletion if it causes a problem when a rebalance happens.

> Avoid spurious left/outer join results in stream-stream join 
> -
>
> Key: KAFKA-10847
> URL: https://issues.apache.org/jira/browse/KAFKA-10847
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Sergio Peña
>Priority: Major
>
> KafkaStreams follows an eager execution model, ie, it never buffers input 
> records but processes them right away. For left/outer stream-stream join, 
> this implies that left/outer join result might be emitted before the window 
> end (or window close) time is reached. Thus, a record what will be an 
> inner-join result, might produce a eager (and spurious) left/outer join 
> result.
> We should change the implementation of the join, to not emit eager left/outer 
> join result, but instead delay the emission of such result after the window 
> grace period passed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] C0urante commented on a change in pull request #10112: KAFKA-12226: Prevent source task offset failure when producer is overwhelmed

2021-02-24 Thread GitBox


C0urante commented on a change in pull request #10112:
URL: https://github.com/apache/kafka/pull/10112#discussion_r582351074



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##
@@ -475,11 +476,15 @@ public boolean commitOffsets() {
 synchronized (this) {
 // First we need to make sure we snapshot everything in exactly 
the current state. This
 // means both the current set of messages we're still waiting to 
finish, stored in this
-// class, which setting flushing = true will handle by storing any 
new values into a new
+// class, which setting recordFlushPending = true will handle by 
storing any new values into a new
 // buffer; and the current set of user-specified offsets, stored 
in the
 // OffsetStorageWriter, for which we can use beginFlush() to 
initiate the snapshot.
-flushing = true;
-boolean flushStarted = offsetWriter.beginFlush();
+// No need to begin a new offset flush if we timed out waiting for 
records to be flushed to
+// Kafka in a prior attempt.
+if (!recordFlushPending) {

Review comment:
   1. I think it's a necessary evil, since source task offset commits are 
[conducted on a single 
thread](https://github.com/apache/kafka/blob/3f09fb97b6943c0612488dfa8e5eab8078fd7ca0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java#L64).
 Without a timeout for offset commits, a single task could block indefinitely 
and disable offset commits for all other tasks on the cluster.
   2. This is definitely possible; I think the only saving grace here is that 
the combined sizes of the `outstandingMessages` and 
`outstandingMessagesBacklog` fields is going to be naturally throttled by the 
producer's buffer. If too many records are accumulated, the call to 
`Producer::send` will block synchronously until space is freed up, at which 
point, the worker can continue polling the task for new records. This isn't 
ideal as it will essentially cause the producer's entire buffer to be occupied 
until the throughput of record production from the task decreases and/or the 
write throughput of the producer rises to meet it, but it at least establishes 
an upper bound for how large a single batch of records in the 
`outstandingMessages` field ever gets. It may take several offset commit 
attempts for all of the records in that batch to be ack'd, with all but the 
last (successful) attempt timing out and failing, but forward progress with 
offset commits should still be possible.
   
   I share your feelings about the complexity here. I think ultimately it 
arises from two constraints:
   1. A worker-global producer is used to write source offsets to the internal 
offsets topic right now. Although this doesn't necessarily require the 
single-threaded logic for offset commits mentioned above, things become simpler 
with it.
   2. (Please correct me if I'm wrong on this point; my core knowledge is a 
little fuzzy and maybe there are stronger guarantees than I'm aware of) 
Out-of-order acknowledgment of records makes tracking the latest offset for a 
given source partition a little less trivial than it seems initially. For 
example, if a task produces two records with the same source partition that end 
up being delivered to different topic-partitions, the second record may be 
ack'd before the first, and when it comes time for offset commit, the framework 
would have to refrain from committing offsets for that second record until the 
first is also ack'd.
   
   I don't think either of these points make it impossible to add even 
more-fine-grained offset commit behavior and/or remove offset commit timeouts, 
but the work involved would be a fair amount heavier than this relatively-minor 
patch. If you'd prefer to see something along those lines, could we consider 
merging this patch for the moment and perform a more serious overhaul of the 
source task offset commit logic as a follow-up, possibly with a small design 
discussion on a Jira ticket to make sure there's alignment on the new behavior?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] C0urante commented on a change in pull request #10112: KAFKA-12226: Prevent source task offset failure when producer is overwhelmed

2021-02-24 Thread GitBox


C0urante commented on a change in pull request #10112:
URL: https://github.com/apache/kafka/pull/10112#discussion_r582351074



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##
@@ -475,11 +476,15 @@ public boolean commitOffsets() {
 synchronized (this) {
 // First we need to make sure we snapshot everything in exactly 
the current state. This
 // means both the current set of messages we're still waiting to 
finish, stored in this
-// class, which setting flushing = true will handle by storing any 
new values into a new
+// class, which setting recordFlushPending = true will handle by 
storing any new values into a new
 // buffer; and the current set of user-specified offsets, stored 
in the
 // OffsetStorageWriter, for which we can use beginFlush() to 
initiate the snapshot.
-flushing = true;
-boolean flushStarted = offsetWriter.beginFlush();
+// No need to begin a new offset flush if we timed out waiting for 
records to be flushed to
+// Kafka in a prior attempt.
+if (!recordFlushPending) {

Review comment:
   1. I think it's a necessary evil, since source task offset commits are 
[conducted on a single 
thread](https://github.com/apache/kafka/blob/3f09fb97b6943c0612488dfa8e5eab8078fd7ca0/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java#L64).
 Without a timeout for offset commits, a single task could block indefinitely 
and disable offset commits for all other tasks on the cluster.
   2. This is definitely possible; I think the only saving grace here is that 
the combined sizes of the `outstandingMessages` and 
`outstandingMessagesBacklog` fields is going to be naturally throttled by the 
producer's buffer. If too many records are accumulated, the call to 
`Producer::send` will block synchronously until space is freed up, at which 
point, the worker can continue polling the task for new records. This isn't 
ideal as it will essentially cause the producer's entire buffer to be occupied 
until the throughput of record production from the task decreases and/or the 
write throughput of the producer rises to meet it, but it at least establishes 
an upper bound on how large a single batch of records in the `outstandingMessages` field ever gets. It may take several offset commit 
attempts for all of the records in that batch to be ack'd, with all but the 
last (successful) attempt timing out and failing, but forward progress with 
offset commits should still be possible.
   
   I share your feelings about the complexity here. I think ultimately it 
arises from two constraints:
   1. A worker-global producer is used to write source offsets to the internal 
offsets topic right now. Although this doesn't necessarily require the 
single-threaded logic for offset commits mentioned above, things become simpler 
with it.
   2. (Please correct me if I'm wrong on this point; my core knowledge is a 
little fuzzy and maybe there are stronger guarantees than I'm aware of) 
Out-of-order acknowledgment of records makes tracking the latest offset for a 
given source partition a little less trivial than it seems initially. For 
example, if a task produces two records with the same source partition that end 
up being delivered to different topic-partitions, the second record may be 
ack'd before the first, and when it comes time for offset commit, the framework 
would have to refrain from committing offsets for that second record until the 
first is also ack'd.
   
   I don't think either of these points make it impossible to add even 
more-fine-grained offset commit behavior and/or remove offset commit timeouts, 
but the work involved would be a fair amount heavier than this relatively-minor 
patch. If you'd prefer to see something along those lines, could we merge this 
patch for the moment and perform a more serious overhaul of the source task 
offset commit logic as a follow-up, possibly with a small design discussion on 
a Jira ticket to make sure there's alignment on the new behavior?
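
For illustration, here is a minimal sketch of the bookkeeping that point 2 implies, assuming a hypothetical `PendingOffsets` helper rather than actual WorkerSourceTask code: with out-of-order acks, the committable position for a source partition can only advance to the highest record below which everything has already been ack'd.

    // Hypothetical sketch: track dispatched records in dispatch order and only
    // expose the highest contiguous ack'd position as committable.
    import java.util.Map;
    import java.util.TreeMap;

    class PendingOffsets {
        // dispatch sequence number of each in-flight record -> whether it has been ack'd
        private final TreeMap<Long, Boolean> pending = new TreeMap<>();

        synchronized void recordDispatched(long seq) {
            pending.put(seq, false);
        }

        synchronized void recordAcked(long seq) {
            pending.put(seq, true);
        }

        // Highest sequence that is safe to commit: every earlier record must be ack'd,
        // even if a later record happened to be ack'd first.
        synchronized long committableUpTo() {
            long committable = -1L;
            for (Map.Entry<Long, Boolean> e : pending.entrySet()) {
                if (!e.getValue()) {
                    break;
                }
                committable = e.getKey();
            }
            return committable;
        }
    }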





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-12374) Add missing config sasl.mechanism.controller.protocol

2021-02-24 Thread Ron Dagostino (Jira)
Ron Dagostino created KAFKA-12374:
-

 Summary: Add missing config sasl.mechanism.controller.protocol
 Key: KAFKA-12374
 URL: https://issues.apache.org/jira/browse/KAFKA-12374
 Project: Kafka
  Issue Type: Bug
  Components: config
Affects Versions: 2.8.0
Reporter: Ron Dagostino
Assignee: Ron Dagostino
 Fix For: 2.8.0


The config `sasl.mechanism.controller.protocol` from KIP-631 is not 
implemented.  Furthermore, `KafkaRaftManager` is using inter-broker security 
information when it connects to the Raft controller quorum.  KafkaRaftClient 
should use the first entry in `controller.listener.names` to determine the 
listener name; that listener name's mapped value in 
`listener.security.protocol.map` (if such a mapping exists, otherwise the 
listener name itself) for the security protocol; and the value of 
`sasl.mechanism.controller.protocol` for the SASL mechanism.  Finally, 
`RaftControllerNodeProvider` needs to use the value of 
`sasl.mechanism.controller.protocol` instead of the inter-broker SASL mechanism 
(it currently determines the listener name and security protocol correctly).
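
For illustration, the intended resolution could look like the following broker configuration; the listener name and mechanism values here are made-up examples, only the config names come from this ticket:

{code}
controller.listener.names=CONTROLLER
listener.security.protocol.map=CONTROLLER:SASL_PLAINTEXT,PLAINTEXT:PLAINTEXT
sasl.mechanism.controller.protocol=GSSAPI
{code}

With these values, the first controller listener name (CONTROLLER) maps to SASL_PLAINTEXT via `listener.security.protocol.map`, and the SASL mechanism used towards the controller quorum comes from `sasl.mechanism.controller.protocol` rather than from the inter-broker settings.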



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mjsax commented on a change in pull request #10205: KAFKA-12323 Follow-up: Refactor the unit test a bit

2021-02-24 Thread GitBox


mjsax commented on a change in pull request #10205:
URL: https://github.com/apache/kafka/pull/10205#discussion_r582349190



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
##
@@ -1844,19 +1844,17 @@ public void process(final Object key, final Object 
value) {}
 assertEquals(0, punctuatedWallClockTime.size());
 
 mockTime.sleep(100L);
-for (long i = 0L; i < 10L; i++) {

Review comment:
   Why do we remove this loop?

##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
##
@@ -1936,7 +1934,7 @@ public void process(final Object key, final Object value) 
{
 clientSupplier.consumer.addRecord(new ConsumerRecord<>(
 topic1,
 1,
-0L,
+100L,

Review comment:
   nit: can we pick a different value to disambiguate (wall-clock time is 
set to `100L` above)?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #10112: KAFKA-12226: Prevent source task offset failure when producer is overwhelmed

2021-02-24 Thread GitBox


hachikuji commented on a change in pull request #10112:
URL: https://github.com/apache/kafka/pull/10112#discussion_r582259574



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##
@@ -98,7 +98,8 @@
 private IdentityHashMap<ProducerRecord<byte[], byte[]>, ProducerRecord<byte[], byte[]>> outstandingMessages;
 // A second buffer is used while an offset flush is running
 private IdentityHashMap<ProducerRecord<byte[], byte[]>, ProducerRecord<byte[], byte[]>> outstandingMessagesBacklog;
-private boolean flushing;
+private boolean recordFlushPending;
+private boolean offsetFlushPending;
 private CountDownLatch stopRequestedLatch;

Review comment:
   nit: while we're at it, this could be `final`

##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##
@@ -475,11 +476,15 @@ public boolean commitOffsets() {
 synchronized (this) {
 // First we need to make sure we snapshot everything in exactly 
the current state. This
 // means both the current set of messages we're still waiting to 
finish, stored in this
-// class, which setting flushing = true will handle by storing any 
new values into a new
+// class, which setting recordFlushPending = true will handle by 
storing any new values into a new
 // buffer; and the current set of user-specified offsets, stored 
in the
 // OffsetStorageWriter, for which we can use beginFlush() to 
initiate the snapshot.
-flushing = true;
-boolean flushStarted = offsetWriter.beginFlush();
+// No need to begin a new offset flush if we timed out waiting for 
records to be flushed to
+// Kafka in a prior attempt.
+if (!recordFlushPending) {

Review comment:
   If I understand it correctly, the main difference in this patch is that 
we no longer fail the flush if the messages cannot be drained quickly enough 
from `outstandingMessages`.  A few questions come to mind:
   
   1. Is the flush timeout still a useful configuration? Was it ever? Even if 
we time out, we still have to wait for the records that were sent to the 
producer.
   2. While we are waiting for `outstandingMessages` to be drained, we are 
still accumulating messages in `outstandingMessagesBacklog`. I imagine we can 
get into a pattern here once we fill up the accumulator. While we're waiting 
for `outstandingMessages` to complete, we fill `outstandingMessagesBacklog`. 
Once the flush completes, `outstandingMessagesBacklog` becomes 
`outstandingMessages` and we are stuck waiting again. Could this prevent us 
from satisfying the commit interval?
   
   Overall, I can't shake the feeling that this logic is more complicated than 
necessary. Why do we need the concept of flushing at all? It would be more 
intuitive to just commit whatever the latest offsets are. Note that we do not 
use `outstandingMessages` for the purpose of retries. Once a request has been 
handed off to the producer successfully, we rely on the producer to handle 
retries. Any delivery failure after that is treated as fatal. So then, does 
`outstandingMessages` serve any purpose other than tracking flushing? I am 
probably missing something here; it has been a long time since I reviewed this 
logic.
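
To make the two-buffer hand-off being discussed here concrete, the following is a simplified sketch; the field names mirror the discussion above, but the class and its methods are hypothetical rather than the actual WorkerSourceTask code.

    // While a record flush is pending, newly sent records accumulate in the backlog;
    // once the flush finishes, the backlog becomes the next outstanding set.
    import java.util.IdentityHashMap;
    import org.apache.kafka.clients.producer.ProducerRecord;

    class OutstandingRecords {
        private IdentityHashMap<ProducerRecord<byte[], byte[]>, ProducerRecord<byte[], byte[]>> outstandingMessages = new IdentityHashMap<>();
        private IdentityHashMap<ProducerRecord<byte[], byte[]>, ProducerRecord<byte[], byte[]>> outstandingMessagesBacklog = new IdentityHashMap<>();
        private boolean recordFlushPending = false;

        synchronized void recordSent(ProducerRecord<byte[], byte[]> record) {
            if (recordFlushPending) {
                outstandingMessagesBacklog.put(record, record);
            } else {
                outstandingMessages.put(record, record);
            }
        }

        synchronized void beginFlush() {
            recordFlushPending = true;
        }

        synchronized void finishFlush() {
            // records accumulated during the flush become the new outstanding set
            outstandingMessages = outstandingMessagesBacklog;
            outstandingMessagesBacklog = new IdentityHashMap<>();
            recordFlushPending = false;
        }
    }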
   
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang opened a new pull request #10205: KAFKA-12323 Follow-up: Refactor the unit test a bit

2021-02-24 Thread GitBox


guozhangwang opened a new pull request #10205:
URL: https://github.com/apache/kafka/pull/10205


   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Resolved] (KAFKA-12267) Implement DescribeTransactions API

2021-02-24 Thread Jason Gustafson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12267?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jason Gustafson resolved KAFKA-12267.
-
Resolution: Fixed

> Implement DescribeTransactions API
> --
>
> Key: KAFKA-12267
> URL: https://issues.apache.org/jira/browse/KAFKA-12267
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jason Gustafson
>Assignee: Jason Gustafson
>Priority: Major
>




--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji merged pull request #10183: KAFKA-12267; Implement `DescribeTransactions` API

2021-02-24 Thread GitBox


hachikuji merged pull request #10183:
URL: https://github.com/apache/kafka/pull/10183


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on pull request #9990: KAFKA-12235: Fix ZkAdminManager.describeConfigs on 2+ config keys

2021-02-24 Thread GitBox


cmccabe commented on pull request #9990:
URL: https://github.com/apache/kafka/pull/9990#issuecomment-785357271


   ping @ivanyu  



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

2021-02-24 Thread GitBox


cmccabe commented on pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#issuecomment-785354588


   rebased on trunk



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12373) Improve KafkaRaftClient handling of graceful shutdown

2021-02-24 Thread Jose Armando Garcia Sancio (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12373?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Jose Armando Garcia Sancio updated KAFKA-12373:
---
Description: 
The current implementation simply closes the metrics group when it is closed.

When closing the KafkaRaftClient that is the leader it should perform at least 
the following steps:
 # Stop accepting new schedule append operations
 # Append to the log the batches currently in the BatchAccumulator
 # Wait with a timeout for the high-watermark to reach the LEO
 # Cooperatively resign as leader from the quorum

  was:
The current implementation simply closes the metrics group when it is closed.

When closing the KafkaRaftClient that is the leader it should perform at least 
the following steps:
 # Stop accepting new schedule append operations
 # Append to the log batches currently in the BatchAccumulator
 # Wait with a timeout for the high-watermark to reach the LEO
 # Cooperatively resign as leader from the quorum


> Improve KafkaRaftClient handling of graceful shutdown
> -
>
> Key: KAFKA-12373
> URL: https://issues.apache.org/jira/browse/KAFKA-12373
> Project: Kafka
>  Issue Type: Sub-task
>  Components: replication
>Reporter: Jose Armando Garcia Sancio
>Priority: Major
>
> The current implementation simply closes the metrics group when it is closed.
> When closing the KafkaRaftClient that is the leader it should perform at 
> least the following steps:
>  # Stop accepting new schedule append operations
>  # Append to the log the batches currently in the BatchAccumulator
>  # Wait with a timeout for the high-watermark to reach the LEO
>  # Cooperatively resign as leader from the quorum
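
A rough sketch of that close sequence, using placeholder method names for the four steps listed above (illustrative only, not the actual KafkaRaftClient API):

{code}
// Placeholder names for the four shutdown steps; not the real KafkaRaftClient methods.
abstract class GracefulLeaderShutdown {
    abstract void stopAcceptingAppends();             // 1. reject further scheduleAppend calls
    abstract void drainBatchAccumulatorToLog();       // 2. append batches still in the BatchAccumulator
    abstract boolean awaitHighWatermarkAtLeast(long offset, long timeoutMs); // 3. bounded wait for HW == LEO
    abstract void resignLeadership();                 // 4. cooperatively resign from the quorum
    abstract long logEndOffset();

    final void shutdown(long timeoutMs) {
        stopAcceptingAppends();
        drainBatchAccumulatorToLog();
        if (!awaitHighWatermarkAtLeast(logEndOffset(), timeoutMs)) {
            // timed out waiting for replication; resign anyway so a new leader can take over
        }
        resignLeadership();
    }
}
{code}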



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] guozhangwang commented on pull request #10204: MINOR: Fix the generation extraction util

2021-02-24 Thread GitBox


guozhangwang commented on pull request #10204:
URL: https://github.com/apache/kafka/pull/10204#issuecomment-785350477


   > Sorry that I didn't notice this issue :(
   
   No worries!! :)



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang merged pull request #10204: MINOR: Fix the generation extraction util

2021-02-24 Thread GitBox


guozhangwang merged pull request #10204:
URL: https://github.com/apache/kafka/pull/10204


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-12373) Improve KafkaRaftClient handling of graceful shutdown

2021-02-24 Thread Jose Armando Garcia Sancio (Jira)
Jose Armando Garcia Sancio created KAFKA-12373:
--

 Summary: Improve KafkaRaftClient handling of graceful shutdown
 Key: KAFKA-12373
 URL: https://issues.apache.org/jira/browse/KAFKA-12373
 Project: Kafka
  Issue Type: Sub-task
  Components: replication
Reporter: Jose Armando Garcia Sancio


The current implementation simply closes the metrics group when it is closed.

When closing the KafkaRaftClient that is the leader it should perform at least 
the following steps:
 # Stop accepting new schedule append operations
 # Append to the log batches currently in the BatchAccumulator
 # Wait with a timeout for the high-watermark to reach the LEO
 # Cooperatively resign as leader from the quorum



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ijuma commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

2021-02-24 Thread GitBox


ijuma commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r582266116



##
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##
@@ -195,6 +198,61 @@ class ControllerApis(val requestChannel: RequestChannel,
   requestThrottleMs => createResponseCallback(requestThrottleMs))
   }
 
+  def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+if (!config.deleteTopicEnable) {
+  if (request.context.apiVersion() < 3) {
+throw new InvalidRequestException("Topic deletion is disabled.")
+  } else {
+throw new TopicDeletionDisabledException()
+  }
+}
+val deleteTopicsRequest = request.body[DeleteTopicsRequest]
+val nameToId = new mutable.HashMap[String, Uuid]
+deleteTopicsRequest.data().topicNames().iterator().asScala.foreach {
+  name => nameToId.put(name, Uuid.ZERO_UUID)
+}
+deleteTopicsRequest.data().topics().iterator().asScala.foreach {
+  nameAndId => nameToId.put(nameAndId.name(), nameAndId.topicId())
+}
+val (describable, deletable)  = {
+  if (authHelper.authorize(request.context, DELETE, CLUSTER, 
CLUSTER_NAME)) {
+(nameToId.keySet, nameToId.keySet)
+  } else {
+val authorizedDescribeTopics: Set[String] = 
authHelper.filterByAuthorized(
+  request.context, DESCRIBE, TOPIC, nameToId.keys)(n => n)
+val authorizedDeleteTopics: Set[String] = 
authHelper.filterByAuthorized(
+  request.context, DELETE, TOPIC, nameToId.keys)(n => n)
+(authorizedDescribeTopics, authorizedDeleteTopics)
+  }
+}
+def sendResponse(response: DeleteTopicsResponseData): Unit = {
+  nameToId.keysIterator.foreach {

Review comment:
   There is no copy when using foreach (in either the Java or the Scala 
version), and Scala lambdas are automatically converted to Java lambdas.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12158) Consider better return type of RaftClient.scheduleAppend

2021-02-24 Thread Jose Armando Garcia Sancio (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12158?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17290233#comment-17290233
 ] 

Jose Armando Garcia Sancio commented on KAFKA-12158:


Yes [~sagarrao] . Feel free to pick this up.

> Consider better return type of RaftClient.scheduleAppend
> 
>
> Key: KAFKA-12158
> URL: https://issues.apache.org/jira/browse/KAFKA-12158
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Jason Gustafson
>Priority: Major
>
> Currently `RaftClient` has the following Append API:
> {code}
> Long scheduleAppend(int epoch, List<T> records);
> {code}
> There are a few possible cases that the single return value is trying to 
> handle:
> 1. The epoch doesn't match or we are not the current leader => return 
> Long.MaxValue
> 2. We failed to allocate memory to write the batch (backpressure case) => 
> return null
> 3. We successfully scheduled the append => return the expected offset
> It might be better to define a richer type so that the cases that must be 
> handled are clearer. At a minimum, it would be better to return 
> `OptionalLong` and get rid of the null case.
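
As an illustration of a richer result type along the lines suggested above (a sketch only, not a committed API; the class and enum names are made up):

{code}
import java.util.OptionalLong;

// Makes the three cases explicit instead of overloading a single Long return value.
final class ScheduleAppendResult {
    enum Error { NONE, NOT_LEADER_OR_WRONG_EPOCH, BUFFER_FULL }

    private final Error error;
    private final long offset; // meaningful only when error == NONE

    private ScheduleAppendResult(Error error, long offset) {
        this.error = error;
        this.offset = offset;
    }

    static ScheduleAppendResult scheduled(long offset) {
        return new ScheduleAppendResult(Error.NONE, offset);
    }

    static ScheduleAppendResult failed(Error error) {
        return new ScheduleAppendResult(error, -1L);
    }

    Error error() {
        return error;
    }

    OptionalLong offset() {
        return error == Error.NONE ? OptionalLong.of(offset) : OptionalLong.empty();
    }
}
{code}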



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] cmccabe commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

2021-02-24 Thread GitBox


cmccabe commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r582259112



##
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##
@@ -195,6 +198,61 @@ class ControllerApis(val requestChannel: RequestChannel,
   requestThrottleMs => createResponseCallback(requestThrottleMs))
   }
 
+  def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+if (!config.deleteTopicEnable) {
+  if (request.context.apiVersion() < 3) {
+throw new InvalidRequestException("Topic deletion is disabled.")
+  } else {
+throw new TopicDeletionDisabledException()
+  }
+}
+val deleteTopicsRequest = request.body[DeleteTopicsRequest]
+val nameToId = new mutable.HashMap[String, Uuid]
+deleteTopicsRequest.data().topicNames().iterator().asScala.foreach {
+  name => nameToId.put(name, Uuid.ZERO_UUID)
+}
+deleteTopicsRequest.data().topics().iterator().asScala.foreach {
+  nameAndId => nameToId.put(nameAndId.name(), nameAndId.topicId())
+}
+val (describable, deletable)  = {
+  if (authHelper.authorize(request.context, DELETE, CLUSTER, 
CLUSTER_NAME)) {
+(nameToId.keySet, nameToId.keySet)
+  } else {
+val authorizedDescribeTopics: Set[String] = 
authHelper.filterByAuthorized(
+  request.context, DESCRIBE, TOPIC, nameToId.keys)(n => n)
+val authorizedDeleteTopics: Set[String] = 
authHelper.filterByAuthorized(
+  request.context, DELETE, TOPIC, nameToId.keys)(n => n)
+(authorizedDescribeTopics, authorizedDeleteTopics)
+  }
+}
+def sendResponse(response: DeleteTopicsResponseData): Unit = {
+  nameToId.keysIterator.foreach {

Review comment:
   The main rationale for this is that I don't want Scala to perform any 
copying, and I know for sure that if I give it an iterator, it won't.
   
   I wouldn't mind using Java's forEach directly, but I couldn't find a good way 
to convert a Scala closure to a Java closure (maybe this is obvious and I 
missed it?)





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

2021-02-24 Thread GitBox


cmccabe commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r582257154



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
 @@ -541,6 +559,41 @@ static void validateNewTopicNames(Map<String, ApiError> topicErrors,
 return configChanges;
 }
 
+ControllerResult<DeleteTopicsResponseData> deleteTopics(Map<String, Uuid> nameToId) {
+DeleteTopicsResponseData result = new DeleteTopicsResponseData();
+List<ApiMessageAndVersion> records = new ArrayList<>();
+for (Entry<String, Uuid> entry : nameToId.entrySet()) {
+ApiError error = deleteTopic(entry.getKey(), entry.getValue(), 
records);
+result.responses().add(new DeletableTopicResult().
+setName(entry.getKey()).
+setTopicId(entry.getValue()).
+setErrorCode(error.error().code()).
+setErrorMessage(error.message()));
+}
+return new ControllerResult<>(records, result);
+}
+
+ApiError deleteTopic(String name,
+ Uuid providedId,
+ List<ApiMessageAndVersion> records) {
+Uuid realId = topicsByName.get(name);
+if (realId == null) {
+return new ApiError(UNKNOWN_TOPIC_OR_PARTITION,
+"Unable to locate the provided topic name.");
+}
+if (!providedId.equals(Uuid.ZERO_UUID) && !providedId.equals(realId)) {
+return new ApiError(UNKNOWN_TOPIC_ID,
+"The provided topic ID does not match the provided topic 
name.");
+}
+TopicControlInfo topic = topics.get(realId);
+if (topic == null) {
+return new ApiError(UNKNOWN_TOPIC_OR_PARTITION, "Unable to locate 
topic id.");

Review comment:
   the code has changed





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

2021-02-24 Thread GitBox


cmccabe commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r582257154



##
File path: 
metadata/src/main/java/org/apache/kafka/controller/ReplicationControlManager.java
##
 @@ -541,6 +559,41 @@ static void validateNewTopicNames(Map<String, ApiError> topicErrors,
 return configChanges;
 }
 
+ControllerResult<DeleteTopicsResponseData> deleteTopics(Map<String, Uuid> nameToId) {
+DeleteTopicsResponseData result = new DeleteTopicsResponseData();
+List<ApiMessageAndVersion> records = new ArrayList<>();
+for (Entry<String, Uuid> entry : nameToId.entrySet()) {
+ApiError error = deleteTopic(entry.getKey(), entry.getValue(), 
records);
+result.responses().add(new DeletableTopicResult().
+setName(entry.getKey()).
+setTopicId(entry.getValue()).
+setErrorCode(error.error().code()).
+setErrorMessage(error.message()));
+}
+return new ControllerResult<>(records, result);
+}
+
+ApiError deleteTopic(String name,
+ Uuid providedId,
+ List<ApiMessageAndVersion> records) {
+Uuid realId = topicsByName.get(name);
+if (realId == null) {
+return new ApiError(UNKNOWN_TOPIC_OR_PARTITION,
+"Unable to locate the provided topic name.");
+}
+if (!providedId.equals(Uuid.ZERO_UUID) && !providedId.equals(realId)) {
+return new ApiError(UNKNOWN_TOPIC_ID,
+"The provided topic ID does not match the provided topic 
name.");
+}
+TopicControlInfo topic = topics.get(realId);
+if (topic == null) {
+return new ApiError(UNKNOWN_TOPIC_OR_PARTITION, "Unable to locate 
topic id.");

Review comment:
   this is out of date





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

2021-02-24 Thread GitBox


cmccabe commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r582256432



##
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##
@@ -195,6 +198,61 @@ class ControllerApis(val requestChannel: RequestChannel,
   requestThrottleMs => createResponseCallback(requestThrottleMs))
   }
 
+  def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+if (!config.deleteTopicEnable) {
+  if (request.context.apiVersion() < 3) {
+throw new InvalidRequestException("Topic deletion is disabled.")

Review comment:
   As @ijuma said, in this case the error message gives extra information 
which would be very helpful to users. If we always wanted the same message, 
there would be no reason to have the string field in the wire protocol.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

2021-02-24 Thread GitBox


cmccabe commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r582256432



##
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##
@@ -195,6 +198,61 @@ class ControllerApis(val requestChannel: RequestChannel,
   requestThrottleMs => createResponseCallback(requestThrottleMs))
   }
 
+  def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+if (!config.deleteTopicEnable) {
+  if (request.context.apiVersion() < 3) {
+throw new InvalidRequestException("Topic deletion is disabled.")

Review comment:
   The error message should reflect the problem, right?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

2021-02-24 Thread GitBox


cmccabe commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r582255681



##
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##
@@ -195,6 +198,61 @@ class ControllerApis(val requestChannel: RequestChannel,
   requestThrottleMs => createResponseCallback(requestThrottleMs))
   }
 
+  def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+if (!config.deleteTopicEnable) {
+  if (request.context.apiVersion() < 3) {
+throw new InvalidRequestException("Topic deletion is disabled.")
+  } else {
+throw new TopicDeletionDisabledException()
+  }
+}
+val deleteTopicsRequest = request.body[DeleteTopicsRequest]
+val nameToId = new mutable.HashMap[String, Uuid]
+deleteTopicsRequest.data().topicNames().iterator().asScala.foreach {
+  name => nameToId.put(name, Uuid.ZERO_UUID)
+}
+deleteTopicsRequest.data().topics().iterator().asScala.foreach {
+  nameAndId => nameToId.put(nameAndId.name(), nameAndId.topicId())
+}
+val (describable, deletable)  = {
+  if (authHelper.authorize(request.context, DELETE, CLUSTER, 
CLUSTER_NAME)) {
+(nameToId.keySet, nameToId.keySet)
+  } else {
+val authorizedDescribeTopics: Set[String] = 
authHelper.filterByAuthorized(
+  request.context, DESCRIBE, TOPIC, nameToId.keys)(n => n)
+val authorizedDeleteTopics: Set[String] = 
authHelper.filterByAuthorized(
+  request.context, DELETE, TOPIC, nameToId.keys)(n => n)
+(authorizedDescribeTopics, authorizedDeleteTopics)
+  }
+}
+def sendResponse(response: DeleteTopicsResponseData): Unit = {
+  nameToId.keysIterator.foreach {
+name => if (!deletable.contains(name)) {
+  val result = if (describable.contains(name)) {
+new 
DeletableTopicResult().setName(name).setErrorCode(TOPIC_AUTHORIZATION_FAILED.code)
+  } else {
+new 
DeletableTopicResult().setName(name).setErrorCode(UNKNOWN_TOPIC_OR_PARTITION.code)
+  }
+  response.responses().add(result)
+}
+  }
+  requestHelper.sendResponseMaybeThrottle(request, throttleTimeMs => {
+response.setThrottleTimeMs(throttleTimeMs)
+new DeleteTopicsResponse(response)
+  })
+}
+   val future = controller.deleteTopics(
+ nameToId.view.filterKeys(deletable.contains(_)).toMap.asJava)

Review comment:
   this has changed





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cmccabe commented on a change in pull request #10184: MINOR: enable topic deletion in the KIP-500 controller

2021-02-24 Thread GitBox


cmccabe commented on a change in pull request #10184:
URL: https://github.com/apache/kafka/pull/10184#discussion_r582255514



##
File path: core/src/main/scala/kafka/server/ControllerApis.scala
##
@@ -195,6 +198,61 @@ class ControllerApis(val requestChannel: RequestChannel,
   requestThrottleMs => createResponseCallback(requestThrottleMs))
   }
 
+  def handleDeleteTopics(request: RequestChannel.Request): Unit = {
+if (!config.deleteTopicEnable) {
+  if (request.context.apiVersion() < 3) {
+throw new InvalidRequestException("Topic deletion is disabled.")
+  } else {
+throw new TopicDeletionDisabledException()
+  }
+}
+val deleteTopicsRequest = request.body[DeleteTopicsRequest]
+val nameToId = new mutable.HashMap[String, Uuid]
+deleteTopicsRequest.data().topicNames().iterator().asScala.foreach {
+  name => nameToId.put(name, Uuid.ZERO_UUID)
+}
+deleteTopicsRequest.data().topics().iterator().asScala.foreach {
+  nameAndId => nameToId.put(nameAndId.name(), nameAndId.topicId())
+}
+val (describable, deletable)  = {
+  if (authHelper.authorize(request.context, DELETE, CLUSTER, 
CLUSTER_NAME)) {
+(nameToId.keySet, nameToId.keySet)
+  } else {
+val authorizedDescribeTopics: Set[String] = 
authHelper.filterByAuthorized(

Review comment:
   Thanks, this is a good point. I fixed the code to accept only topic IDs. I 
also found a few more bugs, and added a unit test for the deletion code in 
ControllerApis.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on pull request #10059: KAFKA-8562: SaslChannelBuilder - avoid (reverse) DNS lookup while building underlying SslTransportLayer

2021-02-24 Thread GitBox


hachikuji commented on pull request #10059:
URL: https://github.com/apache/kafka/pull/10059#issuecomment-785326212


   I think the test failures are caused by this: 
https://github.com/apache/kafka/blob/trunk/clients/src/test/java/org/apache/kafka/common/security/authenticator/SaslAuthenticatorFailureDelayTest.java#L222.
 We are relying on the reverse DNS to map "127.0.0.1" to "localhost." If we use 
"localhost" directly, then the tests pass.
   
   It seems reasonable to me to change the test case. I cannot think of a 
reason why this would change the reasoning that is documented under 
`ChannelBuilderUtils.peerHost`, which seems to apply just as well to SASL_SSL 
as SSL.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #10051: Adding documentation for KIP-614

2021-02-24 Thread GitBox


vvcephei commented on pull request #10051:
URL: https://github.com/apache/kafka/pull/10051#issuecomment-785322634


   Merged to 2.8



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei merged pull request #10051: Adding documentation for KIP-614

2021-02-24 Thread GitBox


vvcephei merged pull request #10051:
URL: https://github.com/apache/kafka/pull/10051


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #10051: Adding documentation for KIP-614

2021-02-24 Thread GitBox


vvcephei commented on pull request #10051:
URL: https://github.com/apache/kafka/pull/10051#issuecomment-785321000


   Fixed the merge conflict.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12372) Enhance TimestampCoverter to handle multiple timestamp or date fields

2021-02-24 Thread Hoa Le (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12372?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hoa Le updated KAFKA-12372:
---
Summary: Enhance TimestampCoverter to handle multiple timestamp or date 
fields  (was: Enhance TimestampCoverter to handle fields schema name to convert 
to Timestamp or Date)

> Enhance TimestampCoverter to handle multiple timestamp or date fields
> -
>
> Key: KAFKA-12372
> URL: https://issues.apache.org/jira/browse/KAFKA-12372
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Affects Versions: 2.7.0
>Reporter: Hoa Le
>Priority: Minor
> Fix For: 3.0.0
>
>
> Hi team,
>  
> Our team is having an issue handling multiple timestamp fields in a Kafka 
> message; for now, if we use the 
> [converter|https://github.com/apache/kafka/blob/trunk/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java]
>  we have to add more fields to the config.
> We could implement this in a generic way: check whether field.schema().name() is 
> a timestamp or date logical type and, if so, convert it.
> Please let me know if I can contribute to this.
>  
> Thanks,
> Hoa.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-12372) Enhance TimestampCoverter to handle fields schema name to convert to Timestamp or Date

2021-02-24 Thread Hoa Le (Jira)
Hoa Le created KAFKA-12372:
--

 Summary: Enhance TimestampCoverter to handle fields schema name to 
convert to Timestamp or Date
 Key: KAFKA-12372
 URL: https://issues.apache.org/jira/browse/KAFKA-12372
 Project: Kafka
  Issue Type: Improvement
  Components: KafkaConnect
Affects Versions: 2.7.0
Reporter: Hoa Le
 Fix For: 3.0.0


Hi team,

 

Our team is having an issue handling multiple timestamp fields in a Kafka 
message; for now, if we use the 
[converter|https://github.com/apache/kafka/blob/trunk/connect/transforms/src/main/java/org/apache/kafka/connect/transforms/TimestampConverter.java]
 we have to add more fields to the config.

We could implement this in a generic way: check whether field.schema().name() is 
a timestamp or date logical type and, if so, convert it.
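
A minimal sketch of that generic check, assuming the Connect Timestamp/Date logical type names (this is not existing TimestampConverter code):

{code}
import org.apache.kafka.connect.data.Date;
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.Timestamp;

// Treat a field as convertible when its schema carries the Connect Timestamp or Date
// logical type name, instead of requiring every field name to be listed in the config.
final class TimeFieldCheck {
    static boolean isTimeLikeField(Schema fieldSchema) {
        String name = fieldSchema.name();
        return Timestamp.LOGICAL_NAME.equals(name) || Date.LOGICAL_NAME.equals(name);
    }
}
{code}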

Please let me know if I can contribute to this.

 

Thanks,

Hoa.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] hachikuji commented on pull request #10059: KAFKA-8562: SaslChannelBuilder - avoid (reverse) DNS lookup while building underlying SslTransportLayer

2021-02-24 Thread GitBox


hachikuji commented on pull request #10059:
URL: https://github.com/apache/kafka/pull/10059#issuecomment-785315990


   Hmm.. I may have been too hasty with my approval. We need to investigate the 
test failures. I've triggered another build.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rondagostino commented on pull request #10199: MINOR: Fix security_test system test for Raft case

2021-02-24 Thread GitBox


rondagostino commented on pull request #10199:
URL: https://github.com/apache/kafka/pull/10199#issuecomment-785311895


   @mumrah I separated out the communication-to-quorum failure cases into a 
separate test -- so now we confirm that hostname verification failure to both 
ZK and Raft Controller causes Kafka to be unable to start.
   
   I did not add the test where communication to the Raft quorum is okay but 
the inter-broker protocol is SSL and fails due to hostname verification. Let 
me know if you think it is necessary, and I can add it.
   
   @abbccdda I "fixed" the ZooKeeper test as mentioned above -- we can revert 
if you decide you need to fix something related to the auto topic creation 
manager instead.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #10183: KAFKA-12267; Implement `DescribeTransactions` API

2021-02-24 Thread GitBox


hachikuji commented on a change in pull request #10183:
URL: https://github.com/apache/kafka/pull/10183#discussion_r582219028



##
File path: 
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
##
@@ -255,6 +256,47 @@ class TransactionCoordinator(brokerId: Int,
 }
   }
 
+  def handleDescribeTransactions(
+transactionalId: String
+  ): DescribeTransactionsResponseData.TransactionState = {
+val transactionState = new 
DescribeTransactionsResponseData.TransactionState()
+  .setTransactionalId(transactionalId)
+
+if (!isActive.get()) {

Review comment:
   That's a fair point. The difference is that none of the other APIs are 
batched. I thought it was simpler to let the coordinator API work only with 
individual transactionalIds and leave the batching aspect to `KafkaApis`. The 
downside of having this potentially redundant check seems not too bad.
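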





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #10183: KAFKA-12267; Implement `DescribeTransactions` API

2021-02-24 Thread GitBox


hachikuji commented on a change in pull request #10183:
URL: https://github.com/apache/kafka/pull/10183#discussion_r582213713



##
File path: 
core/src/main/scala/kafka/coordinator/transaction/TransactionCoordinator.scala
##
@@ -255,6 +256,47 @@ class TransactionCoordinator(brokerId: Int,
 }
   }
 
+  def handleDescribeTransactions(
+transactionalId: String
+  ): DescribeTransactionsResponseData.TransactionState = {
+val transactionState = new 
DescribeTransactionsResponseData.TransactionState()
+  .setTransactionalId(transactionalId)
+
+if (!isActive.get()) {
+  transactionState.setErrorCode(Errors.COORDINATOR_NOT_AVAILABLE.code)
+} else if (transactionalId == null || transactionalId.isEmpty) {

Review comment:
   I agree it seems inconsistent. The protocol definition does not allow it 
to be nullable, but I think I added the check because the method is one step 
removed from the request. Perhaps I can change it to raise 
`IllegalArgumentException` if the `transactionalId` is null.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #10170: KAFKA-12323: Set timestamp in record context when punctuate

2021-02-24 Thread GitBox


mjsax commented on a change in pull request #10170:
URL: https://github.com/apache/kafka/pull/10170#discussion_r582202700



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
##
@@ -1874,6 +1873,85 @@ public void close() {}
 assertEquals(2, punctuatedWallClockTime.size());
 }
 
+@Test
+public void shouldPunctuateWithTimestampPreservedInProcessorContext() {
+final org.apache.kafka.streams.kstream.TransformerSupplier<Object, Object, KeyValue<Object, Object>> punctuateProcessor =
+() -> new org.apache.kafka.streams.kstream.Transformer<Object, Object, KeyValue<Object, Object>>() {
+@Override
+public void init(final 
org.apache.kafka.streams.processor.ProcessorContext context) {
+context.schedule(Duration.ofMillis(100L), 
PunctuationType.WALL_CLOCK_TIME, timestamp -> context.forward("key", "value"));
+context.schedule(Duration.ofMillis(100L), 
PunctuationType.STREAM_TIME, timestamp -> context.forward("key", "value"));
+}
+
+@Override
+public KeyValue<Object, Object> transform(final Object key, final Object value) {
+return null;
+}
+
+@Override
+public void close() {}
+};
+
+final List<Long> peekedContextTime = new ArrayList<>();
+final org.apache.kafka.streams.processor.ProcessorSupplier<Object, Object> peekProcessor =
+() -> new org.apache.kafka.streams.processor.AbstractProcessor<Object, Object>() {
+@Override
+public void process(final Object key, final Object value) {
+peekedContextTime.add(context.timestamp());
+}
+};
+
+internalStreamsBuilder.stream(Collections.singleton(topic1), consumed)
+.transform(punctuateProcessor)
+.process(peekProcessor);
+internalStreamsBuilder.buildAndOptimizeTopology();
+
+final long currTime = mockTime.milliseconds();
+final StreamThread thread = createStreamThread(CLIENT_ID, config, 
false);
+
+thread.setState(StreamThread.State.STARTING);
+thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
+final List<TopicPartition> assignedPartitions = new ArrayList<>();
+
+final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
+
+// assign single partition
+assignedPartitions.add(t1p1);
+activeTasks.put(task1, Collections.singleton(t1p1));
+
+thread.taskManager().handleAssignment(activeTasks, emptyMap());
+
+clientSupplier.consumer.assign(assignedPartitions);
+
clientSupplier.consumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 
0L));
+thread.rebalanceListener().onPartitionsAssigned(assignedPartitions);
+
+thread.runOnce();
+assertEquals(0, peekedContextTime.size());
+
+mockTime.sleep(100L);
+thread.runOnce();
+
+assertEquals(1, peekedContextTime.size());
+assertEquals(currTime + 100L, peekedContextTime.get(0).longValue());
+
+clientSupplier.consumer.addRecord(new ConsumerRecord<>(
+topic1,
+1,
+0L,
+ 100L,
+ TimestampType.CREATE_TIME,
+ ConsumerRecord.NULL_CHECKSUM,
+ "K".getBytes().length,
+ "V".getBytes().length,
+ "K".getBytes(),
+ "V".getBytes()));
+
+thread.runOnce();
+
+assertEquals(2, peekedContextTime.size());
+assertEquals(0L, peekedContextTime.get(1).longValue());

Review comment:
   So, to make sure we actually use "stream-time", we should change the test 
to actually punctuate twice? Would you mind doing a follow-up PR? It might also 
be worth using a timestamp that differs from the "wall-clock time" value, to 
make sure we don't accidentally pass in the current wall-clock time.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #10170: KAFKA-12323: Set timestamp in record context when punctuate

2021-02-24 Thread GitBox


mjsax commented on a change in pull request #10170:
URL: https://github.com/apache/kafka/pull/10170#discussion_r582202700



##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/StreamThreadTest.java
##
@@ -1874,6 +1873,85 @@ public void close() {}
 assertEquals(2, punctuatedWallClockTime.size());
 }
 
+@Test
+public void shouldPunctuateWithTimestampPreservedInProcessorContext() {
+final org.apache.kafka.streams.kstream.TransformerSupplier<Object, Object, KeyValue<Object, Object>> punctuateProcessor =
+() -> new org.apache.kafka.streams.kstream.Transformer<Object, Object, KeyValue<Object, Object>>() {
+@Override
+public void init(final 
org.apache.kafka.streams.processor.ProcessorContext context) {
+context.schedule(Duration.ofMillis(100L), 
PunctuationType.WALL_CLOCK_TIME, timestamp -> context.forward("key", "value"));
+context.schedule(Duration.ofMillis(100L), 
PunctuationType.STREAM_TIME, timestamp -> context.forward("key", "value"));
+}
+
+@Override
+public KeyValue<Object, Object> transform(final Object key, final Object value) {
+return null;
+}
+
+@Override
+public void close() {}
+};
+
+final List<Long> peekedContextTime = new ArrayList<>();
+final org.apache.kafka.streams.processor.ProcessorSupplier<Object, Object> peekProcessor =
+() -> new org.apache.kafka.streams.processor.AbstractProcessor<Object, Object>() {
+@Override
+public void process(final Object key, final Object value) {
+peekedContextTime.add(context.timestamp());
+}
+};
+
+internalStreamsBuilder.stream(Collections.singleton(topic1), consumed)
+.transform(punctuateProcessor)
+.process(peekProcessor);
+internalStreamsBuilder.buildAndOptimizeTopology();
+
+final long currTime = mockTime.milliseconds();
+final StreamThread thread = createStreamThread(CLIENT_ID, config, 
false);
+
+thread.setState(StreamThread.State.STARTING);
+thread.rebalanceListener().onPartitionsRevoked(Collections.emptySet());
+final List<TopicPartition> assignedPartitions = new ArrayList<>();
+
+final Map<TaskId, Set<TopicPartition>> activeTasks = new HashMap<>();
+
+// assign single partition
+assignedPartitions.add(t1p1);
+activeTasks.put(task1, Collections.singleton(t1p1));
+
+thread.taskManager().handleAssignment(activeTasks, emptyMap());
+
+clientSupplier.consumer.assign(assignedPartitions);
+
clientSupplier.consumer.updateBeginningOffsets(Collections.singletonMap(t1p1, 
0L));
+thread.rebalanceListener().onPartitionsAssigned(assignedPartitions);
+
+thread.runOnce();
+assertEquals(0, peekedContextTime.size());
+
+mockTime.sleep(100L);
+thread.runOnce();
+
+assertEquals(1, peekedContextTime.size());
+assertEquals(currTime + 100L, peekedContextTime.get(0).longValue());
+
+clientSupplier.consumer.addRecord(new ConsumerRecord<>(
+topic1,
+1,
+0L,
+ 100L,
+ TimestampType.CREATE_TIME,
+ ConsumerRecord.NULL_CHECKSUM,
+ "K".getBytes().length,
+ "V".getBytes().length,
+ "K".getBytes(),
+ "V".getBytes()));
+
+thread.runOnce();
+
+assertEquals(2, peekedContextTime.size());
+assertEquals(0L, peekedContextTime.get(1).longValue());

Review comment:
   So, to make sure we actually use "stream-time", we should change the test 
to actually punctuate twice? Would you mind doing a follow-up PR?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10847) Avoid spurious left/outer join results in stream-stream join

2021-02-24 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10847?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17290176#comment-17290176
 ] 

Matthias J. Sax commented on KAFKA-10847:
-

Thanks for the update!
{quote}I still need to know how to get the grace period. 
{quote}
I guess we will need to pass it into the `Processor` when creating it during 
`Topology` build time.
{quote}I do not delete any records when emitted.
{quote}
I assume you refer to the case when computing left/outer join results? I think 
we might need to delete though, to ensure we don't produce duplicates (eg, when 
a rebalance happens).
{quote}I don't know if bloom filters are active 
{quote}
They should be active – we enable them by default.

> Avoid spurious left/outer join results in stream-stream join 
> -
>
> Key: KAFKA-10847
> URL: https://issues.apache.org/jira/browse/KAFKA-10847
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Sergio Peña
>Priority: Major
>
> KafkaStreams follows an eager execution model, i.e., it never buffers input 
> records but processes them right away. For a left/outer stream-stream join, 
> this implies that a left/outer join result might be emitted before the window 
> end (or window close) time is reached. Thus, a record that will be an 
> inner-join result might produce an eager (and spurious) left/outer join 
> result.
> We should change the implementation of the join to not emit eager left/outer 
> join results, but instead delay the emission of such results until after the 
> window grace period has passed.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12360) Improve documentation of max.task.idle.ms (kafka-streams)

2021-02-24 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12360?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17290164#comment-17290164
 ] 

ASF GitHub Bot commented on KAFKA-12360:


mjsax commented on pull request #333:
URL: https://github.com/apache/kafka-site/pull/333#issuecomment-785274324


   @nicodds Thanks for the PR!
   
   We also need a second PR for https://github.com/apache/kafka that is the 
source of truth for the docs.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Improve documentation of max.task.idle.ms (kafka-streams)
> -
>
> Key: KAFKA-12360
> URL: https://issues.apache.org/jira/browse/KAFKA-12360
> Project: Kafka
>  Issue Type: Improvement
>  Components: docs, streams
>Reporter: Domenico Delle Side
>Priority: Minor
>  Labels: beginner, newbie, trivial
>
> _max.task.idle.ms_ is a handy way to pause processing in a *_kafka-streams_* 
> application. This is very useful when you need to join two topics that are 
> out of sync, i.e. when data in one topic may be produced _before_ you receive 
> the join information in the other topic.
> In the documentation, however, it is not specified that the value of 
> _max.task.idle.ms_ *must* be lower than _max.poll.interval.ms_, otherwise 
> you'll run into an endless rebalancing problem.
> I think it is better to clearly state this in the documentation.
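
For example (illustrative values only), a configuration that respects this constraint would be:

{code}
max.task.idle.ms=30000
max.poll.interval.ms=300000
{code}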



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

