[jira] [Commented] (KAFKA-4628) Support KTable/GlobalKTable Joins

2020-11-03 Thread Hartmut Armbruster (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4628?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17225245#comment-17225245
 ] 

Hartmut Armbruster commented on KAFKA-4628:
---

Many thanks! So how about the concerns regarding a narrow right-hand domain?
E.g. for a KTable-to-KTable FK join with a cardinality of 10,000,000 to 5?

> Support KTable/GlobalKTable Joins
> -
>
> Key: KAFKA-4628
> URL: https://issues.apache.org/jira/browse/KAFKA-4628
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.2.0
>Reporter: Damian Guy
>Priority: Major
>  Labels: needs-kip
>
> In KIP-99 we added support for GlobalKTables; however, we don't currently 
> support KTable/GlobalKTable joins, as they require materializing a state store 
> for the join. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dengziming opened a new pull request #9546: MINOR: optimize PartitionStateMachine handleStateChanges

2020-11-03 Thread GitBox


dengziming opened a new pull request #9546:
URL: https://github.com/apache/kafka/pull/9546


   The logic of the state changes to OfflinePartition and NonExistentPartition is 
identical, so just merge them.
   In fact, the logic for the NewPartition case is also the same, but it has a 
different log format, so it is left as is (see the sketch below).
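
   A minimal, self-contained sketch of the idea (all names are illustrative, not the actual PartitionStateMachine code):
   ```scala
   import scala.collection.mutable

   sealed trait TargetState
   case object NewPartition extends TargetState
   case object OfflinePartition extends TargetState
   case object NonExistentPartition extends TargetState

   def handleStateChange(partition: String,
                         targetState: TargetState,
                         states: mutable.Map[String, TargetState]): Unit =
     targetState match {
       // OfflinePartition and NonExistentPartition share identical transition
       // logic, so one alternative pattern handles both.
       case OfflinePartition | NonExistentPartition =>
         println(s"Changed partition $partition state to $targetState")
         states.put(partition, targetState)
       // NewPartition performs the same transition but logs differently,
       // so it keeps its own case.
       case NewPartition =>
         println(s"Initialized partition $partition with state $targetState")
         states.put(partition, targetState)
     }
   ```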



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on a change in pull request #7751: KAFKA-7987: Reinitialize ZookeeperClient after auth failures

2020-11-03 Thread GitBox


junrao commented on a change in pull request #7751:
URL: https://github.com/apache/kafka/pull/7751#discussion_r516157977



##
File path: core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
##
@@ -81,7 +85,8 @@ class ZooKeeperClient(connectString: String,
   private val zNodeChildChangeHandlers = new ConcurrentHashMap[String, 
ZNodeChildChangeHandler]().asScala
   private val inFlightRequests = new Semaphore(maxInFlightRequests)
   private val stateChangeHandlers = new ConcurrentHashMap[String, 
StateChangeHandler]().asScala
-  private[zookeeper] val expiryScheduler = new KafkaScheduler(threads = 1, 
"zk-session-expiry-handler")
+  private[zookeeper] val reinitializeScheduler = new KafkaScheduler(threads = 
1, "zk-client-reinit-")

Review comment:
   This is an existing issue. Should we include ZooKeeperClient.name in the 
thread name to distinguish between different instances? If so, we probably need 
to replace all the spaces in `name` when using it as the thread name prefix.
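
   For illustration, a hedged sketch of one way to do that (the `clientName` value is made up, and this is not the actual ZooKeeperClient code):
   ```scala
   import kafka.utils.KafkaScheduler

   // Derive the scheduler's thread prefix from the client name, replacing
   // spaces so the prefix stays a readable, valid thread-name component.
   val clientName = "zk broker client"  // hypothetical instance name
   val threadPrefix = s"zk-client-reinit-${clientName.replaceAll("\\s", "-")}"
   val reinitializeScheduler = new KafkaScheduler(threads = 1, threadPrefix)
   ```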

##
File path: 
core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerWithZkSaslTest.scala
##
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.security.authorizer
+
+import java.net.InetAddress
+import java.util
+import java.util.UUID
+import java.util.concurrent.{Executors, TimeUnit}
+
+import javax.security.auth.Subject
+import javax.security.auth.callback.CallbackHandler
+import kafka.api.SaslSetup
+import kafka.security.authorizer.AclEntry.WildcardHost
+import kafka.server.KafkaConfig
+import kafka.utils.JaasTestUtils.{JaasModule, JaasSection}
+import kafka.utils.{JaasTestUtils, TestUtils}
+import kafka.zk.{KafkaZkClient, ZooKeeperTestHarness}
+import kafka.zookeeper.ZooKeeperClient
+import org.apache.kafka.common.acl.{AccessControlEntry, 
AccessControlEntryFilter, AclBinding, AclBindingFilter}
+import org.apache.kafka.common.acl.AclOperation.{READ, WRITE}
+import org.apache.kafka.common.acl.AclPermissionType.ALLOW
+import org.apache.kafka.common.network.{ClientInformation, ListenerName}
+import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.common.requests.{RequestContext, RequestHeader}
+import org.apache.kafka.common.resource.PatternType.LITERAL
+import org.apache.kafka.common.resource.ResourcePattern
+import org.apache.kafka.common.resource.ResourceType.TOPIC
+import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
+import org.apache.kafka.test.{TestUtils => JTestUtils}
+import org.apache.zookeeper.server.auth.DigestLoginModule
+import org.junit.Assert.assertEquals
+import org.junit.{After, Before, Test}
+
+import scala.jdk.CollectionConverters._
+import scala.collection.Seq
+
+class AclAuthorizerWithZkSaslTest extends ZooKeeperTestHarness with SaslSetup {
+
+  private val aclAuthorizer = new AclAuthorizer
+  private val aclAuthorizer2 = new AclAuthorizer
+  private val resource: ResourcePattern = new ResourcePattern(TOPIC, "foo-" + 
UUID.randomUUID(), LITERAL)
+  private val username = "alice"
+  private val principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, 
username)
+  private val requestContext = newRequestContext(principal, 
InetAddress.getByName("192.168.0.1"))
+  private val executor = Executors.newSingleThreadScheduledExecutor
+  private var config: KafkaConfig = _
+
+  @Before
+  override def setUp(): Unit = {
+// Allow failed clients to avoid server closing the connection before 
reporting AuthFailed.
+System.setProperty("zookeeper.allowSaslFailedClients", "true")
+
+// Configure ZK SASL with TestableDigestLoginModule for clients to inject 
failures
+TestableDigestLoginModule.reset()
+val jaasSections = JaasTestUtils.zkSections
+val serverJaas = jaasSections.filter(_.contextName == "Server")
+val clientJaas = jaasSections.filter(_.contextName == "Client")
+  .map(section => new TestableJaasSection(section.contextName, 
section.modules))
+startSasl(serverJaas ++ clientJaas)
+
+// Increase maxUpdateRetries to avoid transient failures
+aclAuthorizer.maxUpdateRetries = Int.MaxValue
+aclAuthorizer2.maxUpdateRetries = Int.MaxValue
+
+super.setUp()
+config = KafkaConfig.fromProps(Tes

[GitHub] [kafka] hachikuji commented on a change in pull request #9539: KAFKA-10634: Adding LeaderId to Voters list in LeaderChangeMessage

2020-11-03 Thread GitBox


hachikuji commented on a change in pull request #9539:
URL: https://github.com/apache/kafka/pull/9539#discussion_r516215666



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -317,6 +317,9 @@ private void appendLeaderChangeMessage(LeaderState state, 
long currentTimeMs) {
 .map(follower -> new Voter().setVoterId(follower))
 .collect(Collectors.toList());
 
+// Adding the leader to the voters as the protocol ensures that leader 
always votes for itself.
+voters.add(new Voter().setVoterId(state.election().leaderId()));

Review comment:
   I think the original KIP stated that the LeaderChange message would 
encode the set of voters that had voted for the leader. We thought this might 
be useful for debugging. Later on, we had a change of heart and decided it 
would just be the set of voters. Now I'm thinking it might be useful to have 
both. The log will always remember who the voters were at the time of the 
election and which voters had granted the leader's candidacy, which could be 
helpful in case of misconfigurations.
   
   For the set of voters which voted for the current leader, I think what we 
want is `CandidateState.grantingVoters`. However, by the time `onBecomeLeader` 
is fired, we have already dropped the `CandidateState`. One option is to carry 
`grantingVoters` over to `LeaderState`. We might also be able to pass it 
through `onBecomeLeader`. This will be easier if we get rid of the call to 
`onBecomeLeader` in `initialize()`. Following KAFKA-10527, it is not possible 
to initialize as a leader, so we could raise an exception instead.
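
   For reference, a hedged Scala sketch of the shape being discussed (`LeaderChange` and its fields are illustrative stand-ins, not the actual raft classes):
   ```scala
   // A leader-change record carrying both the full voter set at election
   // time and the subset of voters that granted the candidacy.
   case class Voter(voterId: Int)
   case class LeaderChange(leaderId: Int, voters: List[Voter], grantingVoters: List[Voter])

   def leaderChange(leaderId: Int, allVoters: Set[Int], granting: Set[Int]): LeaderChange =
     LeaderChange(
       leaderId,
       allVoters.toList.sorted.map(Voter(_)),  // who the voters were
       granting.toList.sorted.map(Voter(_))    // who granted the leader's candidacy
     )
   ```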
   
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] tombentley commented on pull request #9433: KAFKA-10607: Consistent behaviour for response errorCounts()

2020-11-03 Thread GitBox


tombentley commented on pull request #9433:
URL: https://github.com/apache/kafka/pull/9433#issuecomment-720421263


   @chia7712 fixed, thanks.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on a change in pull request #5346: Kafka 6432: make index lookup more cache friendly

2020-11-03 Thread GitBox


junrao commented on a change in pull request #5346:
URL: https://github.com/apache/kafka/pull/5346#discussion_r516177126



##
File path: core/src/main/scala/kafka/log/AbstractIndex.scala
##
@@ -44,9 +44,67 @@ abstract class AbstractIndex[K, V](@volatile var file: File, 
val baseOffset: Lon
   // Length of the index file
   @volatile
   private var _length: Long = _
-
   protected def entrySize: Int
 
+  /*
+   Kafka mmaps index files into memory, and all the read / write operations on 
the index go through the OS page cache. This
+   avoids blocked disk I/O in most cases.
+
+   To the extent of our knowledge, all the modern operating systems use LRU 
policy or its variants to manage page
+   cache. Kafka always appends to the end of the index file, and almost all 
the index lookups (typically from in-sync
+   followers or consumers) are very close to the end of the index. So, the LRU 
cache replacement policy should work very
+   well with Kafka's index access pattern.
+
+   However, when looking up index, the standard binary search algorithm is not 
cache friendly, and can cause unnecessary
+   page faults (the thread is blocked to wait for reading some index entries 
from hard disk, as those entries are not
+   cached in the page cache).
+
+   For example, in an index with 13 pages, to lookup an entry in the last page 
(page #12), the standard binary search
+   algorithm will read index entries in page #0, 6, 9, 11, and 12.
+   page number: |0|1|2|3|4|5|6|7|8|9|10|11|12 |
+   steps:   |1| | | | | |3| | |4|  |5 |2/6|
+   In each page, there are hundreds of log entries, corresponding to hundreds to 
thousands of kafka messages. When the
+   index gradually grows from the 1st entry in page #12 to the last entry in 
page #12, all the write (append)
+   operations are in page #12, and all the in-sync follower / consumer lookups 
read page #0,6,9,11,12. As these pages
+   are always used in each in-sync lookup, we can assume these pages are 
fairly recently used, and are very likely to be
+   in the page cache. When the index grows to page #13, the pages needed in an 
in-sync lookup change to #0, 7, 10, 12,
+   and 13:
+   page number: |0|1|2|3|4|5|6|7|8|9|10|11|12|13 |
+   steps:   |1| | | | | | |3| | | 4|5 | 6|2/7|
+   Page #7 and page #10 have not been used for a very long time. They are much 
less likely to be in the page cache than
+   the other pages. The 1st lookup, after the 1st index entry in page #13 is 
appended, is likely to have to read page #7
and page #10 from disk (page fault), which can take more than a 
second. In our test, this can cause the
+   at-least-once produce latency to jump to about 1 second from a few ms.
+
+   Here, we use a more cache-friendly lookup algorithm:
+   if (target > indexEntry[end - N]) // if the target is in the last N entries 
of the index
+  binarySearch(end - N, end)
+   else
+  binarySearch(begin, end - N)
+
+   If possible, we only look up in the last N entries of the index. By 
choosing a proper constant N, all the in-sync
+   lookups should go to the 1st branch. We call the last N entries the "warm" 
section. As we frequently look up in this
+   relatively small section, the pages containing this section are more likely 
to be in the page cache.
+
+   We set N (_warmEntries) to 8192, because
+   1. This number is small enough to guarantee all the pages of the "warm" 
section are touched in every warm-section
+  lookup. That way, the entire warm section is really "warm".
+  When doing a warm-section lookup, the following 3 entries are always touched: 
indexEntry(end), indexEntry(end-N),
+  and indexEntry((end*2 -N)/2). If page size >= 4096, all the warm-section 
pages (3 or fewer) are touched, when we
+  touch those 3 entries. As of 2018, 4096 is the smallest page size for 
all the processors (x86-32, x86-64, MIPS,
+  SPARC, Power, ARM etc.).
+   2. This number is large enough to guarantee most of the in-sync lookups are 
in the warm-section. With default Kafka
+  settings, 8KB index corresponds to about 4MB (offset index) or 2.7MB 
(time index) log messages.

Review comment:
   An offset index has 8 bytes per entry. So, 8KB offset index has 1K 
entries. Since by default, we add an index entry for every 4KB of messages, 8KB 
offset index corresponds to 1K * 4KB = 4MB of messages.
   
   A time index has 12 bytes per entry. So, the same calculation leads to 2.7MB 
of messages.
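
   A quick sanity check of that arithmetic (constants taken from the comment above: 8-byte offset-index entries, 12-byte time-index entries, and one index entry per 4KB of messages by default):
   ```scala
   val warmSectionBytes = 8 * 1024               // the 8KB warm section
   val offsetEntries    = warmSectionBytes / 8   // 1024 offset-index entries
   val timeEntries      = warmSectionBytes / 12  // ~682 time-index entries
   val bytesPerEntry    = 4 * 1024               // one index entry per 4KB of messages

   // 1024 entries * 4KB = 4MB; ~682 entries * 4KB is roughly 2.7MB
   println(s"offset index covers ${offsetEntries.toLong * bytesPerEntry / (1024 * 1024)} MB")
   println(f"time index covers ${timeEntries.toLong * bytesPerEntry / (1024.0 * 1024)}%.1f MB")
   ```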





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rajinisivaram commented on pull request #7751: KAFKA-7987: Reinitialize ZookeeperClient after auth failures

2020-11-03 Thread GitBox


rajinisivaram commented on pull request #7751:
URL: https://github.com/apache/kafka/pull/7751#issuecomment-720401081







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vamossagar12 commented on a change in pull request #9539: KAFKA-10634: Adding LeaderId to Voters list in LeaderChangeMessage

2020-11-03 Thread GitBox


vamossagar12 commented on a change in pull request #9539:
URL: https://github.com/apache/kafka/pull/9539#discussion_r516438813



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -317,6 +317,9 @@ private void appendLeaderChangeMessage(LeaderState state, 
long currentTimeMs) {
 .map(follower -> new Voter().setVoterId(follower))
 .collect(Collectors.toList());
 
+// Adding the leader to the voters as the protocol ensures that leader 
always votes for itself.
+voters.add(new Voter().setVoterId(state.election().leaderId()));

Review comment:
   Thanks @hachikuji . So, in the context of this PR, do you suggest we 
continue using the definition of Voters in the LeaderChange message as all the 
voters? Maybe we can create separate issues for:
   1) changing the LeaderChange message to include both all voters and the 
voters endorsing that leader.
   2) making the relevant changes to be able to pass the grantingVoters 
information from CandidateState to LeaderState. These include the things you 
mentioned, like removing `onBecomeLeader` from `initialize()` or throwing an 
exception.
   
   Just curious about the last part. As per KAFKA-10527, a node can't be 
initialized as a leader. So, this block of code is effectively unused then:
   
   `if (quorum.isLeader()) {
       onBecomeLeader(currentTimeMs);
   } else if (quorum.isCandidate()) {
       onBecomeCandidate(currentTimeMs);
   }`
   
   Let me know how I should proceed here...

##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -317,6 +317,9 @@ private void appendLeaderChangeMessage(LeaderState state, 
long currentTimeMs) {
 .map(follower -> new Voter().setVoterId(follower))
 .collect(Collectors.toList());
 
+// Adding the leader to the voters as the protocol ensures that leader 
always votes for itself.
+voters.add(new Voter().setVoterId(state.election().leaderId()));

Review comment:
   Thanks @hachikuji . So, in the context of this PR, do you suggest we 
continue using the definition of Voters in the LeaderChange message as all the 
voters? Maybe we can create separate issues for:
   1) changing the LeaderChange message to include both all voters and the 
voters endorsing that leader.
   2) making the relevant changes to be able to pass the grantingVoters 
information from CandidateState to LeaderState. These include the things you 
mentioned, like removing `onBecomeLeader` from `initialize()` or throwing an 
exception.
   
   Just curious about the last part. As per KAFKA-10527, a node can't be 
initialized as a leader. So, this block of code is effectively unused then:
   
   `if (quorum.isLeader()) {
       onBecomeLeader(currentTimeMs);
   } else if (quorum.isCandidate()) {
       onBecomeCandidate(currentTimeMs);
   }`
   
   Or should we chase all of the above as part of this PR itself? Please let me know.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on pull request #9543: makes the Stream thread list resizable

2020-11-03 Thread GitBox


wcarlson5 commented on pull request #9543:
URL: https://github.com/apache/kafka/pull/9543#issuecomment-720787984


   @cadonna First part: make the streamThreads list resizable



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] anatasiavela commented on a change in pull request #9526: KAFKA-10525: Emit JSONs with new auto-generated schema

2020-11-03 Thread GitBox


anatasiavela commented on a change in pull request #9526:
URL: https://github.com/apache/kafka/pull/9526#discussion_r516288558



##
File path: core/src/main/scala/kafka/network/RequestConvertToJson.scala
##
@@ -0,0 +1,343 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package kafka.network
+
+import com.fasterxml.jackson.databind.JsonNode
+import com.fasterxml.jackson.databind.node.{ArrayNode, DoubleNode, IntNode, 
JsonNodeFactory, LongNode, NullNode, ObjectNode, ShortNode, TextNode}
+import kafka.network.RequestChannel.{Response, Session}
+import org.apache.kafka.common.message._
+import org.apache.kafka.common.protocol.Errors
+import org.apache.kafka.common.record.RecordBatch
+import org.apache.kafka.common.requests._
+import org.apache.kafka.common.utils.CollectionUtils
+
+import scala.jdk.CollectionConverters._
+
+object RequestConvertToJson {
+  def request(request: AbstractRequest, verbose: Boolean): JsonNode = {
+request match {
+  case req: AddOffsetsToTxnRequest => 
AddOffsetsToTxnRequestDataJsonConverter.write(req.data(), request.version())
+  case req: AddPartitionsToTxnRequest => 
AddPartitionsToTxnRequestDataJsonConverter.write(req.data(), request.version())
+  case req: AlterClientQuotasRequest => 
AlterClientQuotasRequestDataJsonConverter.write(req.data(), request.version())
+  case req: AlterConfigsRequest => 
AlterConfigsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: AlterIsrRequest => 
AlterIsrRequestDataJsonConverter.write(req.data(), request.version())
+  case req: AlterPartitionReassignmentsRequest => 
AlterPartitionReassignmentsRequestDataJsonConverter.write(req.data(), 
request.version())
+  case req: AlterReplicaLogDirsRequest => 
AlterReplicaLogDirsRequestDataJsonConverter.write(req.data(), request.version())
+  case res: AlterUserScramCredentialsRequest => 
AlterUserScramCredentialsRequestDataJsonConverter.write(res.data(), 
request.version())
+  case req: ApiVersionsRequest => 
ApiVersionsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: BeginQuorumEpochRequest => 
BeginQuorumEpochRequestDataJsonConverter.write(req.data, request.version())
+  case req: ControlledShutdownRequest => 
ControlledShutdownRequestDataJsonConverter.write(req.data(), request.version())
+  case req: CreateAclsRequest => 
CreateAclsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: CreateDelegationTokenRequest => 
CreateDelegationTokenRequestDataJsonConverter.write(req.data(), 
request.version())
+  case req: CreatePartitionsRequest => 
CreatePartitionsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: CreateTopicsRequest => 
CreateTopicsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: DeleteAclsRequest => 
DeleteAclsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: DeleteGroupsRequest => 
DeleteGroupsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: DeleteRecordsRequest => 
DeleteRecordsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: DeleteTopicsRequest => 
DeleteTopicsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: DescribeAclsRequest => 
DescribeAclsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: DescribeClientQuotasRequest => 
DescribeClientQuotasRequestDataJsonConverter.write(req.data(), 
request.version())
+  case req: DescribeConfigsRequest => 
DescribeConfigsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: DescribeDelegationTokenRequest => 
DescribeDelegationTokenRequestDataJsonConverter.write(req.data(), 
request.version())
+  case req: DescribeGroupsRequest => 
DescribeGroupsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: DescribeLogDirsRequest => 
DescribeLogDirsRequestDataJsonConverter.write(req.data(), request.version())
+  case req: DescribeQuorumRequest => 
DescribeQuorumRequestDataJsonConverter.write(req.data, request.version())
+  case res: DescribeUserScramCredentialsRequest => 
DescribeUserScramC

[GitHub] [kafka] abbccdda closed pull request #8677: KAFKA-9999: Make internal topic creation error non-fatal

2020-11-03 Thread GitBox


abbccdda closed pull request #8677:
URL: https://github.com/apache/kafka/pull/8677


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] Iskuskov opened a new pull request #9541: KAFKA-10675: Add schema name to ConnectSchema.validateValue() error message

2020-11-03 Thread GitBox


Iskuskov opened a new pull request #9541:
URL: https://github.com/apache/kafka/pull/9541


   The following error message
   ```java
   org.apache.kafka.connect.errors.DataException: Invalid Java object for 
schema type INT64: class java.lang.Long for field: "moderate_time"
   ```
   can be confusing because `java.lang.Long` is an acceptable type for schema 
`INT64`.
   
   In fact, in this case `org.apache.kafka.connect.data.Timestamp` is used, but 
this information is not logged.
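
   A hedged sketch of a clearer message (illustrative only; the helper name and wording are assumptions, not the actual ConnectSchema change):
   ```scala
   // Include the schema name so logical types such as
   // org.apache.kafka.connect.data.Timestamp show up in the error.
   def validationError(schemaType: String, schemaName: String,
                       valueClass: Class[_], field: String): String =
     s"Invalid Java object for schema with type $schemaType and name $schemaName: " +
       s"$valueClass for field: \"$field\""

   println(validationError("INT64", "org.apache.kafka.connect.data.Timestamp",
     classOf[java.lang.Long], "moderate_time"))
   ```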
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dengziming commented on pull request #9546: MINOR: optimize PartitionStateMachine handleStateChanges

2020-11-03 Thread GitBox


dengziming commented on pull request #9546:
URL: https://github.com/apache/kafka/pull/9546#issuecomment-720905982


   Hi @chia7712, PTAL.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bbejeck closed pull request #9542: MINOR: Update trunk docs to match site docs

2020-11-03 Thread GitBox


bbejeck closed pull request #9542:
URL: https://github.com/apache/kafka/pull/9542


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] omkreddy commented on a change in pull request #9540: KAFKA-10669: Make CurrentLeaderEpoch field ignorable and set MaxNumOffsets field default to 1

2020-11-03 Thread GitBox


omkreddy commented on a change in pull request #9540:
URL: https://github.com/apache/kafka/pull/9540#discussion_r516103337



##
File path: core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
##
@@ -143,7 +143,13 @@ class ListOffsetsRequestTest extends BaseRequestTest {
 val partitionData = response.topics.asScala.find(_.name == topic).get
   .partitions.asScala.find(_.partitionIndex == partition.partition).get
 
-(partitionData.offset, partitionData.leaderEpoch)
+if (version == 0) {
+  if (partitionData.oldStyleOffsets().isEmpty)
+(-1, partitionData.leaderEpoch)
+  else
+(partitionData.oldStyleOffsets().asScala.head, 
partitionData.leaderEpoch)

Review comment:
   `getOrElse` returns the `Any` type (a supertype of `Long`). I kept the current 
code changes to avoid any conversions.
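
   For the record, a minimal standalone reproduction of the inference issue:
   ```scala
   import scala.jdk.CollectionConverters._

   val offsets = new java.util.ArrayList[java.lang.Long]()

   // getOrElse must infer a common supertype of java.lang.Long (the element
   // type) and Int (the default), which is Any, so the numeric type is lost.
   val widened = offsets.asScala.headOption.getOrElse(-1)  // inferred as Any

   // The explicit if/else keeps a proper Long without any conversions.
   val offset: Long = if (offsets.isEmpty) -1L else offsets.get(0)
   ```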





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kkonstantine commented on a change in pull request #6820: MINOR: move connectorConfig to AbstractHerder

2020-11-03 Thread GitBox


kkonstantine commented on a change in pull request #6820:
URL: https://github.com/apache/kafka/pull/6820#discussion_r516445120



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
##
@@ -751,14 +751,8 @@ public Void call() throws Exception {
 
 @Override
 public void connectorConfig(String connName, final Callback<Map<String, String>> callback) {
-// Subset of connectorInfo, so piggy back on that implementation
 log.trace("Submitting connector config read request {}", connName);

Review comment:
   Seems fine to keep this `trace` log in both cases and keep the 
implementation only in `AbstractHerder`. Wdyt?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 merged pull request #9535: MINOR: remove redundant return statement

2020-11-03 Thread GitBox


chia7712 merged pull request #9535:
URL: https://github.com/apache/kafka/pull/9535


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] jsancio commented on a change in pull request #9482: KAFKA-10632; Raft client should push all committed data to state machines

2020-11-03 Thread GitBox


jsancio commented on a change in pull request #9482:
URL: https://github.com/apache/kafka/pull/9482#discussion_r516172571



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -1017,12 +1028,9 @@ private boolean handleFetchResponse(
 
log.truncateToEndOffset(divergingOffsetAndEpoch).ifPresent(truncationOffset -> {
 logger.info("Truncated to offset {} from Fetch response 
from leader {}",
 truncationOffset, quorum.leaderIdOrNil());
-
-// Since the end offset has been updated, we should 
complete any delayed
-// reads at the end offset.
-fetchPurgatory.maybeComplete(
-new LogOffset(Long.MAX_VALUE, Isolation.UNCOMMITTED),
-currentTimeMs);
+// After truncation, we complete all pending reads in 
order to
+// ensure that fetches account for the updated log end 
offset
+fetchPurgatory.completeAll(currentTimeMs);

Review comment:
   > I had considered this previously and decided to leave the fetches in 
purgatory while the election was in progress to prevent unnecessary retries 
since that is all the client can do while waiting for the outcome. On the other 
hand, some of the fetches in purgatory might be from other voters. It might be 
better to respond more quickly so that there are not any unnecessary election 
delays. I'd suggest we open a separate issue to consider this.
   
   Sounds good to create a Jira for this.

##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -1778,4 +1808,98 @@ public void complete() {
 }
 }
 
+private final class ListenerContext implements 
CloseListener<BatchReader<T>> {
+private final RaftClient.Listener<T> listener;
+private BatchReader<T> lastSent = null;
+private long lastAckedOffset = 0;
+private int claimedEpoch = 0;
+
+private ListenerContext(Listener<T> listener) {
+this.listener = listener;
+}
+
+/**
+ * Get the last acked offset, which is one greater than the offset of 
the
+ * last record which was acked by the state machine.
+ */
+public synchronized long lastAckedOffset() {
+return lastAckedOffset;
+}
+
+/**
+ * Get the next expected offset, which might be larger than the last 
acked
+ * offset if there are inflight batches which have not been acked yet.
+ * Note that when fetching from disk, we may not know the last offset 
of
+ * inflight data until it has been processed by the state machine. In 
this case,
+ * we delay sending additional data until the state machine has read 
to the
+ * end and the last offset is determined.
+ */
+public synchronized OptionalLong nextExpectedOffset() {
+if (lastSent != null) {
+OptionalLong lastSentOffset = lastSent.lastOffset();
+if (lastSentOffset.isPresent()) {
+return OptionalLong.of(lastSentOffset.getAsLong() + 1);
+} else {
+return OptionalLong.empty();
+}
+} else {
+return OptionalLong.of(lastAckedOffset);
+}
+}
+
+/**
+ * This API is used for committed records that have been received 
through
+ * replication. In general, followers will write new data to disk 
before they
+ * know whether it has been committed. Rather than retaining the 
uncommitted
+ * data in memory, we let the state machine read the records from disk.
+ */
+public void fireHandleCommit(long baseOffset, Records records) {
+BufferSupplier bufferSupplier = BufferSupplier.create();
+RecordsBatchReader<T> reader = new 
RecordsBatchReader<>(baseOffset, records,
+serde, bufferSupplier, this);
+fireHandleCommit(reader);
+}
+
+/**
+ * This API is used for committed records originating from {@link 
#scheduleAppend(int, List)}
+ * on this instance. In this case, we are able to save the original 
record objects,
+ * which saves the need to read them back from disk. This is a nice 
optimization
+ * for the leader which is typically doing more work than all of the 
followers.
+ */
+public void fireHandleCommit(long baseOffset, int epoch, List<T> 
records) {
+BatchReader.Batch<T> batch = new BatchReader.Batch<>(baseOffset, 
epoch, records);
+MemoryBatchReader<T> reader = new 
MemoryBatchReader<>(Collections.singletonList(batch), this);
+fireHandleCommit(reader);
+}
+
+private void fireHandleCommit(BatchReader<T> reader) {
+synchronized (this) {
+this.lastSent = reader;
+ 

[GitHub] [kafka] chia7712 commented on a change in pull request #7641: MINOR: Add a unit test for GroupMetadataManager#offsetExpiredSensor

2020-11-03 Thread GitBox


chia7712 commented on a change in pull request #7641:
URL: https://github.com/apache/kafka/pull/7641#discussion_r516399560



##
File path: 
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
##
@@ -791,52 +797,52 @@ class GroupMetadataManager(brokerId: Int,
 (removedOffsets, group.is(Dead), group.generationId)
   }
 
-val offsetsPartition = partitionFor(groupId)
-val appendPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 
offsetsPartition)
-getMagic(offsetsPartition) match {
-  case Some(magicValue) =>
-// We always use CREATE_TIME, like the producer. The conversion to 
LOG_APPEND_TIME (if necessary) happens automatically.
-val timestampType = TimestampType.CREATE_TIME
-val timestamp = time.milliseconds()
+  val offsetsPartition = partitionFor(groupId)

Review comment:
   Makes sense to me. Could you fix the conflicting files?

##
File path: 
core/src/main/scala/kafka/coordinator/group/GroupMetadataManager.scala
##
@@ -791,52 +797,52 @@ class GroupMetadataManager(brokerId: Int,
 (removedOffsets, group.is(Dead), group.generationId)
   }
 
-val offsetsPartition = partitionFor(groupId)
-val appendPartition = new TopicPartition(Topic.GROUP_METADATA_TOPIC_NAME, 
offsetsPartition)
-getMagic(offsetsPartition) match {
-  case Some(magicValue) =>
-// We always use CREATE_TIME, like the producer. The conversion to 
LOG_APPEND_TIME (if necessary) happens automatically.
-val timestampType = TimestampType.CREATE_TIME
-val timestamp = time.milliseconds()
+  val offsetsPartition = partitionFor(groupId)

Review comment:
   Makes sense to me. Could you fix the conflicting files?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dengziming commented on a change in pull request #9531: KAFKA-10661; Add new resigned state for graceful shutdown/initialization

2020-11-03 Thread GitBox


dengziming commented on a change in pull request #9531:
URL: https://github.com/apache/kafka/pull/9531#discussion_r516385415



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -1560,9 +1558,34 @@ private long pollLeader(long currentTimeMs) {
 return Math.min(timeUntilFlush, timeUntilSend);
 }
 
+private long maybeSendVoteRequests(
+CandidateState state,
+long currentTimeMs
+) {
+// Continue sending Vote requests as long as we still have a chance to 
win the election
+if (!state.isVoteRejected()) {
+return maybeSendRequests(
+currentTimeMs,
+state.unrecordedVoters(),
+this::buildVoteRequest
+);
+}
+return Long.MAX_VALUE;
+}
+
 private long pollCandidate(long currentTimeMs) throws IOException {
 CandidateState state = quorum.candidateStateOrThrow();
-if (state.isBackingOff()) {
+GracefulShutdown shutdown = this.shutdown.get();
+
+if (shutdown != null) {
+// If we happen to shutdown while we are a candidate, we will 
continue
+// with the current election until one of the following conditions 
is met:
+//  1) we are elected as leader (which allows us to resign)
+//  2) another leader is elected
+//  3) the shutdown timer expires
+long minRequestBackoffMs = maybeSendVoteRequests(state, 
currentTimeMs);
+return Math.min(shutdown.remainingTimeMs(), minRequestBackoffMs);

Review comment:
   Understood. The candidate will try to complete only the current election 
when shutting down, so the election timeout can simply be ignored.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] olafurpg commented on a change in pull request #7452: KAFKA-8991: Enable scalac optimizer

2020-11-03 Thread GitBox


olafurpg commented on a change in pull request #7452:
URL: https://github.com/apache/kafka/pull/7452#discussion_r516618369



##
File path: build.gradle
##
@@ -419,6 +419,22 @@ subprojects {
 "-Xlint:constant",
 "-Xlint:unused"
   ]
+
+  // Inline more aggressively when compiling the `core` jar since it's not 
meant to be used as a library.
+  // More specifically, inline classes from the Scala library so that we 
can inline methods like `Option.exists`
+  // and avoid lambda allocations. This is only safe if the Scala library 
version is the same at compile time
+  // and runtime. We cannot guarantee this for libraries like kafka 
streams, so only inline classes from the
+  // Kafka project in that case.
+  List<String> inlineFrom
+  if (project.name.equals('core'))
+inlineFrom = ["-opt-inline-from:scala.**", 
"-opt-inline-from:kafka.**", "-opt-inline-from:org.apache.kafka.**"]

Review comment:
   @ijuma I apologize if this is the wrong place to ask a question about this 
PR. What would be the best place to start a discussion on removing 
`"-opt-inline-from:scala.**"` or reducing the scope of the glob to only include 
relevant methods like `Option.exists`? I'm not so familiar with the Kafka 
development process.
   
   At Twitter, we are currently unable to upgrade to 2.5.x because 
kafka_2.12:2.5.1 inlines anonymous classes from the 2.12.11 library that no 
longer exist in the 2.12.12 library, resulting in crashes like this:
   ```
java.lang.NoClassDefFoundError: scala/math/Ordering$$anon$7
   ```
   We can work around this issue by publishing a custom build of Kafka but 
that's not a sustainable solution.
   
   These inlining settings are causing issues for a lot of users based on my 
search online. Even members of the Scala compiler team who worked on the 
inliner have suggested that the inliner settings in the Kafka build are based 
on false assumptions 
https://github.com/akka/alpakka-kafka/pull/1212#issuecomment-706933445 cc/ 
@lrytz 
   





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on a change in pull request #9536: MINOR: KIP-584: Remove admin client facility to read features from controller

2020-11-03 Thread GitBox


chia7712 commented on a change in pull request #9536:
URL: https://github.com/apache/kafka/pull/9536#discussion_r515749438



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -4353,11 +4353,8 @@ void handleFailure(Throwable throwable) {
 public DescribeFeaturesResult describeFeatures(final 
DescribeFeaturesOptions options) {

Review comment:
   how about adding ```@Override```?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bbejeck opened a new pull request #9544: MINOR: Add back section taken out by mistake

2020-11-03 Thread GitBox


bbejeck opened a new pull request #9544:
URL: https://github.com/apache/kafka/pull/9544


   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #9527: MINOR: avoid unnecessary conversion and tuple when updating error met…

2020-11-03 Thread GitBox


chia7712 commented on pull request #9527:
URL: https://github.com/apache/kafka/pull/9527#issuecomment-721002109


   The failed tests are unrelated to this PR. Rebasing to trigger QA.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] scanterog commented on pull request #9545: [mm2] Allow Checkpoints for consumers using static partition assignments

2020-11-03 Thread GitBox


scanterog commented on pull request #9545:
URL: https://github.com/apache/kafka/pull/9545#issuecomment-720844769


   cc: @ryannedolan who added this filter and might have more context on why 
this was needed.
   cc: @mimaison for more eyes on this :)



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mjsax commented on a change in pull request #9000: KAFKA-10036 Improve handling and documentation of Suppliers

2020-11-03 Thread GitBox


mjsax commented on a change in pull request #9000:
URL: https://github.com/apache/kafka/pull/9000#discussion_r516239935



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java
##
@@ -33,12 +38,17 @@
  * @see TransformerSupplier
  * @see KStream#transform(TransformerSupplier, String...)
  */
-public interface ValueTransformerSupplier<V, VR> extends 
ConnectedStoreProvider {
+public interface ValueTransformerSupplier<V, VR> extends 
ConnectedStoreProvider, Supplier<ValueTransformer<V, VR>> {

Review comment:
   This seems to be a public API change that we cannot do without a KIP. 
Seems you added it so you can pass the different suppliers into `checkSupplier`? 
Also, I'm not sure if `checkSupplier` must be as "complicated" as proposed.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/internals/ApiUtils.java
##
@@ -75,4 +81,31 @@ public static long validateMillisecondInstant(final Instant 
instant, final Strin
 public static String prepareMillisCheckFailMsgPrefix(final Object value, 
final String name) {
 return format(MILLISECOND_VALIDATION_FAIL_MSG_FRMT, name, value);
 }
+
+/**
+ * @throws IllegalArgumentException if the same instance is obtained each 
time
+ */
+public static void checkSupplier(final Supplier<?> supplier) {
+if (supplier.get() == supplier.get()) {
+final String supplierClass = 
getAllImplementedInterfaces(supplier.getClass()).stream()

Review comment:
   Do we really need to try to extract the concrete interface name?
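
   For comparison, a hedged sketch of a simpler check that only reports the supplier's own class name (illustrative, not the actual ApiUtils code):
   ```scala
   import java.util.function.Supplier

   def checkSupplier(supplier: Supplier[_]): Unit = {
     // A well-behaved supplier must return a new instance on every call, so
     // reference equality across two calls indicates a violation.
     if (supplier.get().asInstanceOf[AnyRef] eq supplier.get().asInstanceOf[AnyRef])
       throw new IllegalArgumentException(
         s"${supplier.getClass.getName} returns the same instance on each call; " +
           "suppliers must return a new instance every time they are called.")
   }
   ```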





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #6820: MINOR: move connectorConfig to AbstractHerder

2020-11-03 Thread GitBox


chia7712 commented on pull request #6820:
URL: https://github.com/apache/kafka/pull/6820#issuecomment-720874808


   @ivanyu nice catch. Could you fix those conflicting files? I'd like to 
commit it :)



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 merged pull request #6647: MINOR: Fix an example in the Kafka Streams tutorial to be compilable

2020-11-03 Thread GitBox


chia7712 merged pull request #6647:
URL: https://github.com/apache/kafka/pull/6647


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman edited a comment on pull request #9515: KAFKA-10651: read offsets directly from checkpoint for uninitialized tasks

2020-11-03 Thread GitBox


ableegoldman edited a comment on pull request #9515:
URL: https://github.com/apache/kafka/pull/9515#issuecomment-719848567


   Cherry-picked to 2.7 (cc/ @bbejeck) and 2.6



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bbejeck opened a new pull request #9542: MINOR: Update trunk docs to match site docs

2020-11-03 Thread GitBox


bbejeck opened a new pull request #9542:
URL: https://github.com/apache/kafka/pull/9542


   Migration of changes for the AK site upgrade that were done on the site but 
not migrated to AK trunk
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 closed pull request #5490: MINOR: Fix typo in TaskManager

2020-11-03 Thread GitBox


chia7712 closed pull request #5490:
URL: https://github.com/apache/kafka/pull/5490


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #7266: MINOR: Fix capitalization inconsistency in ConfigCommand.scala

2020-11-03 Thread GitBox


chia7712 commented on pull request #7266:
URL: https://github.com/apache/kafka/pull/7266#issuecomment-720873421


   @KevinLiLu Thanks for your patch. I'm going to close this PR since this 
issue was fixed by 
https://github.com/apache/kafka/commit/7f35a6713434dd7f2ccd3897aef825d34582beab.
 Feel free to ping me to review your other pending PRs in the future.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji merged pull request #9482: KAFKA-10632; Raft client should push all committed data to state machines

2020-11-03 Thread GitBox


hachikuji merged pull request #9482:
URL: https://github.com/apache/kafka/pull/9482


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 opened a new pull request #9543: makes the Stream thread list resizable

2020-11-03 Thread GitBox


wcarlson5 opened a new pull request #9543:
URL: https://github.com/apache/kafka/pull/9543


   *More detailed description of your change,
   if necessary. The PR title and PR message become
   the squashed commit message, so use a separate
   comment to ping reviewers.*
   
   *Summary of testing strategy (including rationale)
   for the feature or bug fix. Unit and/or integration
   tests are expected for any behaviour change and
   system tests should be considered for larger changes.*
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #7452: KAFKA-8991: Enable scalac optimizer

2020-11-03 Thread GitBox


ijuma commented on a change in pull request #7452:
URL: https://github.com/apache/kafka/pull/7452#discussion_r516664863



##
File path: build.gradle
##
@@ -419,6 +419,22 @@ subprojects {
 "-Xlint:constant",
 "-Xlint:unused"
   ]
+
+  // Inline more aggressively when compiling the `core` jar since it's not 
meant to be used as a library.
+  // More specifically, inline classes from the Scala library so that we 
can inline methods like `Option.exists`
+  // and avoid lambda allocations. This is only safe if the Scala library 
version is the same at compile time
+  // and runtime. We cannot guarantee this for libraries like kafka 
streams, so only inline classes from the
+  // Kafka project in that case.
+  List<String> inlineFrom
+  if (project.name.equals('core'))
+inlineFrom = ["-opt-inline-from:scala.**", 
"-opt-inline-from:kafka.**", "-opt-inline-from:org.apache.kafka.**"]

Review comment:
   You can submit a PR. :)





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #9536: MINOR: KIP-584: Remove admin client facility to read features from controller

2020-11-03 Thread GitBox


kowshik commented on a change in pull request #9536:
URL: https://github.com/apache/kafka/pull/9536#discussion_r515769357



##
File path: 
clients/src/main/java/org/apache/kafka/clients/admin/KafkaAdminClient.java
##
@@ -4353,11 +4353,8 @@ void handleFailure(Throwable throwable) {
 public DescribeFeaturesResult describeFeatures(final 
DescribeFeaturesOptions options) {

Review comment:
   Done.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bbejeck edited a comment on pull request #9468: KAFKA-10454 / (2.6) Update copartitionSourceGroups when optimization algorithm is triggered

2020-11-03 Thread GitBox


bbejeck edited a comment on pull request #9468:
URL: https://github.com/apache/kafka/pull/9468#issuecomment-720473323


   > What's the status of this PR? can we get it merged this week?
   
   @mimaison yes, I've been waiting on the Jenkins build, which I believe is 
fixed now. Once I can get the build to run, I'll merge it.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 merged pull request #9180: MINOR: corrected unit tests in AbstractConfigTest.java- fixed invalid assertions

2020-11-03 Thread GitBox


chia7712 merged pull request #9180:
URL: https://github.com/apache/kafka/pull/9180


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bbejeck merged pull request #9468: KAFKA-10454 / (2.6) Update copartitionSourceGroups when optimization algorithm is triggered

2020-11-03 Thread GitBox


bbejeck merged pull request #9468:
URL: https://github.com/apache/kafka/pull/9468


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] scanterog edited a comment on pull request #9545: [mm2] Allow Checkpoints for consumers using static partition assignments

2020-11-03 Thread GitBox


scanterog edited a comment on pull request #9545:
URL: https://github.com/apache/kafka/pull/9545#issuecomment-720844769


   cc: @ryannedolan who added this filter and might have more context on why 
this was needed.
   cc: @mimaison for more eyes on this :)
   
   Update: tests are working fine locally. Might want to rerun it here.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] RamanVerma commented on pull request #9364: KAFKA-10471 Mark broker crash during log loading as unclean shutdown

2020-11-03 Thread GitBox


RamanVerma commented on pull request #9364:
URL: https://github.com/apache/kafka/pull/9364#issuecomment-720697940


   @junrao I don't think the JDK 15 failure is related to my change.
   - We see that tests pass till JDK 11
   - I have run the test in my IDE with AdoptOpenJDK15 and it passed
   - The test fails because the reassignment has not completed within the 
timeout window. As part of this change, I have not changed the way 
cleanShutdownFile is created, or anything related to the reassignment of 
replicas. We just changed when this file is to be deleted.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 closed pull request #7266: MINOR: Fix capitalization inconsistency in ConfigCommand.scala

2020-11-03 Thread GitBox


chia7712 closed pull request #7266:
URL: https://github.com/apache/kafka/pull/7266


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rajinisivaram commented on a change in pull request #7751: KAFKA-7987: Reinitialize ZookeeperClient after auth failures

2020-11-03 Thread GitBox


rajinisivaram commented on a change in pull request #7751:
URL: https://github.com/apache/kafka/pull/7751#discussion_r515933228



##
File path: core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
##
@@ -437,8 +437,14 @@ class ZooKeeperClient(connectString: String,
   if (state == KeeperState.AuthFailed) {
 error("Auth failed.")
 stateChangeHandlers.values.foreach(_.onAuthFailure())
+
+// If this is during initial startup, the reinitialization 
scheduler hasn't been started yet.
+// To support failing fast, allow authorization to fail in that 
case.
+if (reinitializeScheduler.isStarted) {

Review comment:
   Fixed.

##
File path: core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
##
@@ -81,7 +85,8 @@ class ZooKeeperClient(connectString: String,
   private val zNodeChildChangeHandlers = new ConcurrentHashMap[String, 
ZNodeChildChangeHandler]().asScala
   private val inFlightRequests = new Semaphore(maxInFlightRequests)
   private val stateChangeHandlers = new ConcurrentHashMap[String, 
StateChangeHandler]().asScala
-  private[zookeeper] val expiryScheduler = new KafkaScheduler(threads = 1, 
"zk-session-expiry-handler")
+  private[zookeeper] val reinitializeScheduler = new KafkaScheduler(threads = 
1, "zk-client-reinit-")

Review comment:
   Done.

##
File path: core/src/main/scala/kafka/zookeeper/ZooKeeperClient.scala
##
@@ -39,6 +39,10 @@ import scala.jdk.CollectionConverters._
 import scala.collection.Seq
 import scala.collection.mutable.Set
 
+object ZooKeeperClient {
+  val AuthFailedRetryBackoffMs = 100

Review comment:
   Done.

##
File path: 
core/src/test/scala/unit/kafka/security/authorizer/AclAuthorizerWithZkSaslTest.scala
##
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka.security.authorizer
+
+import java.net.InetAddress
+import java.util
+import java.util.UUID
+import java.util.concurrent.{Executors, TimeUnit}
+
+import javax.security.auth.Subject
+import javax.security.auth.callback.CallbackHandler
+import kafka.api.SaslSetup
+import kafka.security.authorizer.AclEntry.WildcardHost
+import kafka.server.KafkaConfig
+import kafka.utils.JaasTestUtils.{JaasModule, JaasSection}
+import kafka.utils.{JaasTestUtils, TestUtils}
+import kafka.zk.{KafkaZkClient, ZooKeeperTestHarness}
+import kafka.zookeeper.ZooKeeperClient
+import org.apache.kafka.common.acl.{AccessControlEntry, 
AccessControlEntryFilter, AclBinding, AclBindingFilter}
+import org.apache.kafka.common.acl.AclOperation.{READ, WRITE}
+import org.apache.kafka.common.acl.AclPermissionType.ALLOW
+import org.apache.kafka.common.network.{ClientInformation, ListenerName}
+import org.apache.kafka.common.protocol.ApiKeys
+import org.apache.kafka.common.requests.{RequestContext, RequestHeader}
+import org.apache.kafka.common.resource.PatternType.LITERAL
+import org.apache.kafka.common.resource.ResourcePattern
+import org.apache.kafka.common.resource.ResourceType.TOPIC
+import org.apache.kafka.common.security.auth.{KafkaPrincipal, SecurityProtocol}
+import org.apache.kafka.test.{TestUtils => JTestUtils}
+import org.apache.zookeeper.server.auth.DigestLoginModule
+import org.junit.Assert.assertEquals
+import org.junit.{After, Before, Test}
+
+import scala.jdk.CollectionConverters._
+import scala.collection.Seq
+
+class AclAuthorizerWithZkSaslTest extends ZooKeeperTestHarness with SaslSetup {
+
+  private val aclAuthorizer = new AclAuthorizer
+  private val aclAuthorizer2 = new AclAuthorizer
+  private val resource: ResourcePattern = new ResourcePattern(TOPIC, "foo-" + 
UUID.randomUUID(), LITERAL)
+  private val username = "alice"
+  private val principal = new KafkaPrincipal(KafkaPrincipal.USER_TYPE, 
username)
+  private val requestContext = newRequestContext(principal, 
InetAddress.getByName("192.168.0.1"))
+  private val executor = Executors.newSingleThreadScheduledExecutor
+  private var config: KafkaConfig = _
+
+  @Before
+  override def setUp(): Unit = {
+// Allow failed clients to avoid server closing the connection before 
reporting AuthFailed.
+System.setProperty("zookeeper.allowSaslFailedClients

[GitHub] [kafka] chia7712 closed pull request #4817: MINOR: Fix Findbugs issue about useless object in KafkaAdminClient

2020-11-03 Thread GitBox


chia7712 closed pull request #4817:
URL: https://github.com/apache/kafka/pull/4817


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] splett2 commented on a change in pull request #9386: KAFKA-10024: Add dynamic configuration and enforce quota for per-IP connection rate limits (KIP-612, part 2)

2020-11-03 Thread GitBox


splett2 commented on a change in pull request #9386:
URL: https://github.com/apache/kafka/pull/9386#discussion_r516301423



##
File path: 
core/src/test/scala/integration/kafka/network/DynamicConnectionQuotaTest.scala
##
@@ -240,6 +256,16 @@ class DynamicConnectionQuotaTest extends BaseRequestTest {
   s"Admin client connection not closed (initial = $initialConnectionCount, 
current = $connectionCount)")
   }
 
+  private def updateIpConnectionRate(ip: Option[String], updatedRate: Int): 
Unit = {
+adminZkClient.changeIpConfig(ip.getOrElse(ConfigEntityName.Default),
+  CoreUtils.propsWith(DynamicConfig.Ip.IpConnectionRateOverrideProp, 
updatedRate.toString))
+// use a random throwaway address if ip isn't specified to get the default 
value
+TestUtils.waitUntilTrue(() => servers.head.socketServer.connectionQuotas.
+  connectionRateForIp(InetAddress.getByName(ip.getOrElse("255.255.3.4"))) 
== updatedRate,

Review comment:
   This is admittedly a little weird.
   If `None` is given as the IP, we want to update the default connection rate. 
   To verify that the default connection rate was updated, we need to call 
`connectionRateForIp` with some arbitrary IP address that hasn't been given a 
specific override. In this case, I used an arbitrary IP, `255.255.3.4`, as mentioned in the comment.

##
File path: core/src/main/scala/kafka/network/SocketServer.scala
##
@@ -1246,7 +1337,57 @@ class ConnectionQuotas(config: KafkaConfig, time: Time, 
metrics: Metrics) extend
   private[network] def updateBrokerMaxConnectionRate(maxConnectionRate: Int): 
Unit = {
 // if there is a connection waiting on the rate throttle delay, we will 
let it wait the original delay even if
 // the rate limit increases, because it is just one connection per 
listener and the code is simpler that way
-updateConnectionRateQuota(maxConnectionRate)
+updateConnectionRateQuota(maxConnectionRate, BrokerQuotaEntity)
+  }
+
+  /**
+   * Update the connection rate quota for a given IP and updates quota configs 
for updated IPs.
+   * If an IP is given, metric config will be updated only for the given IP, 
otherwise
+   * all metric configs will be checked and updated if required
+   *
+   * @param ip ip to update or default if None
+   * @param maxConnectionRate new connection rate, or resets entity to default 
if None
+   */
+  def updateIpConnectionRate(ip: Option[String], maxConnectionRate: 
Option[Int]): Unit = {
+def isIpConnectionRateMetric(metricName: MetricName) = {
+  metricName.name == ConnectionRateMetricName &&
+  metricName.group == MetricsGroup &&
+  metricName.tags.containsKey(IpMetricTag)
+}
+
+def shouldUpdateQuota(metric: KafkaMetric, quotaLimit: Int) = {
+  quotaLimit != metric.config.quota.bound
+}
+
+ip match {
+  case Some(addr) =>
+val address = InetAddress.getByName(addr)
+maxConnectionRate match {
+  case Some(rate) =>
+info(s"Updating max connection rate override for $address to 
$rate")
+connectionRatePerIp.put(address, rate)
+  case None =>
+info(s"Removing max connection rate override for $address")
+connectionRatePerIp.remove(address)
+}
+updateConnectionRateQuota(connectionRateForIp(address), 
IpQuotaEntity(address))
+  case None =>
+val newQuota = 
maxConnectionRate.getOrElse(DynamicConfig.Ip.DefaultConnectionCreationRate)
+info(s"Updating default max IP connection rate to $newQuota")
+defaultConnectionRatePerIp = newQuota
+val allMetrics = metrics.metrics
+allMetrics.forEach { (metricName, metric) =>
+  if (isIpConnectionRateMetric(metricName) && 
shouldUpdateQuota(metric, newQuota)) {
+info(s"Updating existing connection rate sensor for 
${metricName.tags} to $newQuota")
+metric.config(rateQuotaMetricConfig(newQuota))
+  }

Review comment:
   @dajac 
   Good catch, we shouldn't be using newQuota here. I agree, this should have been covered by testing; I'll work on adding some more unit tests for the metric config updating.
   
   @apovzner 
   It should handle that case, yeah.
   If default is set to some value, when we remove quota for an ip, e.g. 
`updateConnectionRate(Some(ip), None)`, we remove the connection rate entry 
from the map and then call `getOrDefault(ip, defaultConnectionRatePerIp)`, which should be whatever the non-unlimited per-IP default quota is.
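   For illustration, the override-with-default-fallback described above boils down to this pattern (a minimal Scala sketch with hypothetical names, not the actual `ConnectionQuotas` code):

   ```
   import java.net.InetAddress
   import scala.collection.concurrent.TrieMap

   // Minimal sketch: per-IP overrides with fallback to a mutable default rate.
   class IpRateQuotas(initialDefault: Int) {
     @volatile private var defaultRate: Int = initialDefault
     private val overrides = TrieMap.empty[InetAddress, Int]

     def setOverride(ip: InetAddress, rate: Int): Unit = overrides.put(ip, rate)

     // Removing an override makes the IP fall back to the current default.
     def removeOverride(ip: InetAddress): Unit = overrides.remove(ip)

     def updateDefault(rate: Int): Unit = defaultRate = rate

     // Mirrors the getOrDefault lookup described above.
     def rateFor(ip: InetAddress): Int = overrides.getOrElse(ip, defaultRate)
   }
   ```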





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ivanyu commented on pull request #6820: MINOR: move connectorConfig to AbstractHerder

2020-11-03 Thread GitBox


ivanyu commented on pull request #6820:
URL: https://github.com/apache/kafka/pull/6820#issuecomment-720930041







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ivanyu edited a comment on pull request #6820: MINOR: move connectorConfig to AbstractHerder

2020-11-03 Thread GitBox


ivanyu edited a comment on pull request #6820:
URL: https://github.com/apache/kafka/pull/6820#issuecomment-720997301


   retest this please
   
   UPD: ah, I guess it doesn't work anymore...



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #9501: MINOR: move the test cases which don't need brokers from TopicCommand…

2020-11-03 Thread GitBox


chia7712 commented on pull request #9501:
URL: https://github.com/apache/kafka/pull/9501#issuecomment-720870283


   @dajac Could you take a look? 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #9180: MINOR: corrected unit tests in AbstractConfigTest.java- fixed invalid assertions

2020-11-03 Thread GitBox


chia7712 commented on pull request #9180:
URL: https://github.com/apache/kafka/pull/9180#issuecomment-720937214


   @sanketfajage Thanks for your patch. Merged to trunk.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman edited a comment on pull request #9534: KAFKA-10664: Delete existing checkpoint when writing empty offsets

2020-11-03 Thread GitBox


ableegoldman edited a comment on pull request #9534:
URL: https://github.com/apache/kafka/pull/9534#issuecomment-719848646


   Cherry-picked to 2.7 (cc/ @bbejeck) and 2.6



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9103: KAFKA-10181: Use Envelope RPC to do redirection for (Incremental)AlterConfig, AlterClientQuota and CreateTopics

2020-11-03 Thread GitBox


hachikuji commented on a change in pull request #9103:
URL: https://github.com/apache/kafka/pull/9103#discussion_r516363883



##
File path: 
core/src/test/scala/integration/kafka/server/DynamicBrokerReconfigurationTest.scala
##
@@ -70,12 +74,8 @@ import scala.collection.mutable.ArrayBuffer
 import scala.jdk.CollectionConverters._
 import scala.collection.Seq
 
-object DynamicBrokerReconfigurationTest {
-  val SecureInternal = "INTERNAL"
-  val SecureExternal = "EXTERNAL"
-}
-
-class DynamicBrokerReconfigurationTest extends ZooKeeperTestHarness with 
SaslSetup {
+@RunWith(value = classOf[Parameterized])
+class DynamicBrokerReconfigurationTest(quorumBasedController: JBoolean) 
extends ZooKeeperTestHarness with SaslSetup {

Review comment:
   It is quite expensive to parameterize these test cases. I am not sure it 
is worthwhile. If forwarding works for one of these cases, why would the others 
be different? Since we are not planning to enable this feature yet, I think 
unit tests in `KafkaApisTest` and maybe one integration test are good enough.

##
File path: core/src/main/scala/kafka/server/KafkaConfig.scala
##
@@ -352,6 +352,8 @@ object KafkaConfig {
   val RequestTimeoutMsProp = CommonClientConfigs.REQUEST_TIMEOUT_MS_CONFIG
   val ConnectionSetupTimeoutMsProp = 
CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MS_CONFIG
   val ConnectionSetupTimeoutMaxMsProp = 
CommonClientConfigs.SOCKET_CONNECTION_SETUP_TIMEOUT_MAX_MS_CONFIG
+  private[server] val enableMetadataQuorumProp = "enable.metadata.quorum"

Review comment:
   nit: every other property name uses a capital first letter

##
File path: 
clients/src/main/java/org/apache/kafka/common/security/auth/KafkaPrincipalSerde.java
##
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.common.security.auth;
+
+import org.apache.kafka.common.errors.SerializationException;
+
+import java.nio.ByteBuffer;
+
+/**
+ * Serializer/Deserializer interface for {@link KafkaPrincipal} for the purpose of inter-broker forwarding.
+ * Any serialization/deserialization failure should raise a {@link 
SerializationException} to be consistent.
+ */
+public interface KafkaPrincipalSerde {
+
+ByteBuffer serialize(KafkaPrincipal principal);

Review comment:
   Can you add a javadoc for these methods and mention `@throws 
SerializationException`?

##
File path: core/src/main/scala/kafka/network/SocketServer.scala
##
@@ -993,6 +1013,34 @@ private[kafka] class Processor(val id: Int,
 selector.clearCompletedReceives()
   }
 
+  private def parseEnvelopeRequest(receive: NetworkReceive,
+   nowNanos: Long,
+   connectionId: String,
+   context: RequestContext,
+   principalSerde: 
Option[KafkaPrincipalSerde]) = {
+val envelopeRequest = 
context.parseRequest(receive.payload).request.asInstanceOf[EnvelopeRequest]
+
+val originalHeader = RequestHeader.parse(envelopeRequest.requestData)
+// Leaving the principal null here is ok since we will fail the request during Kafka API handling.
+val originalPrincipal = if (principalSerde.isDefined)
+  principalSerde.get.deserialize(envelopeRequest.principalData)
+else
+  null
+
+val originalClientAddress = 
InetAddress.getByAddress(envelopeRequest.clientAddress)
+val originalContext = new RequestContext(originalHeader, connectionId,
+  originalClientAddress, originalPrincipal, listenerName,
+  securityProtocol, context.clientInformation, isPrivilegedListener)
+
+val envelopeContext = new EnvelopeContext(
+  brokerContext = context,
+  receive.payload)
+
+new network.RequestChannel.Request(processor = id, context = 
originalContext,

Review comment:
   nit: `network` prefix is not needed since we are already in this package

##
File path: 
clients/src/main/java/org/apache/kafka/common/errors/PrincipalDeserializationFailureException.java
##
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. S

[GitHub] [kafka] chia7712 commented on pull request #9546: MINOR: optimize PartitionStateMachine handleStateChanges

2020-11-03 Thread GitBox


chia7712 commented on pull request #9546:
URL: https://github.com/apache/kafka/pull/9546#issuecomment-720937420


   @dengziming Could you rebase the PR to trigger QA again?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on a change in pull request #9540: KAFKA-10669: Make CurrentLeaderEpoch field ignorable and set MaxNumOffsets field default to 1

2020-11-03 Thread GitBox


dajac commented on a change in pull request #9540:
URL: https://github.com/apache/kafka/pull/9540#discussion_r515806990



##
File path: 
clients/src/test/java/org/apache/kafka/common/requests/RequestResponseTest.java
##
@@ -1261,10 +1261,9 @@ private ListOffsetRequest createListOffsetRequest(int 
version) {
 } else if (version >= 2 && version <= 5) {
 ListOffsetPartition partition = new ListOffsetPartition()
 .setPartitionIndex(0)
-.setTimestamp(100L);
-if (version >= 4) {
-partition.setCurrentLeaderEpoch(5);
-}
+.setTimestamp(100L)
+.setCurrentLeaderEpoch(5);

Review comment:
I suggest setting the current leader epoch for all versions now that we have made it ignorable. We set it all the time in the replica fetcher, so the test would be aligned with what we do.

##
File path: core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
##
@@ -180,7 +183,8 @@ class ListOffsetsRequestTest extends BaseRequestTest {
 
 TestUtils.generateAndProduceMessages(servers, topic, 10)
 
-assertEquals((-1L, -1), fetchOffsetAndEpoch(firstLeaderId, 0L, 0))
+assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, 
ListOffsetRequest.EARLIEST_TIMESTAMP, 0))
+assertEquals((10L, -1), fetchOffsetAndEpoch(firstLeaderId, 
ListOffsetRequest.LATEST_TIMESTAMP, 0))

Review comment:
   Would it make sense to test `EARLIEST_TIMESTAMP`, `LATEST_TIMESTAMP`, 
and `0L` cases for all the versions?

##
File path: core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
##
@@ -143,7 +143,13 @@ class ListOffsetsRequestTest extends BaseRequestTest {
 val partitionData = response.topics.asScala.find(_.name == topic).get
   .partitions.asScala.find(_.partitionIndex == partition.partition).get
 
-(partitionData.offset, partitionData.leaderEpoch)
+if (version == 0) {
+  if (partitionData.oldStyleOffsets().isEmpty)
+(-1, partitionData.leaderEpoch)
+  else
+(partitionData.oldStyleOffsets().asScala.head, 
partitionData.leaderEpoch)

Review comment:
   nit: We could use `headOption.getOrElse(-1)` and avoid the extra 
`if/else`.
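   For illustration, the suggested simplification could look like this (a sketch with a hypothetical helper name; the real types come from the generated protocol classes):

   ```
   import java.util.{List => JList}
   import scala.jdk.CollectionConverters._

   // Sketch: take the first old-style offset if present, else default to -1,
   // with no explicit if/else.
   def firstOffsetOrDefault(oldStyleOffsets: JList[java.lang.Long]): Long =
     oldStyleOffsets.asScala.headOption.map(_.longValue).getOrElse(-1L)
   ```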

##
File path: core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
##
@@ -180,11 +186,21 @@ class ListOffsetsRequestTest extends BaseRequestTest {
 
 TestUtils.generateAndProduceMessages(servers, topic, 10)
 
-assertEquals((-1L, -1), fetchOffsetAndEpoch(firstLeaderId, 0L, 0))
-assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, 0L, 1))
-assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, 0L, 2))
-assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, 0L, 3))
-assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, 0L, 4))
+for (version <- ApiKeys.LIST_OFFSETS.oldestVersion to 
ApiKeys.LIST_OFFSETS.latestVersion) {

Review comment:
   nit: Now that we test all versions, could we rename the test to `testResponseDefaultOffsetAndLeaderEpochForAllVersions` in order to stay consistent?

##
File path: core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
##
@@ -180,11 +186,21 @@ class ListOffsetsRequestTest extends BaseRequestTest {
 
 TestUtils.generateAndProduceMessages(servers, topic, 10)
 
-assertEquals((-1L, -1), fetchOffsetAndEpoch(firstLeaderId, 0L, 0))
-assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, 0L, 1))
-assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, 0L, 2))
-assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, 0L, 3))
-assertEquals((0L, 0), fetchOffsetAndEpoch(firstLeaderId, 0L, 4))
+for (version <- ApiKeys.LIST_OFFSETS.oldestVersion to 
ApiKeys.LIST_OFFSETS.latestVersion) {
+  if (version == 0) {
+assertEquals((-1L, -1), fetchOffsetAndEpoch(firstLeaderId, 0L, 
version.toShort))
+assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, 
ListOffsetRequest.EARLIEST_TIMESTAMP, version.toShort))
+assertEquals((10L, -1), fetchOffsetAndEpoch(firstLeaderId, 
ListOffsetRequest.LATEST_TIMESTAMP, version.toShort))
+  } else if (version >= 1 && version <=3) {
+assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, 0L, 
version.toShort))
+assertEquals((0L, -1), fetchOffsetAndEpoch(firstLeaderId, 
ListOffsetRequest.EARLIEST_TIMESTAMP, version.toShort))
+assertEquals((10L, -1), fetchOffsetAndEpoch(firstLeaderId, 
ListOffsetRequest.LATEST_TIMESTAMP, version.toShort))
+  } else if (version >=4) {

Review comment:
   nit: Missing space before `4`.

##
File path: core/src/test/scala/unit/kafka/server/ListOffsetsRequestTest.scala
##
@@ -180,11 +186,21 @@ class ListOffsetsRequestTest extends BaseRequestTest {
 
 TestUtils.generateAndProduceMessages(servers, topic, 10)
 
-assertEquals((-1L, -1), fetchOffsetAndEpoch(

[GitHub] [kafka] ivanyu commented on a change in pull request #6820: MINOR: move connectorConfig to AbstractHerder

2020-11-03 Thread GitBox


ivanyu commented on a change in pull request #6820:
URL: https://github.com/apache/kafka/pull/6820#discussion_r516447945



##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/distributed/DistributedHerder.java
##
@@ -751,14 +751,8 @@ public Void call() throws Exception {
 
 @Override
 public void connectorConfig(String connName, final Callback<Map<String, String>> callback) {
-// Subset of connectorInfo, so piggy back on that implementation
 log.trace("Submitting connector config read request {}", connName);

Review comment:
   The trace logging for other operations (`putConnectorConfig`, 
`taskConfigs`, etc.) is done only in `DistributedHerder`, too. For consistency's sake, I'd leave it as is in this PR.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] omkreddy closed pull request #9540: KAFKA-10669: Make CurrentLeaderEpoch field ignorable and set MaxNumOffsets field default to 1

2020-11-03 Thread GitBox


omkreddy closed pull request #9540:
URL: https://github.com/apache/kafka/pull/9540


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9531: KAFKA-10661; Add new resigned state for graceful shutdown/initialization

2020-11-03 Thread GitBox


hachikuji commented on a change in pull request #9531:
URL: https://github.com/apache/kafka/pull/9531#discussion_r516198568



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -1560,9 +1558,34 @@ private long pollLeader(long currentTimeMs) {
 return Math.min(timeUntilFlush, timeUntilSend);
 }
 
+private long maybeSendVoteRequests(
+CandidateState state,
+long currentTimeMs
+) {
+// Continue sending Vote requests as long as we still have a chance to 
win the election
+if (!state.isVoteRejected()) {
+return maybeSendRequests(
+currentTimeMs,
+state.unrecordedVoters(),
+this::buildVoteRequest
+);
+}
+return Long.MAX_VALUE;
+}
+
 private long pollCandidate(long currentTimeMs) throws IOException {
 CandidateState state = quorum.candidateStateOrThrow();
-if (state.isBackingOff()) {
+GracefulShutdown shutdown = this.shutdown.get();
+
+if (shutdown != null) {
+// If we happen to shutdown while we are a candidate, we will 
continue
+// with the current election until one of the following conditions 
is met:
+//  1) we are elected as leader (which allows us to resign)
+//  2) another leader is elected
+//  3) the shutdown timer expires
+long minRequestBackoffMs = maybeSendVoteRequests(state, 
currentTimeMs);
+return Math.min(shutdown.remainingTimeMs(), minRequestBackoffMs);

Review comment:
   The intent is to ignore the election timeout in order to prevent a 
shutting down broker from becoming a candidate. Does that make sense?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mimaison commented on pull request #9468: KAFKA-10454 / (2.6) Update copartitionSourceGroups when optimization algorithm is triggered

2020-11-03 Thread GitBox


mimaison commented on pull request #9468:
URL: https://github.com/apache/kafka/pull/9468#issuecomment-720400557


   @bbejeck What's the status of this PR? Can we get it merged this week?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] bbejeck commented on pull request #9468: KAFKA-10454 / (2.6) Update copartitionSourceGroups when optimization algorithm is triggered

2020-11-03 Thread GitBox


bbejeck commented on pull request #9468:
URL: https://github.com/apache/kafka/pull/9468#issuecomment-720473323







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao merged pull request #9364: KAFKA-10471 Mark broker crash during log loading as unclean shutdown

2020-11-03 Thread GitBox


junrao merged pull request #9364:
URL: https://github.com/apache/kafka/pull/9364


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] scanterog opened a new pull request #9545: [mm2] Allow Checkpoints for consumers using static partition assignments

2020-11-03 Thread GitBox


scanterog opened a new pull request #9545:
URL: https://github.com/apache/kafka/pull/9545


   Currently mm2 is not generating checkpoints for upstream consumers using 
static partition assignments. The reason is that these consumers are being 
explicitly filtered out. I couldn't find why this filter is being applied. This 
PR removes this limitation.
   
   I've tested this change and it does not break any functionality. Even 
KAFKA-9076 continues working fine and allows consumer offset sync for consumers 
using static assignments.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on pull request #9364: KAFKA-10471 Mark broker crash during log loading as unclean shutdown

2020-11-03 Thread GitBox


junrao commented on pull request #9364:
URL: https://github.com/apache/kafka/pull/9364#issuecomment-720623777


   @RamanVerma  Was the test failure 
kafka.admin.ReassignPartitionsIntegrationTest.testLogDirReassignment in JDK 15 
related to this PR?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] chia7712 commented on pull request #4817: MINOR: Fix Findbugs issue about useless object in KafkaAdminClient

2020-11-03 Thread GitBox


chia7712 commented on pull request #4817:
URL: https://github.com/apache/kafka/pull/4817#issuecomment-720878512


   @attila-s Thanks for your patch. This issue was fixed by 
https://github.com/apache/kafka/commit/49db5a63c043b50c10c2dfd0648f8d74ee917b6a.
Please feel free to ping me if you have other pending PRs.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] hachikuji commented on a change in pull request #9482: KAFKA-10632; Raft client should push all committed data to state machines

2020-11-03 Thread GitBox


hachikuji commented on a change in pull request #9482:
URL: https://github.com/apache/kafka/pull/9482#discussion_r516151645



##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -1017,12 +1028,9 @@ private boolean handleFetchResponse(
 
log.truncateToEndOffset(divergingOffsetAndEpoch).ifPresent(truncationOffset -> {
 logger.info("Truncated to offset {} from Fetch response 
from leader {}",
 truncationOffset, quorum.leaderIdOrNil());
-
-// Since the end offset has been updated, we should 
complete any delayed
-// reads at the end offset.
-fetchPurgatory.maybeComplete(
-new LogOffset(Long.MAX_VALUE, Isolation.UNCOMMITTED),
-currentTimeMs);
+// After truncation, we complete all pending reads in 
order to
+// ensure that fetches account for the updated log end 
offset
+fetchPurgatory.completeAll(currentTimeMs);

Review comment:
   > With the new Listener when is this not a noop? Looking at the code, we 
only add entries to fetchPurgatorywhen the replica is a leader and it receives 
a Fetch request.
   
   Yeah, that's fair. I don't think we can truncate unless we are a follower 
and that implies we already cleared the purgatory in `onBecomeFollower`. So I 
think you are right that we are safe to remove this, though we'll probably need 
to add it back once we have follower fetching. 
   
   > I think the part that is missing is that the old leader should 
fetchPurgatory.completeAll when it loses leadership.
   
   I had considered this previously and decided to leave the fetches in 
purgatory while the election was in progress to prevent unnecessary retries 
since that is all the client can do while waiting for the outcome. On the other 
hand, some of the fetches in purgatory might be from other voters. It might be 
better to respond more quickly so that there are not any unnecessary election 
delays. I'd suggest we open a separate issue to consider this.

##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -228,35 +234,80 @@ private void updateLeaderEndOffsetAndTimestamp(
 final LogOffsetMetadata endOffsetMetadata = log.endOffset();
 
 if (state.updateLocalState(currentTimeMs, endOffsetMetadata)) {
-updateHighWatermark(state, currentTimeMs);
+onUpdateLeaderHighWatermark(state, currentTimeMs);
 }
 
-LogOffset endOffset = new LogOffset(endOffsetMetadata.offset, 
Isolation.UNCOMMITTED);
-fetchPurgatory.maybeComplete(endOffset, currentTimeMs);
+fetchPurgatory.maybeComplete(endOffsetMetadata.offset, currentTimeMs);
 }
 
-private void updateHighWatermark(
-EpochState state,
+private void onUpdateLeaderHighWatermark(
+LeaderState state,
 long currentTimeMs
 ) {
 state.highWatermark().ifPresent(highWatermark -> {
-logger.debug("High watermark updated to {}", highWatermark);
+logger.debug("Leader high watermark updated to {}", highWatermark);
 log.updateHighWatermark(highWatermark);
-
-LogOffset offset = new LogOffset(highWatermark.offset, 
Isolation.COMMITTED);
-appendPurgatory.maybeComplete(offset, currentTimeMs);
-fetchPurgatory.maybeComplete(offset, currentTimeMs);
+appendPurgatory.maybeComplete(highWatermark.offset, currentTimeMs);
+maybeFireHandleCommit(highWatermark.offset);

Review comment:
   I will add a comment. I agree it is a subtle point.

##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -1778,4 +1808,98 @@ public void complete() {
 }
 }
 
+private final class ListenerContext implements CloseListener<BatchReader<T>> {
+private final RaftClient.Listener<T> listener;
+private BatchReader<T> lastSent = null;
+private long lastAckedOffset = 0;
+private int claimedEpoch = 0;

Review comment:
   Let me add a helper to `ListenerContext` so that we can keep the field 
encapsulated.

##
File path: raft/src/main/java/org/apache/kafka/raft/KafkaRaftClient.java
##
@@ -1778,4 +1808,98 @@ public void complete() {
 }
 }
 
+private final class ListenerContext implements CloseListener<BatchReader<T>> {
+private final RaftClient.Listener<T> listener;
+private BatchReader<T> lastSent = null;
+private long lastAckedOffset = 0;
+private int claimedEpoch = 0;
+
+private ListenerContext(Listener<T> listener) {
+this.listener = listener;
+}
+
+/**
+ * Get the last acked offset, which is one greater than the offset of 
the
+ * last record which was acked by the state machine.
+ */
+public synchronized long 

[GitHub] [kafka] chia7712 commented on pull request #5490: MINOR: Fix typo in TaskManager

2020-11-03 Thread GitBox


chia7712 commented on pull request #5490:
URL: https://github.com/apache/kafka/pull/5490#issuecomment-720877498


   @lucapette Thanks for your patch. I'm going to close this PR since that 
issue was fixed by 
https://github.com/apache/kafka/commit/4090f9a2b0a95e4da127e4786007542276d97520.
 Feel free to ping me if you have other pending PRs :)



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao merged pull request #9536: MINOR: KIP-584: Remove admin client facility to read features from controller

2020-11-03 Thread GitBox


junrao merged pull request #9536:
URL: https://github.com/apache/kafka/pull/9536


   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac opened a new pull request #9547: KAFKA-9630; Migrate OffsetsForLeaderEpochRequest/Response to the auto-generated protocol

2020-11-03 Thread GitBox


dajac opened a new pull request #9547:
URL: https://github.com/apache/kafka/pull/9547


   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] omkreddy commented on pull request #9540: KAFKA-10669: Make CurrentLeaderEpoch field ignorable and set MaxNumOffsets field default to 1

2020-11-03 Thread GitBox


omkreddy commented on pull request #9540:
URL: https://github.com/apache/kafka/pull/9540#issuecomment-720433673







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dengziming commented on a change in pull request #9473: KAFKA-10545: Create topic IDs in ZooKeeper and Controller

2020-11-03 Thread GitBox


dengziming commented on a change in pull request #9473:
URL: https://github.com/apache/kafka/pull/9473#discussion_r516549058



##
File path: core/src/main/scala/kafka/controller/ControllerContext.scala
##
@@ -83,6 +83,8 @@ class ControllerContext {
   var epochZkVersion: Int = KafkaController.InitialControllerEpochZkVersion
 
   val allTopics = mutable.Set.empty[String]
+  var topicIds = mutable.Map.empty[String, UUID]

Review comment:
   Should these 2 names be more self-explanatory?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on pull request #9536: MINOR: KIP-584: Remove admin client facility to read features from controller

2020-11-03 Thread GitBox


kowshik commented on pull request #9536:
URL: https://github.com/apache/kafka/pull/9536#issuecomment-720276930







This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dongjinleekr commented on pull request #9414: KAFKA-10585: Kafka Streams should clean up the state store directory from cleanup

2020-11-03 Thread GitBox


dongjinleekr commented on pull request #9414:
URL: https://github.com/apache/kafka/pull/9414#issuecomment-721135460


   Hi @vvcephei,
   
   Sorry for being late. In my defense, I was too busy finalizing my project last week; it ended yesterday. Sure, I am now resolving the broken tests. There are 21 broken tests with the latest trunk, and I have just resolved 16 of them. The rest will be completed soon.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] soarez commented on a change in pull request #9000: KAFKA-10036 Improve handling and documentation of Suppliers

2020-11-03 Thread GitBox


soarez commented on a change in pull request #9000:
URL: https://github.com/apache/kafka/pull/9000#discussion_r516680210



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/ValueTransformerSupplier.java
##
@@ -33,12 +38,17 @@
  * @see TransformerSupplier
  * @see KStream#transform(TransformerSupplier, String...)
  */
-public interface ValueTransformerSupplier<V, VR> extends ConnectedStoreProvider {
+public interface ValueTransformerSupplier<V, VR> extends ConnectedStoreProvider, Supplier<ValueTransformer<V, VR>> {

Review comment:
   Indeed, that was why. Since it already conforms to the interface, I 
didn't realize this would be a public API change. Will revert and overload 
`checkSupplier` instead.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/internals/ApiUtils.java
##
@@ -75,4 +81,31 @@ public static long validateMillisecondInstant(final Instant 
instant, final Strin
 public static String prepareMillisCheckFailMsgPrefix(final Object value, 
final String name) {
 return format(MILLISECOND_VALIDATION_FAIL_MSG_FRMT, name, value);
 }
+
+/**
+ * @throws IllegalArgumentException if the same instance is obtained each 
time
+ */
+public static void checkSupplier(final Supplier<?> supplier) {
+if (supplier.get() == supplier.get()) {
+final String supplierClass = 
getAllImplementedInterfaces(supplier.getClass()).stream()

Review comment:
   We don't _really_ need to. I thought it could make for a better, and 
also predictable, error message. But maybe just using the implementing class 
name is fine.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] soarez commented on pull request #9000: KAFKA-10036 Improve handling and documentation of Suppliers

2020-11-03 Thread GitBox


soarez commented on pull request #9000:
URL: https://github.com/apache/kafka/pull/9000#issuecomment-721129774


   Thanks for giving this another look @mjsax 



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] olafurpg opened a new pull request #9548: Disable inlining of Scala library methods.

2020-11-03 Thread GitBox


olafurpg opened a new pull request #9548:
URL: https://github.com/apache/kafka/pull/9548


   Previously, the Scala compiler was configured to inline all usages of
   symbols under the `scala` package. This is problematic because it
   means that published Kafka jars must run with the exact same version of
   the Scala library that the Kafka jar was compiled with.  If the runtime
   uses a different version of the Scala library then users risk getting a
   crash like this:
   
   ```
   java.lang.NoClassDefFoundError: scala/math/Ordering$$anon$7
   ```
   
   This commit disables inlining from the `scala` package to prevent
   crashes like this. The downside to this change is that it may introduce performance regressions.
   
   ### Committer Checklist (excluded from commit message)
   - [x] Verify design and implementation 
   - [x] Verify test coverage and CI build status
   - [x] Verify documentation (including upgrade notes)
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] olafurpg commented on a change in pull request #7452: KAFKA-8991: Enable scalac optimizer

2020-11-03 Thread GitBox


olafurpg commented on a change in pull request #7452:
URL: https://github.com/apache/kafka/pull/7452#discussion_r516703080



##
File path: build.gradle
##
@@ -419,6 +419,22 @@ subprojects {
 "-Xlint:constant",
 "-Xlint:unused"
   ]
+
+  // Inline more aggressively when compiling the `core` jar since it's not 
meant to be used as a library.
+  // More specifically, inline classes from the Scala library so that we 
can inline methods like `Option.exists`
+  // and avoid lambda allocations. This is only safe if the Scala library 
version is the same at compile time
+  // and runtime. We cannot guarantee this for libraries like kafka 
streams, so only inline classes from the
+  // Kafka project in that case.
+  List<String> inlineFrom
+  if (project.name.equals('core'))
+inlineFrom = ["-opt-inline-from:scala.**", 
"-opt-inline-from:kafka.**", "-opt-inline-from:org.apache.kafka.**"]

Review comment:
   I opened https://github.com/apache/kafka/pull/9548 :) 





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #9548: Disable inlining of Scala library methods.

2020-11-03 Thread GitBox


ijuma commented on pull request #9548:
URL: https://github.com/apache/kafka/pull/9548#issuecomment-721150214


   @olafurpg Thanks for the PR. Can you please elaborate why there are 
conflicting versions of the Scala library when the core jar is used? The core 
jar doesn't export a public API and the assumption was that the same Scala 
library would be used at runtime.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on a change in pull request #7452: KAFKA-8991: Enable scalac optimizer

2020-11-03 Thread GitBox


ijuma commented on a change in pull request #7452:
URL: https://github.com/apache/kafka/pull/7452#discussion_r516710457



##
File path: build.gradle
##
@@ -419,6 +419,22 @@ subprojects {
 "-Xlint:constant",
 "-Xlint:unused"
   ]
+
+  // Inline more aggressively when compiling the `core` jar since it's not 
meant to be used as a library.
+  // More specifically, inline classes from the Scala library so that we 
can inline methods like `Option.exists`
+  // and avoid lambda allocations. This is only safe if the Scala library 
version is the same at compile time
+  // and runtime. We cannot guarantee this for libraries like kafka 
streams, so only inline classes from the
+  // Kafka project in that case.
+  List<String> inlineFrom
+  if (project.name.equals('core'))
+inlineFrom = ["-opt-inline-from:scala.**", 
"-opt-inline-from:kafka.**", "-opt-inline-from:org.apache.kafka.**"]

Review comment:
   Let's continue the discussion in #9548. Note the comment from @lrytz is 
regarding the claim that the `core` module should not be used as a library. 
This is the recommendation from the Kafka project though, so we can discuss in 
#9548 why users are depending on this module as a library (i.e. his expertise 
in the scala compiler/inliner is not relevant to this particular comment).





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] wcarlson5 commented on pull request #9543: makes the Stream thread list resizable

2020-11-03 Thread GitBox


wcarlson5 commented on pull request #9543:
URL: https://github.com/apache/kafka/pull/9543#issuecomment-721154784


   retest this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] olafurpg commented on pull request #9548: Disable inlining of Scala library methods.

2020-11-03 Thread GitBox


olafurpg commented on pull request #9548:
URL: https://github.com/apache/kafka/pull/9548#issuecomment-721182321


   Even if users can't directly reference APIs of the core Kafka jar, the 
bytecode of the core jar is still evaluated at runtime. With the inliner 
enabled, the core Kafka jar directly references private anonymous classes from 
scala-library:2.12.11 that are no longer present in scala-library:2.12.12. For 
example, assuming you have `cs` (https://get-coursier.io/) installed, observe that `scala/math/Ordering$$anon$7` is referenced below:
   
   ```
   ❯ javap -cp $(cs fetch org.apache.kafka:kafka_2.12:2.5.1 -p) -v 
kafka.api.ApiVersion$ | grep Ordering\$\$anon
  #24 = Utf8   scala/math/Ordering$$anon$7
  #25 = Class  #24   // scala/math/Ordering$$anon$7
  #84 = Methodref  #25.#83   // 
scala/math/Ordering$$anon$7."<init>":(Lscala/math/Ordering;Lscala/Function1;)V
   21: new   #25 // class 
scala/math/Ordering$$anon$7
   27: invokespecial #84 // Method 
scala/math/Ordering$$anon$7."<init>":(Lscala/math/Ordering;Lscala/Function1;)V
public final #25; //class scala/math/Ordering$$anon$7
   ```
   
   The scala-library:2.12.11 jar defines this anonymous class but it's missing 
from  scala-library:2.12.12
   
   ```
   ❯ jar tf $(cs fetch org.scala-lang:scala-library:2.12.11 -p) | grep 
Ordering\$\$anon
   scala/math/Ordering$$anon$1.class
   scala/math/Ordering$$anon$10.class
   scala/math/Ordering$$anon$11.class
   scala/math/Ordering$$anon$12.class
   scala/math/Ordering$$anon$13.class
   scala/math/Ordering$$anon$14.class
   scala/math/Ordering$$anon$15.class
   scala/math/Ordering$$anon$16.class
   scala/math/Ordering$$anon$17.class
   scala/math/Ordering$$anon$18.class
   scala/math/Ordering$$anon$19.class
   scala/math/Ordering$$anon$2.class
   scala/math/Ordering$$anon$6.class
   scala/math/Ordering$$anon$7.class
   scala/math/Ordering$DoubleOrdering$$anon$9.class
   scala/math/Ordering$FloatOrdering$$anon$8.class
   scala/math/PartialOrdering$$anon$1.class
   ❯ jar tf $(cs fetch org.scala-lang:scala-library:2.12.12 -p) | grep 
Ordering\$\$anon
   scala/math/Ordering$$anon$1.class
   scala/math/Ordering$$anon$4.class
   scala/math/Ordering$$anon$5.class
   scala/math/Ordering$$anon$6.class
   scala/math/PartialOrdering$$anon$1.class
   ```
   
   
   The related upstream commit appears to be 
https://github.com/scala/scala/commit/44318c8959b3ce48a44d3c0692e87fa7dfbb8930#diff-3b590b82493a8dbfc177f6113ee07ac4772d666d99fe808c6f6da01227ad1c6d.
   
   My understanding is that the inliner flags are only safe to enable for 
applications "at the end of the world", not libraries.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] olafurpg commented on pull request #9548: Disable inlining of Scala library methods.

2020-11-03 Thread GitBox


olafurpg commented on pull request #9548:
URL: https://github.com/apache/kafka/pull/9548#issuecomment-721184420


   To put it differently, enabling the inliner allows Kafka to access private 
symbols from `scala-library.jar`. The upstream commit 
https://github.com/scala/scala/commit/44318c8959b3ce48a44d3c0692e87fa7dfbb8930#diff-3b590b82493a8dbfc177f6113ee07ac4772d666d99fe808c6f6da01227ad1c6d
 is not a binary breaking change since it only modifies private APIs.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #9548: Disable inlining of Scala library methods.

2020-11-03 Thread GitBox


ijuma commented on pull request #9548:
URL: https://github.com/apache/kafka/pull/9548#issuecomment-721197381


   I understand that; the point is that the core jar is not meant to be a library. It has no public APIs. The clients jar is meant to be a library.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lbradstreet commented on pull request #9401: KAFKA-9628 Replace Produce request/response with automated protocol

2020-11-03 Thread GitBox


lbradstreet commented on pull request #9401:
URL: https://github.com/apache/kafka/pull/9401#issuecomment-721201536


   @chia7712 @hachikuji For the ProduceResponse handling, is this the overall broker-side regression, since you need both the construction and toStruct?
   ```
   construction regression: 3.293 -> 580.099 ns/op
   toStruct improvement: 825.889 -> 318.530 ns/op
   
   overall response: 3.293+825.889 (old) = 829.182 vs 898.629 (new)
   ```
   
   Could you please also provide an analysis of the garbage generation using 
`gc.alloc.rate.norm`?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #9548: Disable inlining of Scala library methods.

2020-11-03 Thread GitBox


ijuma commented on pull request #9548:
URL: https://github.com/apache/kafka/pull/9548#issuecomment-721201268


   To elaborate a bit more, there are two cases I can think of where this can cause problems:
   
   1. If `core` is used as a library and there is a mismatch between the Scala 
version used for compilation and runtime.
   2. The broker is deployed in the same process as another application and 
there is a mismatch between the Scala version used for compilation and runtime.
   
   Neither of those are recommended.
   
   I'm not opposed to changing these settings for now, but I'm also interested 
in understanding the specifics of why users are doing 1, 2 or something else 
that is deviating from our expectations.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] tombentley opened a new pull request #9549: KIP-145: Add SMTs, HeaderFrom, DropHeaders and InsertHeader

2020-11-03 Thread GitBox


tombentley opened a new pull request #9549:
URL: https://github.com/apache/kafka/pull/9549


   These SMTs were originally specified in KIP-145 but never implemented at the 
time.
   
   HeaderTo is not included since its original specification doesn't deal with 
the fact that there can be >1 header with the same name, but a field can only 
have a single value (while you could use an array, that doesn't work if the 
headers for the given name had different schemas).
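   For reference, here is a hedged sketch of how the three SMTs might be wired into a connector config, using the names and properties from KIP-145 (the exact config keys are whatever this PR implements; the values below are illustrative):

   ```
   transforms=moveField,dropLegacy,tagSource
   transforms.moveField.type=org.apache.kafka.connect.transforms.HeaderFrom$Value
   transforms.moveField.fields=origin
   transforms.moveField.headers=origin
   transforms.moveField.operation=move
   transforms.dropLegacy.type=org.apache.kafka.connect.transforms.DropHeaders
   transforms.dropLegacy.headers=legacy-id
   transforms.tagSource.type=org.apache.kafka.connect.transforms.InsertHeader
   transforms.tagSource.header=origin-cluster
   transforms.tagSource.value.literal=cluster-a
   ```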
   
   



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] tombentley commented on pull request #9549: KIP-145: Add SMTs, HeaderFrom, DropHeaders and InsertHeader

2020-11-03 Thread GitBox


tombentley commented on pull request #9549:
URL: https://github.com/apache/kafka/pull/9549#issuecomment-721226685


   @kkonstantine would you be able to review this? These SMTs were originally 
specified in 
[KIP145](https://cwiki.apache.org/confluence/display/KAFKA/KIP-145+-+Expose+Record+Headers+in+Kafka+Connect#KIP145ExposeRecordHeadersinKafkaConnect-HeaderFrom)
 but never implemented. It seems they were forgotten about.
   
   @C0urante maybe you'd also like to review?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] tombentley commented on pull request #9266: KAFKA-10469: Resolve logger levels hierarchically

2020-11-03 Thread GitBox


tombentley commented on pull request #9266:
URL: https://github.com/apache/kafka/pull/9266#issuecomment-721228240


   Maybe @vvcephei, @mimaison or @gwenshap could take a look at this since you 
voted on the KIP? Thanks!



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] tombentley commented on pull request #9266: KAFKA-10469: Resolve logger levels hierarchically

2020-11-03 Thread GitBox


tombentley commented on pull request #9266:
URL: https://github.com/apache/kafka/pull/9266#issuecomment-721229160


   We just need to revert 75e5358 (which reverted the original merge). Do you 
need a new PR for that?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ijuma commented on pull request #9266: KAFKA-10469: Resolve logger levels hierarchically

2020-11-03 Thread GitBox


ijuma commented on pull request #9266:
URL: https://github.com/apache/kafka/pull/9266#issuecomment-721229952


   I can reapply the change soon.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] tombentley commented on pull request #9365: KAFKA-10566: Fix erroneous config usage warnings

2020-11-03 Thread GitBox


tombentley commented on pull request #9365:
URL: https://github.com/apache/kafka/pull/9365#issuecomment-721230361


   Maybe @chia7712, @dajac or @mimaison could take a look at this?



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] tombentley commented on pull request #9266: KAFKA-10469: Resolve logger levels hierarchically

2020-11-03 Thread GitBox


tombentley commented on pull request #9266:
URL: https://github.com/apache/kafka/pull/9266#issuecomment-721231570


   Great, thanks Ismael.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-10678) Re-deploying Streams app causes rebalance and task migration

2020-11-03 Thread Bradley Peterson (Jira)
Bradley Peterson created KAFKA-10678:


 Summary: Re-deploying Streams app causes rebalance and task 
migration
 Key: KAFKA-10678
 URL: https://issues.apache.org/jira/browse/KAFKA-10678
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.6.0, 2.6.1
Reporter: Bradley Peterson
 Attachments: after, before, broker

Re-deploying our Streams app causes a rebalance, even when using static group 
membership. Worse, the rebalance creates standby tasks, even when the previous 
task assignment was balanced and stable.

Our app is currently using Streams 2.6.1-SNAPSHOT (due to [KAFKA-10633]) but we 
saw the same behavior in 2.6.0. The app runs on 4 EC2 instances, each with 4 
streams threads, and data stored on persistent EBS volumes. During a redeploy, 
all EC2 instances are stopped, new instances are launched, and the EBS volumes 
are attached to the new instances. We do not use interactive queries. 
{{session.timeout.ms}} is set to 30 minutes, and the deployment finishes well 
under that. {{num.standby.replicas}} is 0.
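For reference, a sketch of the relevant part of our Streams configuration 
(application id, bootstrap servers, and instance id are illustrative):

{code:java}
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.streams.StreamsConfig;

public class AppConfig {
    // instanceId must stay stable across restarts of the same node for
    // static group membership to apply.
    public static Properties build(String instanceId) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "our-streams-app");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "broker:9092");
        props.put(StreamsConfig.NUM_STANDBY_REPLICAS_CONFIG, 0);
        // Static membership plus a 30-minute session timeout, so a redeploy
        // finishing within the timeout should not trigger a rebalance.
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.GROUP_INSTANCE_ID_CONFIG), instanceId);
        props.put(StreamsConfig.consumerPrefix(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG), 30 * 60 * 1000);
        return props;
    }
}
{code}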

h2. Expected Behavior
Given a stable and balanced task assignment prior to deploying, we expect to 
see the same task assignment after deploying. Even if a rebalance is triggered, 
we do not expect to see new standby tasks.

h2. Observed Behavior
Attached are the "Assigned tasks to clients" log lines from before and after 
deploying. The "before" is from over 24 hours ago, the task assignment is well 
balanced and "Finished stable assignment of tasks, no followup rebalances 
required." is logged. The "after" log lines show the same assignment of active 
tasks, but some additional standby tasks. There are additional log lines about 
adding and removing active tasks, which I don't quite understand.

I've also included logs from the broker showing the rebalance was triggered for 
"Updating metadata".



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-10678) Re-deploying Streams app causes rebalance and task migration

2020-11-03 Thread Bradley Peterson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10678?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bradley Peterson updated KAFKA-10678:
-
Attachment: broker
before
after

> Re-deploying Streams app causes rebalance and task migration
> 
>
> Key: KAFKA-10678
> URL: https://issues.apache.org/jira/browse/KAFKA-10678
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0, 2.6.1
>Reporter: Bradley Peterson
>Priority: Major
> Attachments: after, before, broker
>
>
> Re-deploying our Streams app causes a rebalance, even when using static group 
> membership. Worse, the rebalance creates standby tasks, even when the 
> previous task assignment was balanced and stable.
> Our app is currently using Streams 2.6.1-SNAPSHOT (due to [KAFKA-10633]) but 
> we saw the same behavior in 2.6.0. The app runs on 4 EC2 instances, each with 
> 4 streams threads, and data stored on persistent EBS volumes. During a 
> redeploy, all EC2 instances are stopped, new instances are launched, and the 
> EBS volumes are attached to the new instances. We do not use interactive 
> queries. {{session.timeout.ms}} is set to 30 minutes, and the deployment 
> finishes well under that. {{num.standby.replicas}} is 0.
> h2. Expected Behavior
> Given a stable and balanced task assignment prior to deploying, we expect to 
> see the same task assignment after deploying. Even if a rebalance is 
> triggered, we do not expect to see new standby tasks.
> h2. Observed Behavior
> Attached are the "Assigned tasks to clients" log lines from before and after 
> deploying. The "before" is from over 24 hours ago, the task assignment is 
> well balanced and "Finished stable assignment of tasks, no followup 
> rebalances required." is logged. The "after" log lines show the same 
> assignment of active tasks, but some additional standby tasks. There are 
> additional log lines about adding and removing active tasks, which I don't 
> quite understand.
> I've also included logs from the broker showing the rebalance was triggered 
> for "Updating metadata".



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-10633) Constant probing rebalances in Streams 2.6

2020-11-03 Thread Bradley Peterson (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10633?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Bradley Peterson resolved KAFKA-10633.
--
Fix Version/s: 2.6.1
   Resolution: Fixed

Closing this ticket as the issue is fixed in 2.6.1. I've opened [KAFKA-10678] 
for our other problem with unexpected rebalancing.

> Constant probing rebalances in Streams 2.6
> --
>
> Key: KAFKA-10633
> URL: https://issues.apache.org/jira/browse/KAFKA-10633
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Bradley Peterson
>Priority: Major
> Fix For: 2.6.1
>
> Attachments: Discover 2020-10-21T23 34 03.867Z - 2020-10-21T23 44 
> 46.409Z.csv
>
>
> We are seeing a few issues with the new rebalancing behavior in Streams 2.6. 
> This ticket is for constant probing rebalances on one StreamThread, but I'll 
> mention the other issues, as they may be related.
> First, when we redeploy the application we see tasks being moved, even though 
> the task assignment was stable before redeploying. We would expect to see 
> tasks assigned back to the same instances and no movement. The application is 
> in EC2, with persistent EBS volumes, and we use static group membership to 
> avoid rebalancing. To redeploy the app we terminate all EC2 instances. The 
> new instances will reattach the EBS volumes and use the same group member id.
> After redeploying, we sometimes see the group leader go into a tight probing 
> rebalance loop. This doesn't happen immediately, it could be several hours 
> later. Because the redeploy caused task movement, we see expected probing 
> rebalances every 10 minutes. But, then one thread will go into a tight loop 
> logging messages like "Triggering the followup rebalance scheduled for 
> 1603323868771 ms.", handling the partition assignment (which doesn't change), 
> then "Requested to schedule probing rebalance for 1603323868771 ms." This 
> repeats several times a second until the app is restarted again. I'll attach 
> a log export from one such incident.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10515) NPE: Foreign key join serde may not be initialized with default serde if application is distributed

2020-11-03 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10515?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17225532#comment-17225532
 ] 

John Roesler commented on KAFKA-10515:
--

[https://github.com/apache/kafka/pull/9467] is merged to 2.6. There were also 
merge conflicts on 2.5, but there's no bugfix release proposed for 2.5 yet, so 
I decided just to leave it at 2.6/2.7/trunk.

> NPE: Foreign key join serde may not be initialized with default serde if 
> application is distributed
> ---
>
> Key: KAFKA-10515
> URL: https://issues.apache.org/jira/browse/KAFKA-10515
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0, 2.5.1
>Reporter: Thorsten Hake
>Priority: Critical
> Fix For: 2.7.0, 2.6.1
>
>
> The fix of KAFKA-9517 fixed the initialization of the foreign key joins 
> serdes for KStream applications that do not run distributed over multiple 
> instances.
> However, if an application runs distributed over multiple instances, the 
> foreign key join serdes may still not be initialized leading to the following 
> NPE:
> {noformat}
> Encountered the following error during 
> processing:java.lang.NullPointerException: null
>   at 
> org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapperSerde$SubscriptionWrapperSerializer.serialize(SubscriptionWrapperSerde.java:85)
>   at 
> org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionWrapperSerde$SubscriptionWrapperSerializer.serialize(SubscriptionWrapperSerde.java:52)
>   at 
> org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:59)
>   at 
> org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:50)
>   at 
> org.apache.kafka.streams.state.internals.ValueAndTimestampSerializer.serialize(ValueAndTimestampSerializer.java:27)
>   at 
> org.apache.kafka.streams.state.StateSerdes.rawValue(StateSerdes.java:192)
>   at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$put$3(MeteredKeyValueStore.java:144)
>   at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
>   at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.put(MeteredKeyValueStore.java:144)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl$KeyValueStoreReadWriteDecorator.put(ProcessorContextImpl.java:487)
>   at 
> org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionStoreReceiveProcessorSupplier$1.process(SubscriptionStoreReceiveProcessorSupplier.java:102)
>   at 
> org.apache.kafka.streams.kstream.internals.foreignkeyjoin.SubscriptionStoreReceiveProcessorSupplier$1.process(SubscriptionStoreReceiveProcessorSupplier.java:55)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.lambda$process$2(ProcessorNode.java:142)
>   at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:142)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
>   at 
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:104)
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.lambda$process$3(StreamTask.java:383)
>   at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:806)
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:383)
>   at 
> org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:475)
>   at 
> org.apache.kafka.streams.processor.internals.TaskManager.process(TaskManager.java:550)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:802)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:697)
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:670){noformat}
> This happens because the processors for foreign key joins will be distributed 
> across multiple tasks. The serde will only be initialized with the default 
> serde during the initia

[GitHub] [kafka] vvcephei commented on pull request #9414: KAFKA-10585: Kafka Streams should clean up the state store directory from cleanup

2020-11-03 Thread GitBox


vvcephei commented on pull request #9414:
URL: https://github.com/apache/kafka/pull/9414#issuecomment-721239457


   Perfect. Thanks, @dongjinleekr !
   
   No defense is necessary :) I was just wondering if it was still on your 
radar.
   
   Thanks again for the contribution!



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] brbrown25 edited a comment on pull request #9057: KAFKA-10299: Implementing Kafka Connect Hash SMT to allow for hashing…

2020-11-03 Thread GitBox


brbrown25 edited a comment on pull request #9057:
URL: https://github.com/apache/kafka/pull/9057#issuecomment-717322569


   - [x] update with latest master
   - [ ] implement nested field support
   - [ ] implement salt support
   - [ ] update rat configuration



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mingaliu commented on pull request #9548: Disable inlining of Scala library methods.

2020-11-03 Thread GitBox


mingaliu commented on pull request #9548:
URL: https://github.com/apache/kafka/pull/9548#issuecomment-721242229


   @ijuma, here is the call stack; you can see why Kafka core gets pulled in 
from the client side:
   
   ```
   java.lang.NoClassDefFoundError: scala/math/Ordering$$anon$7
   at kafka.api.ApiVersion$.orderingByVersion(ApiVersion.scala:45)
   at kafka.api.ApiVersion.compare(ApiVersion.scala:139)
   at kafka.api.ApiVersion.compare$(ApiVersion.scala:138)
   at kafka.api.KAFKA_2_5_IV0$.compare(ApiVersion.scala:339)
   at kafka.api.KAFKA_2_5_IV0$.compare(ApiVersion.scala:339)
   at scala.math.Ordered.$greater$eq(Ordered.scala:91)
   at scala.math.Ordered.$greater$eq$(Ordered.scala:91)
   at kafka.api.KAFKA_2_5_IV0$.$greater$eq(ApiVersion.scala:339)
   at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1529)
   at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1238)
   at org.apache.kafka.streams.integration.utils.KafkaEmbedded.<init>(KafkaEmbedded.java:75)
   at org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.start(EmbeddedKafkaCluster.java:103)
   at com.twitter.finatra.kafka.test.EmbeddedKafka.beforeAll(EmbeddedKafka.scala:61)
   at com.twitter.finatra.kafka.test.EmbeddedKafka.beforeAll$(EmbeddedKafka.scala:59)
   at com.twitter.finatra.kafkastreams.test.AbstractKafkaStreamsFeatureTest.beforeAll(KafkaStreamsFeatureTest.scala:26)
   at org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:212)
   at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
   at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
   at com.twitter.inject.Test.run(Test.scala:28)
   at org.scalatestplus.junit.JUnitRunner.run(JUnitRunner.scala:99)
   ```



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] brbrown25 edited a comment on pull request #9057: KAFKA-10299: Implementing Kafka Connect Hash SMT to allow for hashing…

2020-11-03 Thread GitBox


brbrown25 edited a comment on pull request #9057:
URL: https://github.com/apache/kafka/pull/9057#issuecomment-717322569


   - [x] update with latest master
   - [ ] implement nested field support
   - [ ] implement salt support
   - [x] update rat configuration



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] olafurpg commented on pull request #9548: Disable inlining of Scala library methods.

2020-11-03 Thread GitBox


olafurpg commented on pull request #9548:
URL: https://github.com/apache/kafka/pull/9548#issuecomment-721247846


   Another related case (https://github.com/akka/alpakka-kafka/pull/1212) also 
comes from using `EmbeddedKafka` 
(https://github.com/embeddedkafka/embedded-kafka) as a library for testing 
purposes.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] mingaliu edited a comment on pull request #9548: Disable inlining of Scala library methods.

2020-11-03 Thread GitBox


mingaliu edited a comment on pull request #9548:
URL: https://github.com/apache/kafka/pull/9548#issuecomment-721242229


   @ijuma, here is the call stack; you can see why Kafka core gets pulled in 
from the client side (due to using KafkaEmbedded in testing):
   
```
   java.lang.NoClassDefFoundError: scala/math/Ordering$$anon$7
at kafka.api.ApiVersion$.orderingByVersion(ApiVersion.scala:45)
at kafka.api.ApiVersion.compare(ApiVersion.scala:139)
at kafka.api.ApiVersion.compare$(ApiVersion.scala:138)
at kafka.api.KAFKA_2_5_IV0$.compare(ApiVersion.scala:339)
at kafka.api.KAFKA_2_5_IV0$.compare(ApiVersion.scala:339)
at scala.math.Ordered.$greater$eq(Ordered.scala:91)
at scala.math.Ordered.$greater$eq$(Ordered.scala:91)
at kafka.api.KAFKA_2_5_IV0$.$greater$eq(ApiVersion.scala:339)
at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1529)
at kafka.server.KafkaConfig.<init>(KafkaConfig.scala:1238)
at 
org.apache.kafka.streams.integration.utils.KafkaEmbedded.<init>(KafkaEmbedded.java:75)
at 
org.apache.kafka.streams.integration.utils.EmbeddedKafkaCluster.start(EmbeddedKafkaCluster.java:103)
at 
com.twitter.finatra.kafka.test.EmbeddedKafka.beforeAll(EmbeddedKafka.scala:61)
at 
com.twitter.finatra.kafka.test.EmbeddedKafka.beforeAll$(EmbeddedKafka.scala:59)
at 
com.twitter.finatra.kafkastreams.test.AbstractKafkaStreamsFeatureTest.beforeAll(KafkaStreamsFeatureTest.scala:26)
at 
org.scalatest.BeforeAndAfterAll.liftedTree1$1(BeforeAndAfterAll.scala:212)
at org.scalatest.BeforeAndAfterAll.run(BeforeAndAfterAll.scala:210)
at org.scalatest.BeforeAndAfterAll.run$(BeforeAndAfterAll.scala:208)
at com.twitter.inject.Test.run(Test.scala:28)
at org.scalatestplus.junit.JUnitRunner.run(JUnitRunner.scala:99)
   ```



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org



