[GitHub] [kafka] showuon commented on a change in pull request #10509: KAFKA-12464: enhance constrained sticky Assign algorithm

2021-05-04 Thread GitBox


showuon commented on a change in pull request #10509:
URL: https://github.com/apache/kafka/pull/10509#discussion_r625597504



##
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -163,127 +159,180 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
  */
 private Map<String, List<TopicPartition>> constrainedAssign(Map<String, Integer> partitionsPerTopic,
                                                             Map<String, List<TopicPartition>> consumerToOwnedPartitions) {
-    SortedSet<TopicPartition> unassignedPartitions = getTopicPartitions(partitionsPerTopic);
+    if (log.isDebugEnabled()) {
+        log.debug("performing constrained assign. partitionsPerTopic: {}, consumerToOwnedPartitions: {}",
+            partitionsPerTopic, consumerToOwnedPartitions);
+    }
 
     Set<TopicPartition> allRevokedPartitions = new HashSet<>();
 
-    // Each consumer should end up in exactly one of the below
-    // the consumers not yet at capacity
+    // the consumers not yet at expected capacity
     List<String> unfilledMembers = new LinkedList<>();
-    // the members with exactly maxQuota partitions assigned
-    Queue<String> maxCapacityMembers = new LinkedList<>();
-    // the members with exactly minQuota partitions assigned
-    Queue<String> minCapacityMembers = new LinkedList<>();
 
     int numberOfConsumers = consumerToOwnedPartitions.size();
-    int minQuota = (int) Math.floor(((double) unassignedPartitions.size()) / numberOfConsumers);
-    int maxQuota = (int) Math.ceil(((double) unassignedPartitions.size()) / numberOfConsumers);
+    int totalPartitionsCount = partitionsPerTopic.values().stream().reduce(0, Integer::sum);
+
+    int minQuota = (int) Math.floor(((double) totalPartitionsCount) / numberOfConsumers);
+    int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) / numberOfConsumers);
+    // the expected number of members with maxQuota assignment
+    int numExpectedMaxCapacityMembers = totalPartitionsCount % numberOfConsumers;
+    // the number of members with exactly maxQuota partitions assigned
+    int numMaxCapacityMembers = 0;
 
-    // initialize the assignment map with an empty array of size minQuota for all members
+    // initialize the assignment map with an empty array of size maxQuota for all members
     Map<String, List<TopicPartition>> assignment = new HashMap<>(
-        consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c -> new ArrayList<>(minQuota))));
+        consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c -> new ArrayList<>(maxQuota))));
 
+    List<TopicPartition> assignedPartitions = new ArrayList<>();
     // Reassign as many previously owned partitions as possible
     for (Map.Entry<String, List<TopicPartition>> consumerEntry : consumerToOwnedPartitions.entrySet()) {
         String consumer = consumerEntry.getKey();
         List<TopicPartition> ownedPartitions = consumerEntry.getValue();
 
         List<TopicPartition> consumerAssignment = assignment.get(consumer);
-        int i = 0;
-        // assign the first N partitions up to the max quota, and mark the remaining as being revoked
-        for (TopicPartition tp : ownedPartitions) {
-            if (i < maxQuota) {
-                consumerAssignment.add(tp);
-                unassignedPartitions.remove(tp);
-            } else {
-                allRevokedPartitions.add(tp);
-            }
-            ++i;
-        }
 
         if (ownedPartitions.size() < minQuota) {
+            // the expected assignment size is more than consumer have now, so keep all the owned partitions
+            // and put this member into unfilled member list
+            if (ownedPartitions.size() > 0) {
+                consumerAssignment.addAll(ownedPartitions);
+                assignedPartitions.addAll(ownedPartitions);
+            }
             unfilledMembers.add(consumer);
+        } else if (ownedPartitions.size() >= maxQuota && numMaxCapacityMembers < numExpectedMaxCapacityMembers) {
+            // consumer owned the "maxQuota" of partitions or more, and we're still under the number of expected max capacity members
+            // so keep "maxQuota" of the owned partitions, and revoke the rest of the partitions
+            numMaxCapacityMembers++;
+            List<TopicPartition> maxQuotaPartitions = ownedPartitions.subList(0, maxQuota);
+            consumerAssignment.addAll(maxQuotaPartitions);
+            assignedPartitions.addAll(maxQuotaPartitions);
+            allRevokedPartitions.addAll(ownedPartitions.subList(maxQuota, ownedPartitions.size()));
         } else {
-            // It's possible for a consumer to be at both min and max capacity if minQuota == maxQuota
-            if (consumerAssignment.size() == minQuota)
-                minCapacityMembers.add(consumer);
-            if (consumerAssi
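
For readers skimming the thread, the quota arithmetic above can be checked with a small standalone sketch (hypothetical example values, not code from the PR):

```java
// Illustrative sketch only: the quota arithmetic behind constrainedAssign.
public class QuotaArithmeticExample {
    public static void main(String[] args) {
        int totalPartitionsCount = 10; // assume 10 partitions in total
        int numberOfConsumers = 3;     // assume 3 consumers

        int minQuota = (int) Math.floor(((double) totalPartitionsCount) / numberOfConsumers); // 3
        int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) / numberOfConsumers);  // 4
        // only this many members may be filled up to maxQuota; everyone else stops at minQuota
        int numExpectedMaxCapacityMembers = totalPartitionsCount % numberOfConsumers;         // 1

        // 4 + 3 + 3 = 10, so every partition is assigned and no member exceeds maxQuota
        System.out.printf("minQuota=%d maxQuota=%d maxCapacityMembers=%d%n",
            minQuota, maxQuota, numExpectedMaxCapacityMembers);
    }
}
```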

[GitHub] [kafka] showuon commented on a change in pull request #10509: KAFKA-12464: enhance constrained sticky Assign algorithm

2021-05-04 Thread GitBox


showuon commented on a change in pull request #10509:
URL: https://github.com/apache/kafka/pull/10509#discussion_r625598009



##
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -163,127 +159,180 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
  */
 private Map<String, List<TopicPartition>> constrainedAssign(Map<String, Integer> partitionsPerTopic,
                                                             Map<String, List<TopicPartition>> consumerToOwnedPartitions) {
-    SortedSet<TopicPartition> unassignedPartitions = getTopicPartitions(partitionsPerTopic);
+    if (log.isDebugEnabled()) {
+        log.debug("performing constrained assign. partitionsPerTopic: {}, consumerToOwnedPartitions: {}",
+            partitionsPerTopic, consumerToOwnedPartitions);
+    }
 
     Set<TopicPartition> allRevokedPartitions = new HashSet<>();
 
-    // Each consumer should end up in exactly one of the below
-    // the consumers not yet at capacity
+    // the consumers not yet at expected capacity
     List<String> unfilledMembers = new LinkedList<>();
-    // the members with exactly maxQuota partitions assigned
-    Queue<String> maxCapacityMembers = new LinkedList<>();
-    // the members with exactly minQuota partitions assigned
-    Queue<String> minCapacityMembers = new LinkedList<>();
 
     int numberOfConsumers = consumerToOwnedPartitions.size();
-    int minQuota = (int) Math.floor(((double) unassignedPartitions.size()) / numberOfConsumers);
-    int maxQuota = (int) Math.ceil(((double) unassignedPartitions.size()) / numberOfConsumers);
+    int totalPartitionsCount = partitionsPerTopic.values().stream().reduce(0, Integer::sum);
+
+    int minQuota = (int) Math.floor(((double) totalPartitionsCount) / numberOfConsumers);
+    int maxQuota = (int) Math.ceil(((double) totalPartitionsCount) / numberOfConsumers);
+    // the expected number of members with maxQuota assignment
+    int numExpectedMaxCapacityMembers = totalPartitionsCount % numberOfConsumers;
+    // the number of members with exactly maxQuota partitions assigned
+    int numMaxCapacityMembers = 0;
 
-    // initialize the assignment map with an empty array of size minQuota for all members
+    // initialize the assignment map with an empty array of size maxQuota for all members
     Map<String, List<TopicPartition>> assignment = new HashMap<>(
-        consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c -> new ArrayList<>(minQuota))));
+        consumerToOwnedPartitions.keySet().stream().collect(Collectors.toMap(c -> c, c -> new ArrayList<>(maxQuota))));
 
+    List<TopicPartition> assignedPartitions = new ArrayList<>();
     // Reassign as many previously owned partitions as possible
     for (Map.Entry<String, List<TopicPartition>> consumerEntry : consumerToOwnedPartitions.entrySet()) {
         String consumer = consumerEntry.getKey();
         List<TopicPartition> ownedPartitions = consumerEntry.getValue();
 
         List<TopicPartition> consumerAssignment = assignment.get(consumer);
-        int i = 0;
-        // assign the first N partitions up to the max quota, and mark the remaining as being revoked
-        for (TopicPartition tp : ownedPartitions) {
-            if (i < maxQuota) {
-                consumerAssignment.add(tp);
-                unassignedPartitions.remove(tp);
-            } else {
-                allRevokedPartitions.add(tp);
-            }
-            ++i;
-        }
 
         if (ownedPartitions.size() < minQuota) {
+            // the expected assignment size is more than consumer have now, so keep all the owned partitions
+            // and put this member into unfilled member list
+            if (ownedPartitions.size() > 0) {
+                consumerAssignment.addAll(ownedPartitions);
+                assignedPartitions.addAll(ownedPartitions);
+            }
             unfilledMembers.add(consumer);
+        } else if (ownedPartitions.size() >= maxQuota && numMaxCapacityMembers < numExpectedMaxCapacityMembers) {
+            // consumer owned the "maxQuota" of partitions or more, and we're still under the number of expected max capacity members
+            // so keep "maxQuota" of the owned partitions, and revoke the rest of the partitions
+            numMaxCapacityMembers++;
+            List<TopicPartition> maxQuotaPartitions = ownedPartitions.subList(0, maxQuota);
+            consumerAssignment.addAll(maxQuotaPartitions);
+            assignedPartitions.addAll(maxQuotaPartitions);
+            allRevokedPartitions.addAll(ownedPartitions.subList(maxQuota, ownedPartitions.size()));
         } else {
-            // It's possible for a consumer to be at both min and max capacity if minQuota == maxQuota
-            if (consumerAssignment.size() == minQuota)
-                minCapacityMembers.add(consumer);
-            if (consumerAssi
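
As a side note, the keep-up-to-maxQuota-and-revoke-the-rest step seen above boils down to two subList views; here is a toy standalone sketch (using plain strings instead of TopicPartition, purely for illustration):

```java
import java.util.ArrayList;
import java.util.List;

// Illustration only: keep the first maxQuota owned partitions, revoke the rest.
public class KeepAndRevokeExample {
    public static void main(String[] args) {
        List<String> ownedPartitions = List.of("t-0", "t-1", "t-2", "t-3", "t-4");
        int maxQuota = 3;

        // copy the sublist views so the example does not depend on the backing list
        List<String> kept = new ArrayList<>(ownedPartitions.subList(0, maxQuota));
        List<String> revoked = new ArrayList<>(ownedPartitions.subList(maxQuota, ownedPartitions.size()));

        System.out.println("kept=" + kept + " revoked=" + revoked); // kept=[t-0, t-1, t-2] revoked=[t-3, t-4]
    }
}
```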

[jira] [Commented] (KAFKA-10660) Poll time out logstash

2021-05-04 Thread Duong Pham (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10660?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=1733#comment-1733
 ] 

Duong Pham commented on KAFKA-10660:


Hi [~showuon], have you fixed this problem ?

> Poll time out logstash
> --
>
> Key: KAFKA-10660
> URL: https://issues.apache.org/jira/browse/KAFKA-10660
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.2.1
> Environment: Non Production
>Reporter: David
>Assignee: Luke Chen
>Priority: Minor
>
> I am getting the below message (logstash log from the Kafka input), which I believe means I need to increase max.poll.interval.ms (I think the default is 3)
>  
> This member will leave the group because consumer poll timeout has expired. 
> This means the time between subsequent calls to poll() was longer than the 
> configured max.poll.interval.ms, which typically implies that the poll loop 
> is spending too much time processing messages. You can address this either by 
> increasing max.poll.interval.ms or by reducing the maximum size of batches 
> returned in poll() with max.poll.records



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jeqo commented on a change in pull request #10294: KAFKA-12450: Remove deprecated methods from ReadOnlyWindowStore

2021-05-04 Thread GitBox


jeqo commented on a change in pull request #10294:
URL: https://github.com/apache/kafka/pull/10294#discussion_r625703299



##
File path: streams/src/main/java/org/apache/kafka/streams/state/internals/CompositeReadOnlyWindowStore.java
##
@@ -68,10 +65,9 @@ public V fetch(final K key, final long time) {
     }
 
     @Override
-    @Deprecated
     public WindowStoreIterator<V> fetch(final K key,
-                                        final long timeFrom,
-                                        final long timeTo) {
+                                        final Instant timeFrom,

Review comment:
   Yes, the default implementation is doing this conversion already.
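
   (For context, the conversion referred to here is the usual epoch-millis to Instant mapping; a minimal standalone illustration, not the actual default method body:)

```java
import java.time.Instant;

// Illustration only: converting long epoch-millis bounds to Instant bounds.
public class FetchBoundsExample {
    public static void main(String[] args) {
        long timeFrom = 1_620_000_000_000L;
        long timeTo = 1_620_000_060_000L;
        Instant from = Instant.ofEpochMilli(timeFrom);
        Instant to = Instant.ofEpochMilli(timeTo);
        System.out.println(from + " .. " + to);
    }
}
```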




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ncliang commented on a change in pull request #10528: KAFKA-12497: Skip unnecessary source task offset commits

2021-05-04 Thread GitBox


ncliang commented on a change in pull request #10528:
URL: https://github.com/apache/kafka/pull/10528#discussion_r625714846



##
File path: connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java
##
@@ -482,6 +486,22 @@ private synchronized void recordSent(final ProducerRecord<byte[], byte[]> record
         }
     }
 
+    private synchronized void recordSendFailed(ProducerRecord<byte[], byte[]> record) {
+        if (outstandingMessages.containsKey(record)) {
+            currentBatchFailed = true;
+            if (flushing) {
+                // flush thread may be waiting on the outstanding messages to clear
+                this.notifyAll();

Review comment:
   This implementation differs from the implementation of `recordSend` 
above when `outstandingMessages` contains records other than the current 
record. `recordSend` will wait until `outstandingMessages` is empty and 
`flushing` before `notifyAll` . Is this intentional because we've failed and 
can't clear the outstanding messages?
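
   (For readers unfamiliar with the pattern under discussion, a generic wait/notify sketch of a flush thread waiting for outstanding messages to drain; the class and fields are hypothetical, not the WorkerSourceTask code:)

```java
import java.util.HashMap;
import java.util.Map;

// Generic illustration of the synchronization pattern under discussion.
public class OutstandingTracker {
    private final Map<Object, Object> outstandingMessages = new HashMap<>();
    private boolean flushing = false;

    public synchronized void recordSent(Object record) {
        outstandingMessages.remove(record);
        // wake the flush thread only once everything it is waiting for has drained
        if (flushing && outstandingMessages.isEmpty()) {
            notifyAll();
        }
    }

    public synchronized void awaitFlush() throws InterruptedException {
        flushing = true;
        while (!outstandingMessages.isEmpty()) {
            wait(); // the lock is released while waiting; re-check the condition on wakeup
        }
        flushing = false;
    }
}
```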

##
File path: 
connect/runtime/src/main/java/org/apache/kafka/connect/runtime/SourceTaskOffsetCommitter.java
##
@@ -105,6 +105,11 @@ public void remove(ConnectorTaskId id) {
 }
 
 private void commit(WorkerSourceTask workerTask) {
+if (!workerTask.shouldCommitOffsets()) {

Review comment:
   I tend to agree with @kpatelatwork that the `shouldCommit` method is 
better encapsulated in `WorkerSourceTask` . There is a call site within 
`WorkerSourceTask` that could also benefit from the `shouldCommit` check, for 
instance. 
https://github.com/apache/kafka/blob/a63e5be4195e97e5b825b5912291144d2d0283a3/connect/runtime/src/main/java/org/apache/kafka/connect/runtime/WorkerSourceTask.java#L266
   
   For the testing aspect, could you make the method visible to tests to do the 
assertion?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12743) [Kafka Streams] - regex name for state-store change-log topic

2021-05-04 Thread Sergey Zyrianov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12743?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17338959#comment-17338959
 ] 

Sergey Zyrianov commented on KAFKA-12743:
-

Hi [~guozhang],

Looks like the workaround won't work for us due to latency in KTable updates compared to put() on the state store. The order of the operations is reversed (now we push to the topic and wait for the state to appear in the KTable, versus putting the state into the state store and sending it to the changelog topic).

With latency come race conditions, and we need to come up with something else.

I find KIP-545 misleading when it claims "The offset translation is great feature to serve the foundation of migrating or failing over downstream consumers (*including Kafka stream applications*)".

As you pointed out, there is no good story for Streams applications when it comes to DR.

 

> [Kafka Streams] - regex name for state-store change-log topic
> -
>
> Key: KAFKA-12743
> URL: https://issues.apache.org/jira/browse/KAFKA-12743
> Project: Kafka
>  Issue Type: Improvement
>  Components: mirrormaker, streams
>Affects Versions: 2.8.0
>Reporter: Sergey Zyrianov
>Priority: Major
>
> Currently, when working with Kafka backed state stores in Kafka Streams, 
> these log compacted topics are given a hardcoded name : 
> _app_id-storename-changelog_
> {noformat}
> public static String storeChangelogTopic(String applicationId, String 
> storeName) {
> return applicationId + "-" + storeName + STATE_CHANGELOG_TOPIC_SUFFIX;
> }{noformat}
>  
> MirrorMaker2(mm2) copies these topics to remote cluster under the name  
> _src-cluster-alias.app_id-storename-changelog_
>  
> When the streams app fails over to the remote cluster, it has trouble finding the changelog topic of its state store, since it was renamed (given the source cluster prefix) by mm2.
> What should the fix be? Instruct mm2 to keep the topic name, or subscribe to a regex *._app_id-storename-changelog_ topic name for the state's changelog?
>  
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] kamalcph commented on a change in pull request #10259: MINOR: Provide valid examples in README page.

2021-05-04 Thread GitBox


kamalcph commented on a change in pull request #10259:
URL: https://github.com/apache/kafka/pull/10259#discussion_r625788577



##
File path: tests/README.md
##
@@ -97,26 +97,26 @@ Examining CI run
 
 * Set BUILD_ID is travis ci's build id. E.g. build id is 169519874 for the following build
 ```bash
-https://travis-ci.org/apache/kafka/builds/169519874

Review comment:
   Removed the doc.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] tombentley commented on a change in pull request #9441: KAFKA-10614: Ensure group state (un)load is executed in the submitted order

2021-05-04 Thread GitBox


tombentley commented on a change in pull request #9441:
URL: https://github.com/apache/kafka/pull/9441#discussion_r625802963



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -279,30 +279,33 @@ class KafkaApis(val requestChannel: RequestChannel,
         new StopReplicaResponseData().setErrorCode(Errors.STALE_BROKER_EPOCH.code)))
     } else {
       val partitionStates = stopReplicaRequest.partitionStates().asScala
-      val (result, error) = replicaManager.stopReplicas(
-        request.context.correlationId,
-        stopReplicaRequest.controllerId,
-        stopReplicaRequest.controllerEpoch,
-        stopReplicaRequest.brokerEpoch,
-        partitionStates)
-      // Clear the coordinator caches in case we were the leader. In the case of a reassignment, we
-      // cannot rely on the LeaderAndIsr API for this since it is only sent to active replicas.
-      result.forKeyValue { (topicPartition, error) =>
-        if (error == Errors.NONE) {
-          if (topicPartition.topic == GROUP_METADATA_TOPIC_NAME
-              && partitionStates(topicPartition).deletePartition) {
-            groupCoordinator.onResignation(topicPartition.partition)
-          } else if (topicPartition.topic == TRANSACTION_STATE_TOPIC_NAME
-                     && partitionStates(topicPartition).deletePartition) {
+      def onStopReplicas(error: Errors, partitions: Map[TopicPartition, Errors]): Unit = {
+        // Clear the coordinator caches in case we were the leader. In the case of a reassignment, we
+        // cannot rely on the LeaderAndIsr API for this since it is only sent to active replicas.
+        partitions.forKeyValue { (topicPartition, partitionError) =>
+          if (partitionError == Errors.NONE) {
             val partitionState = partitionStates(topicPartition)
             val leaderEpoch = if (partitionState.leaderEpoch >= 0)
-                Some(partitionState.leaderEpoch)
+              Some(partitionState.leaderEpoch)
             else
               None
-            txnCoordinator.onResignation(topicPartition.partition, coordinatorEpoch = leaderEpoch)
+            if (topicPartition.topic == GROUP_METADATA_TOPIC_NAME
+              && partitionState.deletePartition) {
+              groupCoordinator.onResignation(topicPartition.partition, leaderEpoch)
+            } else if (topicPartition.topic == TRANSACTION_STATE_TOPIC_NAME
+              && partitionState.deletePartition) {
+              txnCoordinator.onResignation(topicPartition.partition, coordinatorEpoch = leaderEpoch)
+            }
           }
         }
       }
+      val (result, error) = replicaManager.stopReplicas(
+        request.context.correlationId,
+        stopReplicaRequest.controllerId,
+        stopReplicaRequest.controllerEpoch,
+        stopReplicaRequest.brokerEpoch,
+        partitionStates,
+        onStopReplicas)

Review comment:
   @guozhangwang yes, that's right. I forgot about our conversation about 
the lock when @hachikuji asked about why we were using the callback  
:disappointed:. 
   
   I notice that the `partitionLock` is acquired by the `addLoadingPartition` 
call in `loadGroupsAndOffsets`, and is also acquired in 
`removeGroupsAndOffsets`. Wouldn't it be simpler to use that than 
`replicaStateChangeLock` at this point if we're wanting to avoid a third way of 
handling concurrency here, or is there some subtlety? Obviously we wouldn't 
hold it for the call to `doLoadGroupsAndOffsets` in `loadGroupsAndOffsets`, 
just for the two checks at the start
   
   ```
   if (!maybeUpdateCoordinatorEpoch(topicPartition.partition, Some(coordinatorEpoch))) {
     info(s"Not loading offsets and group metadata for $topicPartition " +
       s"in epoch $coordinatorEpoch since current epoch is ${epochForPartitionId.get(topicPartition.partition)}")
   } else if (!addLoadingPartition(topicPartition.partition)) {
     info(s"Already loading offsets and group metadata from $topicPartition")
   }
   ```
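
   (A generic sketch of the shape being discussed, taking a lock only around the cheap checks and doing the heavy load outside it; all names are hypothetical, not the GroupMetadataManager code:)

```java
import java.util.concurrent.locks.ReentrantLock;

// Generic sketch: guard only the epoch/loading-set checks with the lock, not the load itself.
public class LoadGuardExample {
    private final ReentrantLock partitionLock = new ReentrantLock();

    public void loadGroupsAndOffsets(int partition, int coordinatorEpoch) {
        boolean shouldLoad;
        partitionLock.lock();
        try {
            // cheap checks only: epoch freshness and "already loading" bookkeeping
            shouldLoad = maybeUpdateCoordinatorEpoch(partition, coordinatorEpoch)
                && addLoadingPartition(partition);
        } finally {
            partitionLock.unlock();
        }
        if (shouldLoad) {
            doLoadGroupsAndOffsets(partition); // long-running work happens outside the lock
        }
    }

    private boolean maybeUpdateCoordinatorEpoch(int partition, int epoch) { return true; }
    private boolean addLoadingPartition(int partition) { return true; }
    private void doLoadGroupsAndOffsets(int partition) { }
}
```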




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] spena commented on a change in pull request #10613: KAFKA-10847: Set shared outer store to an in-memory store when in-memory stores are supplied

2021-05-04 Thread GitBox


spena commented on a change in pull request #10613:
URL: https://github.com/apache/kafka/pull/10613#discussion_r625811787



##
File path: streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##
@@ -155,7 +156,7 @@ public long get() {
 
             final String outerJoinStoreName = userProvidedBaseStoreName == null ? outerJoinStoreGeneratedName : userProvidedBaseStoreName + outerJoinSuffix;
 
-            outerJoinWindowStore = Optional.of(sharedOuterJoinWindowStoreBuilder(outerJoinStoreName, windows, streamJoinedInternal));
+            outerJoinWindowStore = Optional.of(sharedOuterJoinWindowStoreBuilder(outerJoinStoreName, windows, streamJoinedInternal, persistent));

Review comment:
   No reason at all. I just chose one side to decide whether to go with in-memory or RocksDB. Left seems to be the only side that is always true, whether the join is a left or an outer join. I was thinking of maybe going in-memory if, for left joins, the left side is in-memory; or, for outer joins, both the left and right sides are in-memory. What do you think?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on a change in pull request #10509: KAFKA-12464: enhance constrained sticky Assign algorithm

2021-05-04 Thread GitBox


showuon commented on a change in pull request #10509:
URL: https://github.com/apache/kafka/pull/10509#discussion_r625814273



##
File path: clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractStickyAssignor.java
##
@@ -149,12 +149,8 @@ private boolean allSubscriptionsEqual(Set<String> allTopics,
  * This constrainedAssign optimizes the assignment algorithm when all consumers were subscribed to same set of topics.
  * The method includes the following steps:
  *
- * 1. Reassign as many previously owned partitions as possible, up to the maxQuota
- * 2. Fill remaining members up to minQuota
- * 3. If we ran out of unassigned partitions before filling all consumers, we need to start stealing partitions
- *    from the over-full consumers at max capacity
- * 4. Otherwise we may have run out of unfilled consumers before assigning all partitions, in which case we
- *    should just distribute one partition each to all consumers at min capacity
+ * 1. Reassign as many previously owned partitions as possible, up to the expected number of maxQuota, otherwise, minQuota

Review comment:
   Nice catch! I listed all the possibilities in the javadoc. Thanks.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #10509: KAFKA-12464: enhance constrained sticky Assign algorithm

2021-05-04 Thread GitBox


showuon commented on pull request #10509:
URL: https://github.com/apache/kafka/pull/10509#issuecomment-831972238


   @ableegoldman @guozhangwang , thanks for your good comments. I've addressed 
them in this commit: 
https://github.com/apache/kafka/pull/10509/commits/868aaf431ca66fce85d70c43909b5bc1849cd097.
 Thank you very much.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] tombentley commented on pull request #9878: KAFKA-6987: Add KafkaFuture.toCompletionStage()

2021-05-04 Thread GitBox


tombentley commented on pull request #9878:
URL: https://github.com/apache/kafka/pull/9878#issuecomment-831972566


   @dajac @ijuma please could you review this, it's the implementation for 
KIP-707. 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] spena commented on a change in pull request #10613: KAFKA-10847: Set shared outer store to an in-memory store when in-memory stores are supplied

2021-05-04 Thread GitBox


spena commented on a change in pull request #10613:
URL: https://github.com/apache/kafka/pull/10613#discussion_r625833095



##
File path: streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
##
@@ -863,6 +920,60 @@ public void streamStreamLeftJoinTopologyWithCustomStoresNames() {
             describe.toString());
     }
 
+    @Test
+    public void streamStreamLeftJoinTopologyWithCustomStoresSuppliers() {
+        final StreamsBuilder builder  = new StreamsBuilder();
+        final KStream<Integer, String> stream1;
+        final KStream<Integer, String> stream2;
+
+        stream1 = builder.stream("input-topic1");
+        stream2 = builder.stream("input-topic2");
+
+        final JoinWindows joinWindows = JoinWindows.of(ofMillis(100));
+
+        final WindowBytesStoreSupplier thisStoreSupplier = Stores.inMemoryWindowStore("in-memory-join-store",
+            Duration.ofMillis(joinWindows.size() + joinWindows.gracePeriodMs()),
+            Duration.ofMillis(joinWindows.size()), true);
+
+        final WindowBytesStoreSupplier otherStoreSupplier = Stores.inMemoryWindowStore("in-memory-join-store-other",
+            Duration.ofMillis(joinWindows.size() + joinWindows.gracePeriodMs()),
+            Duration.ofMillis(joinWindows.size()), true);
+
+        stream1.leftJoin(
+            stream2,
+            MockValueJoiner.TOSTRING_JOINER,
+            joinWindows,
+            StreamJoined.with(Serdes.Integer(), Serdes.String(), Serdes.String())
+                .withThisStoreSupplier(thisStoreSupplier)
+                .withOtherStoreSupplier(otherStoreSupplier));
+
+        final TopologyDescription describe = builder.build().describe();
+
+        assertEquals(
+            "Topologies:\n" +
+                "   Sub-topology: 0\n" +
+                "Source: KSTREAM-SOURCE-00 (topics: [input-topic1])\n" +
+                "  --> KSTREAM-WINDOWED-02\n" +
+                "Source: KSTREAM-SOURCE-01 (topics: [input-topic2])\n" +
+                "  --> KSTREAM-WINDOWED-03\n" +
+                "Processor: KSTREAM-WINDOWED-02 (stores: [in-memory-join-store])\n" +
+                "  --> KSTREAM-JOINTHIS-04\n" +
+                "  <-- KSTREAM-SOURCE-00\n" +
+                "Processor: KSTREAM-WINDOWED-03 (stores: [in-memory-join-store-other])\n" +
+                "  --> KSTREAM-OUTEROTHER-05\n" +
+                "  <-- KSTREAM-SOURCE-01\n" +
+                "Processor: KSTREAM-JOINTHIS-04 (stores: [in-memory-join-store-other, KSTREAM-OUTERSHARED-04-memory-store])\n" +

Review comment:
   I just moved the name builder inside the 
`sharedOuterJoinWindowStoreBuilder`. I did not create the name based on the 
store supplier, though. I would have to append a suffix and I wasn't sure if it 
would be too long. For instance, if the store supplier is named 
`in-memory-join-store`, then I would name the shared outer store as 
`in-memory-join-store-left-shared-join-store`. 
   
   Also, what name should I use for the full outer? The `this` supplier or the 
`other` supplier? Say suppliers are named `mem-left-store` and 
`mem-right-store`. Would I end up with `mem-left-store-outer-shared-join-store` 
or `mem-right-store-outer-shared-join-store`?
   
   What do you think? 




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] C0urante commented on pull request #10630: MINOR: Stop logging raw record contents above TRACE level in WorkerSourceTask

2021-05-04 Thread GitBox


C0urante commented on pull request #10630:
URL: https://github.com/apache/kafka/pull/10630#issuecomment-832062862


   @rhauch @tombentley could you please review at your earliest convenience?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] C0urante opened a new pull request #10630: MINOR: Stop logging raw record contents above TRACE level in WorkerSourceTask

2021-05-04 Thread GitBox


C0urante opened a new pull request #10630:
URL: https://github.com/apache/kafka/pull/10630


   Accidental logging of record contents is a security risk as they may contain 
sensitive information such as PII. This downgrades the level of log messages in 
the `WorkerSourceTask` class that contain raw record contents to `TRACE` level 
in order to make that scenario less likely.
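
   A hedged illustration of the pattern (not the exact diff): record identifiers can stay at DEBUG while raw keys and values only ever appear at TRACE.

```java
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Illustration only: keep record *contents* out of anything below TRACE.
public class RecordLoggingExample {
    private static final Logger log = LoggerFactory.getLogger(RecordLoggingExample.class);

    void onRecordSent(String topic, int partition, Object key, Object value) {
        log.debug("Sent record to topic {} partition {}", topic, partition); // no payload here
        log.trace("Sent record with key {} and value {}", key, value);       // payload only at TRACE
    }
}
```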
   
   As these changes are trivial no tests are added.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang merged pull request #10294: KAFKA-12450: Remove deprecated methods from ReadOnlyWindowStore

2021-05-04 Thread GitBox


guozhangwang merged pull request #10294:
URL: https://github.com/apache/kafka/pull/10294


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on pull request #10294: KAFKA-12450: Remove deprecated methods from ReadOnlyWindowStore

2021-05-04 Thread GitBox


guozhangwang commented on pull request #10294:
URL: https://github.com/apache/kafka/pull/10294#issuecomment-832072486


   Merged to trunk, thanks @jeqo 


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12635) Mirrormaker 2 offset sync is incorrect if the target partition is empty

2021-05-04 Thread Frank Yi (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12635?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17339143#comment-17339143
 ] 

Frank Yi commented on KAFKA-12635:
--

[~yangguo1220] I was able to repro using the steps below on 2.8.0 brokers and 
MM2.
 # Create a topic with 1 partition on the source cluster, with `retention.ms` 
set to something short like 10 seconds.
 ## `./kafka-topics.sh --bootstrap-server $SOURCE --create --topic myTopic 
--config 'retention.ms=1'`
 # Create a consumer that consumes from the topic
 ## `./kafka-console-consumer.sh --bootstrap-server $SOURCE --group myConsumer 
--topic myTopic`
 # Send 100 messages to the topic. These should get consumed by the consumer. 
Offset for this consumer on source cluster should be 100.
 ## `for i in $(seq 1 100); do echo $i; done | ./kafka-console-producer.sh 
--bootstrap-server $SOURCE --topic myTopic`
 # Wait until the retention policy deletes the records
 # Start MM2 with `source->target.sync.group.offsets.enabled = true`
 # Observe on the target cluster that log-end-offset is 0, offset is 100, and 
lag is -100.
 ## `./kafka-consumer-groups.sh --bootstrap-server $TARGET --describe --group 
myConsumer`

> Mirrormaker 2 offset sync is incorrect if the target partition is empty
> ---
>
> Key: KAFKA-12635
> URL: https://issues.apache.org/jira/browse/KAFKA-12635
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.7.0
>Reporter: Frank Yi
>Assignee: Ning Zhang
>Priority: Major
>
> This bug occurs when using Mirrormaker with "sync.group.offsets.enabled = 
> true".
> If a source partition is empty, but the source consumer group's offset for 
> that partition is non-zero, then Mirrormaker sets the target consumer group's 
> offset for that partition to the literal, not translated, offset of the 
> source consumer group. This state can be reached if the source consumer group 
> consumed some records that were now deleted (like by a retention policy), or 
> if Mirrormaker replication is set to start at "latest". This bug causes the 
> target consumer group's lag for that partition to be negative and breaks 
> offset sync for that partition until lag is positive.
> The correct behavior when the source partition is empty would be to set the 
> target offset to the translated offset, not literal offset, which in this 
> case would always be 0. 
> Original email thread on this issue: 
> https://lists.apache.org/thread.html/r7c54ee5f57227367b911d4abffa72781772d8dd3b72d75eb65ee19f7%40%3Cusers.kafka.apache.org%3E



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] ableegoldman commented on pull request #10509: KAFKA-12464: enhance constrained sticky Assign algorithm

2021-05-04 Thread GitBox


ableegoldman commented on pull request #10509:
URL: https://github.com/apache/kafka/pull/10509#issuecomment-832158493


   @showuon it looks like there are some related test failures, can you look 
into those?


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on a change in pull request #10568: [DO NOT MERGE] KAFKA-8897: Upgrade RocksDB

2021-05-04 Thread GitBox


cadonna commented on a change in pull request #10568:
URL: https://github.com/apache/kafka/pull/10568#discussion_r626032529



##
File path: streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##
@@ -396,6 +396,21 @@ public void close() {
                 log.info("Skipping to close non-initialized store {}", entry.getKey());
             }
         }
+        for (final StateStore store : globalStateStores) {

Review comment:
   We have two data structures that contain state stores but we check only 
one when we close the state stores. In the error case it can happen that one 
contains an open store that the other data structure does not contain. This is 
a quick fix for it, but we should consider whether it is possible to 
consolidate these two data structures.  
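
   (A rough sketch of the kind of defensive close being described, walking the second collection and closing anything still open; this is an assumption-laden illustration, not the actual GlobalStateManagerImpl change:)

```java
import java.util.List;

// Illustration only: close stores from a second collection, skipping anything already closed.
interface SimpleStore {
    String name();
    boolean isOpen();
    void close();
}

class DefensiveClose {
    static void closeAll(List<SimpleStore> globalStateStores) {
        for (SimpleStore store : globalStateStores) {
            if (store.isOpen()) {
                store.close(); // a store missed by the first pass is still released here
            }
        }
    }
}
```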




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on pull request #10587: KAFKA-8897: Upgrade RocksDB to 6.8.1

2021-05-04 Thread GitBox


cadonna commented on pull request #10587:
URL: https://github.com/apache/kafka/pull/10587#issuecomment-832173985


   Superseded by https://github.com/apache/kafka/pull/10568


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna closed pull request #10587: KAFKA-8897: Upgrade RocksDB to 6.8.1

2021-05-04 Thread GitBox


cadonna closed pull request #10587:
URL: https://github.com/apache/kafka/pull/10587


   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] scott-hendricks commented on a change in pull request #10621: MINOR: Fix a couple Trogdor issues.

2021-05-04 Thread GitBox


scott-hendricks commented on a change in pull request #10621:
URL: https://github.com/apache/kafka/pull/10621#discussion_r626049414



##
File path: trogdor/src/main/java/org/apache/kafka/trogdor/workload/TimestampRecordProcessor.java
##
@@ -38,7 +38,7 @@
  *
  * Example spec:
  * {
- *"type": "timestampRandom",
+ *"type": "timestamp",

Review comment:
   This is not for a `PayloadGenerator` object, this is for a 
`RecordProcessor` object.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] scott-hendricks commented on a change in pull request #10621: MINOR: Fix a couple Trogdor issues.

2021-05-04 Thread GitBox


scott-hendricks commented on a change in pull request #10621:
URL: https://github.com/apache/kafka/pull/10621#discussion_r626049787



##
File path: 
trogdor/src/main/java/org/apache/kafka/trogdor/workload/ConstantThroughputGenerator.java
##
@@ -26,17 +26,11 @@
 * The lower the window size, the smoother the traffic will be. Using a 100ms window offers no noticeable spikes in
 * traffic while still being long enough to avoid too much overhead.
 *
- * WARNING: Due to binary nature of throughput in terms of messages sent in a window, each window will send at least 1
- * message, and each window sends the same number of messages, rounded down. For example, 99 messages per second with a
- * 100ms window will only send 90 messages per second, or 9 messages per window. Another example, in order to send only
- * 5 messages per second, a window size of 200ms is required. In cases like these, both the `messagesPerSecond` and
- * `windowSizeMs` parameters should be adjusted together to achieve more accurate throughput.
- *
 * Here is an example spec:
 *
 * {
 *    "type": "constant",
- *    "messagesPerSecond": 500,
+ *    "messagesPerWindow": 50,

Review comment:
   Sure, that's reasonable.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] scott-hendricks commented on a change in pull request #10621: MINOR: Fix a couple Trogdor issues.

2021-05-04 Thread GitBox


scott-hendricks commented on a change in pull request #10621:
URL: https://github.com/apache/kafka/pull/10621#discussion_r626055041



##
File path: trogdor/src/main/java/org/apache/kafka/trogdor/workload/GaussianTimestampConstantPayloadGenerator.java
##
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.trogdor.workload;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import org.apache.kafka.common.utils.Time;
+
+import java.nio.ByteBuffer;
+import java.nio.ByteOrder;
+import java.util.Random;
+
+/**
+ * This class behaves identically to TimestampConstantPayloadGenerator, except the message size follows a gaussian
+ * distribution.
+ *
+ * This should be used in conjunction with TimestampRecordProcessor in the Consumer to measure true end-to-end latency
+ * of a system.
+ *
+ * `messageSizeAverage` - The average size in bytes of each message.
+ * `messageSizeDeviation` - The standard deviation to use when calculating message size.
+ * `messagesUntilSizeChange` - The number of messages to keep at the same size.
+ *
+ * Here is an example spec:
+ *
+ * {
+ *    "type": "gaussianTimestampConstant",
+ *    "messageSizeAverage": 512,
+ *    "messageSizeDeviation": 100,
+ *    "messagesUntilSizeChange": 100
+ * }
+ *
+ * This will generate messages on a gaussian distribution with an average size each 512-bytes. The message sizes will
+ * have a standard deviation of 100 bytes, and the size will only change every 100 messages.  The distribution of
+ * messages will be as follows:
+ *
+ *    The average size of the messages are 512 bytes.
+ *    ~68% of the messages are between 412 and 612 bytes
+ *    ~95% of the messages are between 312 and 712 bytes
+ *    ~99% of the messages are between 212 and 812 bytes
+ */
+
+public class GaussianTimestampConstantPayloadGenerator implements PayloadGenerator {
+private final int messageSizeAverage;
+private final int messageSizeDeviation;
+private final int messagesUntilSizeChange;
+private final long seed;
+
+private final Random random = new Random();
+private final ByteBuffer buffer;
+
+private int messageTracker = 0;
+private int messageSize = 0;
+
+@JsonCreator
+    public GaussianTimestampConstantPayloadGenerator(@JsonProperty("messageSizeAverage") int messageSizeAverage,
+                                                     @JsonProperty("messageSizeDeviation") int messageSizeDeviation,

Review comment:
   That's a good point, I'll change all the Gaussian generators to Double.
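
   (For readers of the archive, a small standalone sketch of drawing a Gaussian-distributed size and holding it for N messages; the field names mirror the spec above, but the class itself is hypothetical, not the Trogdor generator:)

```java
import java.util.Random;

// Illustration only: redraw the message size from a Gaussian every `messagesUntilSizeChange` messages.
public class GaussianSizeExample {
    public static void main(String[] args) {
        int messageSizeAverage = 512;
        int messageSizeDeviation = 100;
        int messagesUntilSizeChange = 100;

        Random random = new Random(42L);
        int messageSize = 0;
        for (int i = 0; i < 500; i++) {
            if (i % messagesUntilSizeChange == 0) {
                // nextGaussian() has mean 0 and stddev 1, so scale and shift it
                messageSize = Math.max(0, (int) (random.nextGaussian() * messageSizeDeviation + messageSizeAverage));
            }
            // ... produce a record of `messageSize` bytes here ...
        }
        System.out.println("last drawn size: " + messageSize);
    }
}
```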




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-12748) Explore new RocksDB options to consider enabling by default

2021-05-04 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-12748:
--

 Summary: Explore new RocksDB options to consider enabling by 
default
 Key: KAFKA-12748
 URL: https://issues.apache.org/jira/browse/KAFKA-12748
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: A. Sophie Blee-Goldman


With the rocksdb version bump comes a lot of new options, some of which look 
interesting enough to explore for usage in Streams. We should try setting these 
as default options and run the benchmarks to look for any performance benefit 
(or decrease). See javadocs for all Options 
[here|https://javadoc.io/doc/org.rocksdb/rocksdbjni/latest/org/rocksdb/Options.html]


Options.setAvoidUnnecessaryBlockingIO:
- As the name suggests, avoids blocking/long-latency tasks by scheduling a background job to do it

 Options.setBestEffortsRecovery: 
- Interesting feature to allow recovering missing files without the use of 
the WAL. Could be useful if the on-disk state is corrupted (eg user deletes a 
file) without needing to rebuild state from scratch. Though I'd want to dig in 
further to understand what exactly it does and does not do. Not a performance 
improvement but we should run the benchmarks to make sure it doesn't make the 
performance worse.

Options.setWriteDbidToManifest:
- Should be set to true if/when we ever need to rely on the DB id eg for 
backups. Also not a performance improvement but we should still benchmark this.


Options.optimizeForSmallDb:
- This one is definitely not something we should set by default, as "small" 
here means under 1GB. But it's probably worth at least calling out in the docs 
for those users who know their data set size (per store) is under a GB
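
A sketch of how one of these options could be trialed from Streams via a RocksDBConfigSetter (assuming the setter named above is exposed by the bumped rocksdbjni version):

{code:java}
import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.Options;

// Sketch only: enable one option under evaluation for benchmarking.
public class BenchmarkRocksDBConfigSetter implements RocksDBConfigSetter {
    @Override
    public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
        options.setAvoidUnnecessaryBlockingIO(true); // assumed to exist per the javadocs linked above
    }

    @Override
    public void close(final String storeName, final Options options) {
        // nothing to clean up in this sketch
    }
}
// enabled via: props.put(StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG, BenchmarkRocksDBConfigSetter.class);
{code}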



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-8326) Add Serde<List<Inner>> support

2021-05-04 Thread Daniyar Yeralin (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8326?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Daniyar Yeralin updated KAFKA-8326:
---
Description: 
_This ticket proposes adding new {color:#4c9aff}ListSerializer{color} and {color:#4c9aff}ListDeserializer{color} classes as well as support for the new classes into the Serdes class. This will allow using List Serde of type_ {color:#4c9aff}_Serde<List<Inner>>_{color} _directly from Consumers, Producers and Streams._

_{color:#4c9aff}Serde<List<Inner>>{color} serialization and deserialization will be done through repeatedly calling a serializer/deserializer for each entry provided by passed generic {color:#4c9aff}Inner{color}'s Serde. For example, if you want to create List of Strings serde, then serializer/deserializer of StringSerde will be used to serialize/deserialize each entry in {color:#4c9aff}List<String>{color}._

I believe there are many use cases where List Serde could be used:
 * 
[https://stackoverflow.com/questions/41427174/aggregate-java-objects-in-a-list-with-kafka-streams-dsl-windows]
 * 
[https://stackoverflow.com/questions/46365884/issue-with-arraylist-serde-in-kafka-streams-api]

For instance, aggregate grouped (by key) values together in a list to do other 
subsequent operations on the collection.

KIP Link: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-466%3A+Add+support+for+List%3CT%3E+serialization+and+deserialization]

  was:
_This ticket proposes adding new {color:#4c9aff}ListSerializer{color} and {color:#4c9aff}ListDeserializer{color} classes as well as support for the new classes into the Serdes class. This will allow using List Serde of type_ {color:#4c9aff}_Serde<List<Inner>>_{color} _directly from Consumers, Producers and Streams._

_{color:#4c9aff}Serde<List<Inner>>{color} serialization and deserialization will be done through repeatedly calling a serializer/deserializer for each entry provided by passed generic {color:#4c9aff}Inner{color}'s Serde. For example, if you want to create List of Strings serde, then serializer/deserializer of StringSerde will be used to serialize/deserialize each entry in {color:#4c9aff}List<String>{color}._

I believe there are many use cases where List Serde could be used. Ex. 
[https://stackoverflow.com/questions/41427174/aggregate-java-objects-in-a-list-with-kafka-streams-dsl-windows],
 
[https://stackoverflow.com/questions/46365884/issue-with-arraylist-serde-in-kafka-streams-api]

For instance, aggregate grouped (by key) values together in a list to do other 
subsequent operations on the collection.

KIP Link: 
[https://cwiki.apache.org/confluence/display/KAFKA/KIP-466%3A+Add+support+for+List%3CT%3E+serialization+and+deserialization]


> Add Serde<List<Inner>> support
> --
>
> Key: KAFKA-8326
> URL: https://issues.apache.org/jira/browse/KAFKA-8326
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, streams
>Reporter: Daniyar Yeralin
>Assignee: Daniyar Yeralin
>Priority: Minor
>  Labels: kip
>
> _This ticket proposes adding new {color:#4c9aff}ListSerializer{color} and {color:#4c9aff}ListDeserializer{color} classes as well as support for the new classes into the Serdes class. This will allow using List Serde of type_ {color:#4c9aff}_Serde<List<Inner>>_{color} _directly from Consumers, Producers and Streams._
> _{color:#4c9aff}Serde<List<Inner>>{color} serialization and deserialization will be done through repeatedly calling a serializer/deserializer for each entry provided by passed generic {color:#4c9aff}Inner{color}'s Serde. For example, if you want to create List of Strings serde, then serializer/deserializer of StringSerde will be used to serialize/deserialize each entry in {color:#4c9aff}List<String>{color}._
> I believe there are many use cases where List Serde could be used:
>  * 
> [https://stackoverflow.com/questions/41427174/aggregate-java-objects-in-a-list-with-kafka-streams-dsl-windows]
>  * 
> [https://stackoverflow.com/questions/46365884/issue-with-arraylist-serde-in-kafka-streams-api]
> For instance, aggregate grouped (by key) values together in a list to do 
> other subsequent operations on the collection.
> KIP Link: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-466%3A+Add+support+for+List%3CT%3E+serialization+and+deserialization]
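
To make the "repeatedly calling the inner serializer" idea concrete, here is a toy length-prefixed list serializer; it illustrates the approach only and is not the ListSerializer proposed by the KIP:

{code:java}
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.List;
import org.apache.kafka.common.serialization.Serializer;

// Toy sketch: serialize a List<Inner> by length-prefixing each entry serialized with the inner Serializer.
public class ToyListSerializer<Inner> implements Serializer<List<Inner>> {
    private final Serializer<Inner> inner;

    public ToyListSerializer(final Serializer<Inner> inner) {
        this.inner = inner;
    }

    @Override
    public byte[] serialize(final String topic, final List<Inner> data) {
        if (data == null) {
            return null;
        }
        final ByteArrayOutputStream out = new ByteArrayOutputStream();
        try {
            out.write(ByteBuffer.allocate(4).putInt(data.size()).array()); // number of entries
            for (final Inner entry : data) {
                final byte[] bytes = inner.serialize(topic, entry);
                out.write(ByteBuffer.allocate(4).putInt(bytes.length).array()); // entry length prefix
                out.write(bytes);
            }
        } catch (final IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toByteArray();
    }
}
{code}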



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] jolshan commented on a change in pull request #10602: KAFKA-12724: Add 2.8.0 to system tests and streams upgrade tests.

2021-05-04 Thread GitBox


jolshan commented on a change in pull request #10602:
URL: https://github.com/apache/kafka/pull/10602#discussion_r626115350



##
File path: tests/kafkatest/tests/streams/streams_upgrade_test.py
##
@@ -25,12 +25,12 @@
 from kafkatest.services.zookeeper import ZookeeperService
 from kafkatest.tests.streams.utils import extract_generation_from_logs, extract_generation_id
 from kafkatest.version import LATEST_0_10_0, LATEST_0_10_1, LATEST_0_10_2, LATEST_0_11_0, LATEST_1_0, LATEST_1_1, \
-    LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, DEV_BRANCH, DEV_VERSION, KafkaVersion
+    LATEST_2_0, LATEST_2_1, LATEST_2_2, LATEST_2_3, LATEST_2_4, LATEST_2_5, LATEST_2_6, LATEST_2_7, LATEST_2_8, DEV_BRANCH, DEV_VERSION, KafkaVersion
 
 # broker 0.10.0 is not compatible with newer Kafka Streams versions
 broker_upgrade_versions = [str(LATEST_0_10_1), str(LATEST_0_10_2), str(LATEST_0_11_0), str(LATEST_1_0), str(LATEST_1_1), \
                            str(LATEST_2_0), str(LATEST_2_1), str(LATEST_2_2), str(LATEST_2_3), \
-                           str(LATEST_2_4), str(LATEST_2_5), str(LATEST_2_6), str(LATEST_2_7), str(DEV_BRANCH)]
+                           str(LATEST_2_4), str(LATEST_2_5), str(LATEST_2_6), str(LATEST_2_7), str(LATEST_2_8), str(DEV_BRANCH)]

Review comment:
   I noticed we are updating the versions for one test 
(test_upgrade_downgrade_brokers), but not for the other one 
(test_metadata_upgrade)
   
   The metadata_1_versions and metadata_2_versions are the versions used in 
this test. Should we keep updating those as well? I'm not super familiar with 
the streams tests here, only noticed this when running the system test files 
changed in this PR.
   
   The tests that were not ignored passed btw :)
   
   




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Created] (KAFKA-12749) Changelog topic config on suppressed KTable lost

2021-05-04 Thread Philip Bourke (Jira)
Philip Bourke created KAFKA-12749:
-

 Summary: Changelog topic config on suppressed KTable lost
 Key: KAFKA-12749
 URL: https://issues.apache.org/jira/browse/KAFKA-12749
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.6.2, 2.8.0, 2.6.1, 2.7.0, 2.6.0
Reporter: Philip Bourke


When trying to set the changelog configuration on a suppressed KTable, the 
config is lost if either {{emitEarlyWhenFull}} or {{shutDownWhenFull}} is set 
after the logging config.

This works - 
{code:java}
.suppress(Suppressed.untilTimeLimit(Duration.ofMillis(maxIdleIntervalMs),
    BufferConfig.maxRecords(maxBufferRecords).emitEarlyWhenFull().withLoggingEnabled(changelogConfig)){code}
but not if you set {{emitEarlyWhenFull}} last.
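
For contrast, the ordering that reportedly drops the config would look like this (reconstructed from the description above, not verified code):
{code:java}
.suppress(Suppressed.untilTimeLimit(Duration.ofMillis(maxIdleIntervalMs),
    BufferConfig.maxRecords(maxBufferRecords).withLoggingEnabled(changelogConfig).emitEarlyWhenFull()){code}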

See comments in https://issues.apache.org/jira/browse/KAFKA-8147

 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8147) Add changelog topic configuration to KTable suppress

2021-05-04 Thread Philip Bourke (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17339321#comment-17339321
 ] 

Philip Bourke commented on KAFKA-8147:
--

[~ableegoldman] https://issues.apache.org/jira/browse/KAFKA-12749 created for 
this. If I can find the time I would pick it up but that may be hard to do.

> Add changelog topic configuration to KTable suppress
> 
>
> Key: KAFKA-8147
> URL: https://issues.apache.org/jira/browse/KAFKA-8147
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.1.1
>Reporter: Maarten
>Assignee: highluck
>Priority: Minor
>  Labels: kip
> Fix For: 2.6.0
>
>
> The streams DSL does not provide a way to configure the changelog topic 
> created by KTable.suppress.
> From the perspective of an external user this could be implemented similar to 
> the configuration of aggregate + materialized, i.e.,
> {code:java}
> changelogTopicConfigs = // Configs
> materialized = Materialized.as(..).withLoggingEnabled(changelogTopicConfigs)
> ..
> KGroupedStream.aggregate(..,materialized)
> {code}
> [KIP-446: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-446%3A+Add+changelog+topic+configuration+to+KTable+suppress|https://cwiki.apache.org/confluence/display/KAFKA/KIP-446%3A+Add+changelog+topic+configuration+to+KTable+suppress]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Comment Edited] (KAFKA-8147) Add changelog topic configuration to KTable suppress

2021-05-04 Thread Philip Bourke (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17339321#comment-17339321
 ] 

Philip Bourke edited comment on KAFKA-8147 at 5/4/21, 10:08 PM:


[~ableegoldman] [KAFKA-12749|https://issues.apache.org/jira/browse/KAFKA-12749] 
created for this. If I can find the time I would pick it up but that may be 
hard to do.


was (Author: philbour):
[~ableegoldman] https://issues.apache.org/jira/browse/KAFKA-12749 created for 
this. If I can find the time I would pick it up but that may be hard to do.

> Add changelog topic configuration to KTable suppress
> 
>
> Key: KAFKA-8147
> URL: https://issues.apache.org/jira/browse/KAFKA-8147
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.1.1
>Reporter: Maarten
>Assignee: highluck
>Priority: Minor
>  Labels: kip
> Fix For: 2.6.0
>
>
> The streams DSL does not provide a way to configure the changelog topic 
> created by KTable.suppress.
> From the perspective of an external user this could be implemented similar to 
> the configuration of aggregate + materialized, i.e.,
> {code:java}
> changelogTopicConfigs = // Configs
> materialized = Materialized.as(..).withLoggingEnabled(changelogTopicConfigs)
> ..
> KGroupedStream.aggregate(..,materialized)
> {code}
> [KIP-446: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-446%3A+Add+changelog+topic+configuration+to+KTable+suppress|https://cwiki.apache.org/confluence/display/KAFKA/KIP-446%3A+Add+changelog+topic+configuration+to+KTable+suppress]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-12234) Extend OffsetFetch requests to accept multiple group ids.

2021-05-04 Thread Sanjana Kaundinya (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12234?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Sanjana Kaundinya reassigned KAFKA-12234:
-

Assignee: Sanjana Kaundinya

> Extend OffsetFetch requests to accept multiple group ids.
> -
>
> Key: KAFKA-12234
> URL: https://issues.apache.org/jira/browse/KAFKA-12234
> Project: Kafka
>  Issue Type: Improvement
>Affects Versions: 2.8.0
>Reporter: Tom Scott
>Assignee: Sanjana Kaundinya
>Priority: Minor
> Fix For: 3.0.0
>
>
> More details are in the KIP: 
> https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=173084258



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9177) Pause completed partitions on restore consumer

2021-05-04 Thread Andrey Polyakov (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9177?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17339336#comment-17339336
 ] 

Andrey Polyakov commented on KAFKA-9177:


Is it possible this got missed as part of KAFKA-9113? I'm seeing many messages like this per second in our 2.6.1 Kafka Streams application logs:
{code}
{"timestamp":{"seconds":1620165908,"nanos":76900},"thread":"myapp-StreamThread-1","severity":"DEBUG","loggerName":"org.apache.kafka.streams.processor.internals.StoreChangelogReader","message":"stream-thread
 [myapp-StreamThread-1] Finished restoring all changelogs []"}
{code}


> Pause completed partitions on restore consumer
> --
>
> Key: KAFKA-9177
> URL: https://issues.apache.org/jira/browse/KAFKA-9177
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Assignee: Guozhang Wang
>Priority: Major
> Fix For: 2.6.0
>
>
> The StoreChangelogReader is responsible for tracking and restoring active 
> tasks, but once a store has finished restoring it will continue polling for 
> records on that partition.
> Ordinarily this doesn't make a difference as a store is not completely 
> restored until its entire changelog has been read, so there are no more 
> records for poll to return anyway. But if the restoring state is actually an 
> optimized source KTable, the changelog is just the source topic and poll will 
> keep returning records for that partition until all stores have been restored.
> Note that this isn't a correctness issue since it's just the restore 
> consumer, but it is wasteful to be polling for records and throwing them 
> away. We should pause completed partitions in StoreChangelogReader so we 
> don't slow down the restore consumer in reading from the unfinished changelog 
> topics, and avoid wasted network.
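
Below is a minimal sketch of the idea, not the actual StoreChangelogReader code; the helper and variable names are hypothetical:
{code:java}
import java.util.Collections;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.common.TopicPartition;

// Illustration only: once a changelog partition has caught up to its end offset,
// pause it on the restore consumer so poll() stops returning records that would
// otherwise be thrown away.
final class RestorePauseSketch {
    static void maybePauseCompleted(final Consumer<byte[], byte[]> restoreConsumer,
                                    final TopicPartition changelogPartition,
                                    final long restoredOffset,
                                    final long endOffset) {
        if (restoredOffset >= endOffset) {
            restoreConsumer.pause(Collections.singleton(changelogPartition));
        }
    }
}
{code}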



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] guozhangwang commented on a change in pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.16.4

2021-05-04 Thread GitBox


guozhangwang commented on a change in pull request #10568:
URL: https://github.com/apache/kafka/pull/10568#discussion_r626160997



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##
@@ -396,6 +396,21 @@ public void close() {
 log.info("Skipping to close non-initialized store {}", 
entry.getKey());
 }
 }
+for (final StateStore store : globalStateStores) {

Review comment:
   Surprised to see we actually have two as well. Did a quick look at them, 
and I think they can be consolidated indeed. Some more details:
   
   1) The `mgr.initialize` call would try to register all stores within 
`globalStateStores`, putting them into the other `globalStores` map one by one 
via the `registerStateStore` call. So after `initialize`, the two collections 
should contain the same metadata.
   
   2) But note that, before the `initialize` call, no stores should be opened yet. 
So if a failure happens before that call, all stores should still be closed, 
and this logic would never be triggered.
   
   3) Within `initialize` we call `restoreState`, and only after that we would 
add the stores to the `globalStores` here. So if a failure happens during 
`restoreState`, the `globalStores` would not contain it while we have to rely 
on `globalStateStores`.
   
   Based on that, I can file a quick follow-up fix after your PR to consolidate 
these two.
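
   For illustration, a rough sketch of what the consolidated close logic could look like with a single collection; this is not the actual `GlobalStateManagerImpl` code, and the helper name is made up:

```java
import java.util.Collection;
import org.apache.kafka.streams.processor.StateStore;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

// Hypothetical sketch: close every open global store from one collection, so stores
// registered before a failed restore are not leaked.
final class GlobalStoreCloseSketch {
    private static final Logger LOG = LoggerFactory.getLogger(GlobalStoreCloseSketch.class);

    static void closeAll(final Collection<StateStore> globalStateStores) {
        for (final StateStore store : globalStateStores) {
            if (store.isOpen()) {
                try {
                    store.close();
                } catch (final RuntimeException e) {
                    LOG.error("Failed to close global state store {}", store.name(), e);
                }
            } else {
                LOG.info("Skipping to close non-initialized store {}", store.name());
            }
        }
    }
}
```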

##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
##
@@ -383,8 +386,7 @@ public void 
shouldLogAndMeterWhenSkippingNullKeyWithBuiltInMetricsVersionLatest(
 }
 
 private void shouldLogAndMeterWhenSkippingNullKeyWithBuiltInMetrics(final 
String builtInMetricsVersion) {
-final InternalMockProcessorContext context = 
createInternalMockProcessorContext(builtInMetricsVersion);
-processor.init(context);
+setup(builtInMetricsVersion, true);

Review comment:
   Could you remind me why we need to enable caching here, but not in the others below?

##
File path: 
streams/src/test/java/org/apache/kafka/streams/processor/internals/GlobalStreamThreadTest.java
##
@@ -189,16 +199,6 @@ public void shouldCloseStateStoresOnClose() throws 
Exception {
 assertFalse(globalStore.isOpen());
 }
 
-@Test
-public void shouldTransitionToDeadOnClose() throws Exception {

Review comment:
   Why remove this test?

##
File path: 
streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamKTableJoinTest.java
##
@@ -242,6 +242,7 @@ private void shouldLogAndMeterWhenSkippingNullLeftKey(final 
String builtInMetric
 props.setProperty(StreamsConfig.BUILT_IN_METRICS_VERSION_CONFIG, 
builtInMetricsVersion);
 
 try (final LogCaptureAppender appender = 
LogCaptureAppender.createAndRegister(KStreamKTableJoin.class)) {
+driver.close();

Review comment:
   Nice catch.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12748) Explore new RocksDB options to consider enabling by default

2021-05-04 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12748?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-12748:
---
Description: 
With the rocksdb version bump comes a lot of new options, some of which look 
interesting enough to explore for usage in Streams. We should try setting these 
as default options and run the benchmarks to look for any performance benefit 
(or decrease). See javadocs for all Options 
[here|https://javadoc.io/doc/org.rocksdb/rocksdbjni/latest/org/rocksdb/Options.html]


Options.setAvoidUnnecessaryBlockingIO: 
- As the name suggests, avoids blocking/long-latency tasks by scheduling a 
background job to do them

Options.setSkipCheckingSstFileSizesOnDbOpen:
- Speeds up startup time when there are many sst files, which could mean less 
overhead from things like rebalancing where tasks are migrated between clients 
or threads. Not sure how many sst files count as "many"; this may be less useful 
now that we've disabled bulk loading.

 Options.setBestEffortsRecovery: 
- Interesting feature to allow recovering missing files without the use of 
the WAL. Could be useful if the on-disk state is corrupted (eg user deletes a 
file) without needing to rebuild state from scratch. Though I'd want to dig in 
further to understand what exactly it does and does not do. Not a performance 
improvement but we should run the benchmarks to make sure it doesn't make the 
performance worse.

Options.setWriteDbidToManifest:
- Should be set to true if/when we ever need to rely on the DB id eg for 
backups. Also not a performance improvement but we should still benchmark this.



Options.optimizeForSmallDb:
- This one is definitely not something we should set by default, as "small" 
here means under 1GB. But it's probably worth at least calling out in the docs 
for those users who know their data set size (per store) is under a GB
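
For a quick experiment, here is a sketch of how a couple of the options above could be set through the existing {{RocksDBConfigSetter}} hook (wired in via the {{rocksdb.config.setter}} StreamsConfig property). Nothing here is a recommended default; it is only meant as a starting point for benchmarking:
{code:java}
import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.Options;

// Experimental sketch only: enables two of the options listed above so they can be
// benchmarked; not a recommended default configuration.
public class ExperimentalRocksDBConfigSetter implements RocksDBConfigSetter {

    @Override
    public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
        options.setAvoidUnnecessaryBlockingIO(true);
        options.setSkipCheckingSstFileSizesOnDbOpen(true);
    }

    @Override
    public void close(final String storeName, final Options options) {
        // no RocksDB objects are created in setConfig, so nothing needs to be released here
    }
}
{code}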

  was:
With the rocksdb version bump comes a lot of new options, some of which look 
interesting enough to explore for usage in Streams. We should try setting these 
as default options and run the benchmarks to look for any performance benefit 
(or decrease). See javadocs for all Options 
[here|https://javadoc.io/doc/org.rocksdb/rocksdbjni/latest/org/rocksdb/Options.html]


Options.setAvoidUnnecessaryBlockingIO: 
- As the name suggests, avoids blocking/long-latency tasks by scheduling a 
background job to do them

 Options.setBestEffortsRecovery: 
- Interesting feature to allow recovering missing files without the use of 
the WAL. Could be useful if the on-disk state is corrupted (eg user deletes a 
file) without needing to rebuild state from scratch. Though I'd want to dig in 
further to understand what exactly it does and does not do. Not a performance 
improvement but we should run the benchmarks to make sure it doesn't make the 
performance worse.

Options.setWriteDbidToManifest:
- Should be set to true if/when we ever need to rely on the DB id eg for 
backups. Also not a performance improvement but we should still benchmark this.


Options.optimizeForSmallDb:
- This one is definitely not something we should set by default, as "small" 
here means under 1GB. But it's probably worth at least calling out in the docs 
for those users who know their data set size (per store) is under a GB


> Explore new RocksDB options to consider enabling by default
> ---
>
> Key: KAFKA-12748
> URL: https://issues.apache.org/jira/browse/KAFKA-12748
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Priority: Major
>
> With the rocksdb version bump comes a lot of new options, some of which look 
> interesting enough to explore for usage in Streams. We should try setting 
> these as default options and run the benchmarks to look for any performance 
> benefit (or decrease). See javadocs for all Options 
> [here|https://javadoc.io/doc/org.rocksdb/rocksdbjni/latest/org/rocksdb/Options.html]
> Options.setAvoidUnnecessaryBlockingIO: 
> - As the name suggests, avoids blocking/long-latency tasks by scheduling a 
> background job to do them
> Options.setSkipCheckingSstFileSizesOnDbOpen:
> - Speeds up startup time if there are many sst files, could mean less 
> overhead from things like rebalancing where tasks are migrated between 
> clients or threads. Not sure how many sst files counts as "many", may be less 
> useful now that we've disabled bulk loading 
>  Options.setBestEffortsRecovery: 
> - Interesting feature to allow recovering missing files without the use 
> of the WAL. Could be useful if the on-disk state is corrupted (eg user 
> deletes a file) without needing to rebuild state from scratch. Though I'd 
> want to dig in further to understand what exactly it does and does not do.

[GitHub] [kafka] lct45 opened a new pull request #10631: MINOR: Stop using hamcrest in system tests

2021-05-04 Thread GitBox


lct45 opened a new pull request #10631:
URL: https://github.com/apache/kafka/pull/10631


   We currently use `hamcrest` imports to check the outputs of the 
`RelationalSmokeTest`, but with the new gradle updates, the proper hamcrest 
imports are no longer included in the test jar.
   
   This is a bit of a workaround to remove the hamcrest usage so we can get 
system tests up and running again. A potential follow-up could be to update the 
way we create the test jar to pull in the proper dependencies.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] lct45 commented on pull request #10631: MINOR: Stop using hamcrest in system tests

2021-05-04 Thread GitBox


lct45 commented on pull request #10631:
URL: https://github.com/apache/kafka/pull/10631#issuecomment-832310245


   @cadonna @vvcephei for review


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.16.4

2021-05-04 Thread GitBox


ableegoldman commented on a change in pull request #10568:
URL: https://github.com/apache/kafka/pull/10568#discussion_r626056309



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStreamThread.java
##
@@ -411,13 +408,25 @@ private StateConsumer initialize() {
 
 return stateConsumer;
 } catch (final StreamsException fatalException) {
+closeStateConsumer(stateConsumer, false);

Review comment:
   Ah, so we were also possibly leaking stores in case of a 
StreamsException or other Exception? Yikes

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##
@@ -396,6 +396,21 @@ public void close() {
 log.info("Skipping to close non-initialized store {}", 
entry.getKey());
 }
 }
+for (final StateStore store : globalStateStores) {

Review comment:
   Yikes, so it actually was a (possible) leak in production code? I'm 
almost glad rocksdb had a bug, otherwise we might have never caught it 😅 
   
   Definite +1 on consolidating these, can you file a quick ticket for this? It 
almost sounds like a bug in itself that one might contain stores that the other 
does not, but I suppose as long as they all get closed it's probably ok. 
   
   I'd bet you could find a community member to pick it up by labeling as 
`newbie` or `newbie++`

##
File path: gradle/dependencies.gradle
##
@@ -103,7 +103,7 @@ versions += [
   netty: "4.1.62.Final",
   powermock: "2.0.9",
   reflections: "0.9.12",
-  rocksDB: "5.18.4",
+  rocksDB: "6.16.4",

Review comment:
   🥳 

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##
@@ -374,7 +374,7 @@ public void flush() {
 
 @Override
 public void close() {
-if (globalStores.isEmpty()) {
+if (globalStateStores.isEmpty() && globalStores.isEmpty()) {

Review comment:
   What even is the difference between `globalStateStores` and 
`globalStores`? I'm not surprised we had a leak, I'm pretty sure if I was 
looking at this code I wouldn't remember that these were actually two different 
variables. Maybe we can give them slightly more descriptive names in the 
meantime?

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java
##
@@ -99,35 +104,7 @@ public Env getEnv() {
 
 @Override
 public Options prepareForBulkLoad() {
-/* From https://github.com/facebook/rocksdb/wiki/RocksDB-FAQ
- *
- * Q: What's the fastest way to load data into RocksDB?
- *
- * A: A fast way to direct insert data to the DB:
- *
- *  1. using single writer thread and insert in sorted order
- *  2. batch hundreds of keys into one write batch
- *  3. use vector memtable
- *  4. make sure options.max_background_flushes is at least 4
- *  5. before inserting the data,
- *   disable automatic compaction,
- *   set options.level0_file_num_compaction_trigger,
- *   options.level0_slowdown_writes_trigger
- *   and options.level0_stop_writes_trigger to very large.
- * After inserting all the data, issue a manual compaction.
- *
- * 3-5 will be automatically done if you call 
Options::PrepareForBulkLoad() to your option
- */
-// (1) not in our control
-// (2) is done via bulk-loading API
-// (3) skipping because, not done in actual PrepareForBulkLoad() code 
in https://github.com/facebook/rocksdb/blob/master/options/options.cc
-//columnFamilyOptions.setMemTableConfig(new VectorMemTableConfig());
-// (4-5) below:
-dbOptions.setMaxBackgroundFlushes(4);
-columnFamilyOptions.setDisableAutoCompactions(true);
-columnFamilyOptions.setLevel0FileNumCompactionTrigger(1 << 30);
-columnFamilyOptions.setLevel0SlowdownWritesTrigger(1 << 30);
-columnFamilyOptions.setLevel0StopWritesTrigger(1 << 30);

Review comment:
   I seem to remember there was some reason we didn't remove this when we 
disabled bulk loading... but I don't remember what it was. @guozhangwang do 
you?
   
   Or were these options just removed in 6.x?

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java
##
@@ -1381,6 +1364,313 @@ public WriteBufferManager writeBufferManager() {
 return dbOptions.writeBufferManager();
 }
 
+@Override
+public Options setMaxWriteBatchGroupSizeBytes(final long 
maxWriteBatchGroupSizeBytes) {
+dbOptions.setMaxWriteBatchGroupSizeBytes(maxWriteBatchGroupSizeBytes);
+return this;
+}
+
+@Override
+public long maxWriteB

[GitHub] [kafka] ableegoldman commented on a change in pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.16.4

2021-05-04 Thread GitBox


ableegoldman commented on a change in pull request #10568:
URL: https://github.com/apache/kafka/pull/10568#discussion_r626171667



##
File path: gradle/dependencies.gradle
##
@@ -103,7 +103,7 @@ versions += [
   netty: "4.1.62.Final",
   powermock: "2.0.9",
   reflections: "0.9.12",
-  rocksDB: "5.18.4",
+  rocksDB: "6.16.4",

Review comment:
   Guess I celebrated too early...hope you find the remaining leak without 
much trouble (and that it is indeed a leak and not an unavoidable bug of some 
kind, though that seems very unlikely)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] ableegoldman commented on a change in pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.16.4

2021-05-04 Thread GitBox


ableegoldman commented on a change in pull request #10568:
URL: https://github.com/apache/kafka/pull/10568#discussion_r626175996



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/GlobalStateManagerImpl.java
##
@@ -396,6 +396,21 @@ public void close() {
 log.info("Skipping to close non-initialized store {}", 
entry.getKey());
 }
 }
+for (final StateStore store : globalStateStores) {

Review comment:
   Heh, @guozhangwang and I reviewed at the same time. I didn't notice that 
`globalStores` would not be populated until restoration, whereas 
`globalStateStores` is populated in the constructor. Imo we should just 
populate `globalStores` in the constructor as well, but I guess that won't be 
necessary if @guozhangwang does a quick followup to consolidate them




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Updated] (KAFKA-12749) Changelog topic config on suppressed KTable lost

2021-05-04 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12749?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-12749:
---
Labels: newbie newbie++  (was: )

> Changelog topic config on suppressed KTable lost
> 
>
> Key: KAFKA-12749
> URL: https://issues.apache.org/jira/browse/KAFKA-12749
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0, 2.7.0, 2.6.1, 2.8.0, 2.6.2
>Reporter: Philip Bourke
>Priority: Minor
>  Labels: newbie, newbie++
>
> When trying to set the changelog configuration on a suppressed KTable, the 
> config is lost if either {{emitEarlyWhenFull}} or {{shutDownWhenFull}} is set 
> after the logging config.
> This works - 
> {code:java}
> .suppress(Suppressed.untilTimeLimit(Duration.ofMillis(maxIdleIntervalMs), 
> BufferConfig.maxRecords(
>  
> maxBufferRecords).emitEarlyWhenFull().withLoggingEnabled(changelogConfig)){code}
> but not if you set {{emitEarlyWhenFull}} last.
> See comments in https://issues.apache.org/jira/browse/KAFKA-8147
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12749) Changelog topic config on suppressed KTable lost

2021-05-04 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12749?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-12749:
---
Fix Version/s: 2.8.1

> Changelog topic config on suppressed KTable lost
> 
>
> Key: KAFKA-12749
> URL: https://issues.apache.org/jira/browse/KAFKA-12749
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0, 2.7.0, 2.8.0
>Reporter: Philip Bourke
>Priority: Minor
>  Labels: newbie, newbie++
> Fix For: 3.0.0, 2.8.1
>
>
> When trying to set the changelog configuration on a suppressed KTable, the 
> config is lost if either {{emitEarlyWhenFull}} or {{shutDownWhenFull}} is set 
> after the logging config.
> This works - 
> {code:java}
> .suppress(Suppressed.untilTimeLimit(Duration.ofMillis(maxIdleIntervalMs), 
> BufferConfig.maxRecords(
>  
> maxBufferRecords).emitEarlyWhenFull().withLoggingEnabled(changelogConfig)){code}
> but not if you set {{emitEarlyWhenFull}} last.
> See comments in https://issues.apache.org/jira/browse/KAFKA-8147
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12749) Changelog topic config on suppressed KTable lost

2021-05-04 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12749?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-12749:
---
Fix Version/s: 3.0.0

> Changelog topic config on suppressed KTable lost
> 
>
> Key: KAFKA-12749
> URL: https://issues.apache.org/jira/browse/KAFKA-12749
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0, 2.7.0, 2.8.0
>Reporter: Philip Bourke
>Priority: Minor
>  Labels: newbie, newbie++
> Fix For: 3.0.0
>
>
> When trying to set the changelog configuration on a suppressed KTable, the 
> config is lost if either {{emitEarlyWhenFull}} or {{shutDownWhenFull}} is set 
> after the logging config.
> This works - 
> {code:java}
> .suppress(Suppressed.untilTimeLimit(Duration.ofMillis(maxIdleIntervalMs), 
> BufferConfig.maxRecords(
>  
> maxBufferRecords).emitEarlyWhenFull().withLoggingEnabled(changelogConfig)){code}
> but not if you set {{emitEarlyWhenFull}} last.
> See comments in https://issues.apache.org/jira/browse/KAFKA-8147
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12749) Changelog topic config on suppressed KTable lost

2021-05-04 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12749?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17339355#comment-17339355
 ] 

A. Sophie Blee-Goldman commented on KAFKA-12749:


Thanks for filing a ticket – we should try to get this fixed up in 3.0, since 
it's awkward to just silently drop the user's specified configurations; we're 
lucky that you happened to notice this at all. Just copying over our 
observation of the underlying problem:
{quote}Seems like BufferConfigInternal#emitEarlyWhenFull creates a new 
EagerBufferConfigImpl and passes the two original configs (maxRecords and 
maxBytes) in to the constructor, but loses the logging configs at that point. 
Same thing for BufferConfigInternal#shutDownWhenFull 
{quote}
I think we can do a trivial, one-line fix to just pass the logging configs as a 
parameter to the constructor for an immediate patch in 3.0 (or two lines, one 
for the EagerBufferConfigImpl and one for the StrictBufferConfigImpl 
constructed in BufferConfigInternal). And maybe also remove the constructor 
that doesn't accept a logConfig parameter so you're forced to specify it 
explicitly, whether it's empty/unspecified or not.
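
Roughly, the immediate patch sketched above would look something like the following inside BufferConfigInternal. Since these are internal classes, the exact constructor signature and the name of the log-config accessor here are assumptions, not the actual code:
{code:java}
// Sketch only (internal class, names/signatures assumed): carry the already-configured
// logging settings over instead of dropping them when switching buffer strategies.
public Suppressed.EagerBufferConfig emitEarlyWhenFull() {
    return new EagerBufferConfigImpl(maxRecords(), maxBytes(), getLogConfig());
}
{code}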

Eventually we probably want to refactor things a bit for a more natural fluent 
API, and try to future-proof things so we don't accidentally introduce 
bugs like this again. But that's not as urgent.

> Changelog topic config on suppressed KTable lost
> 
>
> Key: KAFKA-12749
> URL: https://issues.apache.org/jira/browse/KAFKA-12749
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0, 2.7.0, 2.6.1, 2.8.0, 2.6.2
>Reporter: Philip Bourke
>Priority: Minor
>  Labels: newbie, newbie++
>
> When trying to set the changelog configuration on a suppressed KTable, the 
> config is lost if either {{emitEarlyWhenFull}} or {{shutDownWhenFull}} is set 
> after the logging config.
> This works - 
> {code:java}
> .suppress(Suppressed.untilTimeLimit(Duration.ofMillis(maxIdleIntervalMs), 
> BufferConfig.maxRecords(
>  
> maxBufferRecords).emitEarlyWhenFull().withLoggingEnabled(changelogConfig)){code}
> but not if you set {{emitEarlyWhenFull}} last.
> See comments in https://issues.apache.org/jira/browse/KAFKA-8147
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12749) Changelog topic config on suppressed KTable lost

2021-05-04 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12749?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-12749:
---
Affects Version/s: (was: 2.6.2)
   (was: 2.6.1)

> Changelog topic config on suppressed KTable lost
> 
>
> Key: KAFKA-12749
> URL: https://issues.apache.org/jira/browse/KAFKA-12749
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0, 2.7.0, 2.8.0
>Reporter: Philip Bourke
>Priority: Minor
>  Labels: newbie, newbie++
>
> When trying to set the changelog configuration on a suppressed KTable, the 
> config is lost if either {{emitEarlyWhenFull}} or {{shutDownWhenFull}} is set 
> after the logging config.
> This works - 
> {code:java}
> .suppress(Suppressed.untilTimeLimit(Duration.ofMillis(maxIdleIntervalMs), 
> BufferConfig.maxRecords(
>  
> maxBufferRecords).emitEarlyWhenFull().withLoggingEnabled(changelogConfig)){code}
> but not if you set {{emitEarlyWhenFull}} last.
> See comments in https://issues.apache.org/jira/browse/KAFKA-8147
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] showuon commented on pull request #10509: KAFKA-12464: enhance constrained sticky Assign algorithm

2021-05-04 Thread GitBox


showuon commented on pull request #10509:
URL: https://github.com/apache/kafka/pull/10509#issuecomment-832356675


   Ah! Thanks to Jenkins for catching this! We should run 
`verifyValidityAndBalance` before verifying the assignment for the cooperative 
protocol, so that an additional rebalance can happen if necessary. Updated. Thanks.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] showuon commented on pull request #10509: KAFKA-12464: enhance constrained sticky Assign algorithm

2021-05-04 Thread GitBox


showuon commented on pull request #10509:
URL: https://github.com/apache/kafka/pull/10509#issuecomment-832388860


   The failed test is flaky and unrelated:
   ```
   tests/Build/JDK 15 and Scala 2.13 passed
   tests/Build/JDK 8 and Scala 2.12 passed
   Build / JDK 11 and Scala 2.13 / 
org.apache.kafka.streams.integration.KTableKTableForeignKeyInnerJoinMultiIntegrationTest.shouldInnerJoinMultiPartitionQueryable
   ```


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on pull request #10509: KAFKA-12464: enhance constrained sticky Assign algorithm

2021-05-04 Thread GitBox


guozhangwang commented on pull request #10509:
URL: https://github.com/apache/kafka/pull/10509#issuecomment-832406252


   LGTM. Leaving to @ableegoldman for a final look.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-12748) Explore new RocksDB options to consider enabling by default

2021-05-04 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12748?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17339399#comment-17339399
 ] 

Guozhang Wang commented on KAFKA-12748:
---

Thanks for making a pass on the new options! This is a good list.

> Explore new RocksDB options to consider enabling by default
> ---
>
> Key: KAFKA-12748
> URL: https://issues.apache.org/jira/browse/KAFKA-12748
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Priority: Major
>
> With the rocksdb version bump comes a lot of new options, some of which look 
> interesting enough to explore for usage in Streams. We should try setting 
> these as default options and run the benchmarks to look for any performance 
> benefit (or decrease). See javadocs for all Options 
> [here|https://javadoc.io/doc/org.rocksdb/rocksdbjni/latest/org/rocksdb/Options.html]
> Options.setAvoidUnnecessaryBlockingIO: 
> - As the name suggests, avoids blocking/long-latency tasks by scheduling a 
> background job to do them
> Options.setSkipCheckingSstFileSizesOnDbOpen:
> - Speeds up startup time if there are many sst files, could mean less 
> overhead from things like rebalancing where tasks are migrated between 
> clients or threads. Not sure how many sst files counts as "many", may be less 
> useful now that we've disabled bulk loading 
>  Options.setBestEffortsRecovery: 
> - Interesting feature to allow recovering missing files without the use 
> of the WAL. Could be useful if the on-disk state is corrupted (eg user 
> deletes a file) without needing to rebuild state from scratch. Though I'd 
> want to dig in further to understand what exactly it does and does not do. 
> Not a performance improvement but we should run the benchmarks to make sure 
> it doesn't make the performance worse.
> Options.setWriteDbidToManifest:
> - Should be set to true if/when we ever need to rely on the DB id eg for 
> backups. Also not a performance improvement but we should still benchmark 
> this.
> Options.optimizeForSmallDb:
> - This one is definitely not something we should set by default, as 
> "small" here means under 1GB. But it's probably worth at least calling out in 
> the docs for those users who know their data set size (per store) is under a 
> GB



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-12748) Explore new RocksDB options to consider enabling by default

2021-05-04 Thread Guozhang Wang (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12748?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Guozhang Wang updated KAFKA-12748:
--
Labels: rocksdb  (was: )

> Explore new RocksDB options to consider enabling by default
> ---
>
> Key: KAFKA-12748
> URL: https://issues.apache.org/jira/browse/KAFKA-12748
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Priority: Major
>  Labels: rocksdb
>
> With the rocksdb version bump comes a lot of new options, some of which look 
> interesting enough to explore for usage in Streams. We should try setting 
> these as default options and run the benchmarks to look for any performance 
> benefit (or decrease). See javadocs for all Options 
> [here|https://javadoc.io/doc/org.rocksdb/rocksdbjni/latest/org/rocksdb/Options.html]
> Options.setAvoidUnnecessaryBlockingIO: 
> - As the name suggests, avoids blocking/long-latency tasks by scheduling a 
> background job to do them
> Options.setSkipCheckingSstFileSizesOnDbOpen:
> - Speeds up startup time if there are many sst files, could mean less 
> overhead from things like rebalancing where tasks are migrated between 
> clients or threads. Not sure how many sst files counts as "many", may be less 
> useful now that we've disabled bulk loading 
>  Options.setBestEffortsRecovery: 
> - Interesting feature to allow recovering missing files without the use 
> of the WAL. Could be useful if the on-disk state is corrupted (eg user 
> deletes a file) without needing to rebuild state from scratch. Though I'd 
> want to dig in further to understand what exactly it does and does not do. 
> Not a performance improvement but we should run the benchmarks to make sure 
> it doesn't make the performance worse.
> Options.setWriteDbidToManifest:
> - Should be set to true if/when we ever need to rely on the DB id eg for 
> backups. Also not a performance improvement but we should still benchmark 
> this.
> Options.optimizeForSmallDb:
> - This one is definitely not something we should set by default, as 
> "small" here means under 1GB. But it's probably worth at least calling out in 
> the docs for those users who know their data set size (per store) is under a 
> GB



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] guozhangwang commented on a change in pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.16.4

2021-05-04 Thread GitBox


guozhangwang commented on a change in pull request #10568:
URL: https://github.com/apache/kafka/pull/10568#discussion_r626249438



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBGenericOptionsToDbOptionsColumnFamilyOptionsAdapter.java
##
@@ -99,35 +104,7 @@ public Env getEnv() {
 
 @Override
 public Options prepareForBulkLoad() {
-/* From https://github.com/facebook/rocksdb/wiki/RocksDB-FAQ
- *
- * Q: What's the fastest way to load data into RocksDB?
- *
- * A: A fast way to direct insert data to the DB:
- *
- *  1. using single writer thread and insert in sorted order
- *  2. batch hundreds of keys into one write batch
- *  3. use vector memtable
- *  4. make sure options.max_background_flushes is at least 4
- *  5. before inserting the data,
- *   disable automatic compaction,
- *   set options.level0_file_num_compaction_trigger,
- *   options.level0_slowdown_writes_trigger
- *   and options.level0_stop_writes_trigger to very large.
- * After inserting all the data, issue a manual compaction.
- *
- * 3-5 will be automatically done if you call 
Options::PrepareForBulkLoad() to your option
- */
-// (1) not in our control
-// (2) is done via bulk-loading API
-// (3) skipping because, not done in actual PrepareForBulkLoad() code 
in https://github.com/facebook/rocksdb/blob/master/options/options.cc
-//columnFamilyOptions.setMemTableConfig(new VectorMemTableConfig());
-// (4-5) below:
-dbOptions.setMaxBackgroundFlushes(4);
-columnFamilyOptions.setDisableAutoCompactions(true);
-columnFamilyOptions.setLevel0FileNumCompactionTrigger(1 << 30);
-columnFamilyOptions.setLevel0SlowdownWritesTrigger(1 << 30);
-columnFamilyOptions.setLevel0StopWritesTrigger(1 << 30);

Review comment:
   I left a comment on the old PR as well. I vaguely remember that in newer 
versions this is done inside RocksDB, hence we do not need it, but maybe 
@cadonna can confirm here. And if so, could we add a one-line comment for 
future readers?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #9441: KAFKA-10614: Ensure group state (un)load is executed in the submitted order

2021-05-04 Thread GitBox


guozhangwang commented on a change in pull request #9441:
URL: https://github.com/apache/kafka/pull/9441#discussion_r626250346



##
File path: core/src/main/scala/kafka/server/KafkaApis.scala
##
@@ -279,30 +279,33 @@ class KafkaApis(val requestChannel: RequestChannel,
 new 
StopReplicaResponseData().setErrorCode(Errors.STALE_BROKER_EPOCH.code)))
 } else {
   val partitionStates = stopReplicaRequest.partitionStates().asScala
-  val (result, error) = replicaManager.stopReplicas(
-request.context.correlationId,
-stopReplicaRequest.controllerId,
-stopReplicaRequest.controllerEpoch,
-stopReplicaRequest.brokerEpoch,
-partitionStates)
-  // Clear the coordinator caches in case we were the leader. In the case 
of a reassignment, we
-  // cannot rely on the LeaderAndIsr API for this since it is only sent to 
active replicas.
-  result.forKeyValue { (topicPartition, error) =>
-if (error == Errors.NONE) {
-  if (topicPartition.topic == GROUP_METADATA_TOPIC_NAME
-  && partitionStates(topicPartition).deletePartition) {
-groupCoordinator.onResignation(topicPartition.partition)
-  } else if (topicPartition.topic == TRANSACTION_STATE_TOPIC_NAME
- && partitionStates(topicPartition).deletePartition) {
+  def onStopReplicas(error: Errors, partitions: Map[TopicPartition, 
Errors]): Unit = {
+// Clear the coordinator caches in case we were the leader. In the 
case of a reassignment, we
+// cannot rely on the LeaderAndIsr API for this since it is only sent 
to active replicas.
+partitions.forKeyValue { (topicPartition, partitionError) =>
+  if (partitionError == Errors.NONE) {
 val partitionState = partitionStates(topicPartition)
 val leaderEpoch = if (partitionState.leaderEpoch >= 0)
-Some(partitionState.leaderEpoch)
+  Some(partitionState.leaderEpoch)
 else
   None
-txnCoordinator.onResignation(topicPartition.partition, 
coordinatorEpoch = leaderEpoch)
+if (topicPartition.topic == GROUP_METADATA_TOPIC_NAME
+  && partitionState.deletePartition) {
+  groupCoordinator.onResignation(topicPartition.partition, 
leaderEpoch)
+} else if (topicPartition.topic == TRANSACTION_STATE_TOPIC_NAME
+  && partitionState.deletePartition) {
+  txnCoordinator.onResignation(topicPartition.partition, 
coordinatorEpoch = leaderEpoch)
+}
   }
 }
   }
+  val (result, error) = replicaManager.stopReplicas(
+request.context.correlationId,
+stopReplicaRequest.controllerId,
+stopReplicaRequest.controllerEpoch,
+stopReplicaRequest.brokerEpoch,
+partitionStates,
+onStopReplicas)

Review comment:
   I think `partitionLock` and `replicaStateChangeLock` are for different 
purposes here: the latter is specifically for changing the replica state 
including leader, ISR, while the former is for more general access patterns? 
@hachikuji could you chime in here if you got some time.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #10613: KAFKA-10847: Set shared outer store to an in-memory store when in-memory stores are supplied

2021-05-04 Thread GitBox


guozhangwang commented on a change in pull request #10613:
URL: https://github.com/apache/kafka/pull/10613#discussion_r626250519



##
File path: 
streams/src/main/java/org/apache/kafka/streams/kstream/internals/KStreamImplJoin.java
##
@@ -155,7 +156,7 @@ public long get() {
 
 final String outerJoinStoreName = userProvidedBaseStoreName == 
null ? outerJoinStoreGeneratedName : userProvidedBaseStoreName + 
outerJoinSuffix;
 
-outerJoinWindowStore = 
Optional.of(sharedOuterJoinWindowStoreBuilder(outerJoinStoreName, windows, 
streamJoinedInternal));
+outerJoinWindowStore = 
Optional.of(sharedOuterJoinWindowStoreBuilder(outerJoinStoreName, windows, 
streamJoinedInternal, persistent));

Review comment:
   Maybe keeping it simple and always going with the left side is fine for now; 
I was just wondering if you had a specific reason for that :)




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] guozhangwang commented on a change in pull request #10613: KAFKA-10847: Set shared outer store to an in-memory store when in-memory stores are supplied

2021-05-04 Thread GitBox


guozhangwang commented on a change in pull request #10613:
URL: https://github.com/apache/kafka/pull/10613#discussion_r626253159



##
File path: streams/src/test/java/org/apache/kafka/streams/TopologyTest.java
##
@@ -863,6 +920,60 @@ public void 
streamStreamLeftJoinTopologyWithCustomStoresNames() {
 describe.toString());
 }
 
+@Test
+public void streamStreamLeftJoinTopologyWithCustomStoresSuppliers() {
+final StreamsBuilder builder  = new StreamsBuilder();
+final KStream stream1;
+final KStream stream2;
+
+stream1 = builder.stream("input-topic1");
+stream2 = builder.stream("input-topic2");
+
+final JoinWindows joinWindows = JoinWindows.of(ofMillis(100));
+
+final WindowBytesStoreSupplier thisStoreSupplier = 
Stores.inMemoryWindowStore("in-memory-join-store",
+Duration.ofMillis(joinWindows.size() + 
joinWindows.gracePeriodMs()),
+Duration.ofMillis(joinWindows.size()), true);
+
+final WindowBytesStoreSupplier otherStoreSupplier = 
Stores.inMemoryWindowStore("in-memory-join-store-other",
+Duration.ofMillis(joinWindows.size() + 
joinWindows.gracePeriodMs()),
+Duration.ofMillis(joinWindows.size()), true);
+
+stream1.leftJoin(
+stream2,
+MockValueJoiner.TOSTRING_JOINER,
+joinWindows,
+StreamJoined.with(Serdes.Integer(), Serdes.String(), 
Serdes.String())
+.withThisStoreSupplier(thisStoreSupplier)
+.withOtherStoreSupplier(otherStoreSupplier));
+
+final TopologyDescription describe = builder.build().describe();
+
+assertEquals(
+"Topologies:\n" +
+"   Sub-topology: 0\n" +
+"Source: KSTREAM-SOURCE-00 (topics: 
[input-topic1])\n" +
+"  --> KSTREAM-WINDOWED-02\n" +
+"Source: KSTREAM-SOURCE-01 (topics: 
[input-topic2])\n" +
+"  --> KSTREAM-WINDOWED-03\n" +
+"Processor: KSTREAM-WINDOWED-02 (stores: 
[in-memory-join-store])\n" +
+"  --> KSTREAM-JOINTHIS-04\n" +
+"  <-- KSTREAM-SOURCE-00\n" +
+"Processor: KSTREAM-WINDOWED-03 (stores: 
[in-memory-join-store-other])\n" +
+"  --> KSTREAM-OUTEROTHER-05\n" +
+"  <-- KSTREAM-SOURCE-01\n" +
+"Processor: KSTREAM-JOINTHIS-04 (stores: 
[in-memory-join-store-other, KSTREAM-OUTERSHARED-04-memory-store])\n" +

Review comment:
   Maybe we can also make it a bit simpler and just rely on the left side: 
if the supplier is given on the left side (which would always provide the 
name), then we name the shared store `thisStoreSupplier.name() + 
outerJoinSuffix`; if it is not, then we look at 
`streamJoinedInternal.storeName()`, and fall back to 
`outerJoinStoreGeneratedName` as the last resort. A sketch of that fallback is below.
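
   Something like the following; the accessor names are taken from the surrounding diff and are otherwise assumptions, not the actual code:

```java
// Illustration of the naming fallback described above (names assumed):
// prefer the left (this) supplier's name, then the StreamJoined store name,
// and only use the generated name as the last resort.
final String sharedOuterJoinStoreName;
if (streamJoinedInternal.thisStoreSupplier() != null) {
    sharedOuterJoinStoreName = streamJoinedInternal.thisStoreSupplier().name() + outerJoinSuffix;
} else if (streamJoinedInternal.storeName() != null) {
    sharedOuterJoinStoreName = streamJoinedInternal.storeName() + outerJoinSuffix;
} else {
    sharedOuterJoinStoreName = outerJoinStoreGeneratedName;
}
```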




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] cadonna commented on pull request #10568: KAFKA-8897: Upgrade RocksDB to 6.16.4

2021-05-04 Thread GitBox


cadonna commented on pull request #10568:
URL: https://github.com/apache/kafka/pull/10568#issuecomment-832443804


   Apparently there are still leaks since we get `pure virtual method called` 
in the JDK 8 builds. I did not get them locally. I will investigate further.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] kowshik commented on a change in pull request #10271: KAFKA-12429: Added serdes for the default implementation of RLMM based on an internal topic as storage.

2021-05-04 Thread GitBox


kowshik commented on a change in pull request #10271:
URL: https://github.com/apache/kafka/pull/10271#discussion_r626300076



##
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/serialization/RemoteLogSegmentMetadataTransform.java
##
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage.serialization;
+
+import org.apache.kafka.common.TopicIdPartition;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.metadata.ApiMessageAndVersion;
+import 
org.apache.kafka.server.log.remote.metadata.storage.generated.RemoteLogSegmentMetadataRecord;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentId;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadata;
+import 
org.apache.kafka.server.log.remote.storage.RemoteLogSegmentMetadataUpdate;
+import org.apache.kafka.server.log.remote.storage.RemoteLogSegmentState;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class RemoteLogSegmentMetadataTransform implements 
RemoteLogMetadataTransform {
+
+public ApiMessageAndVersion 
toApiMessageAndVersion(RemoteLogSegmentMetadata segmentMetadata) {
+RemoteLogSegmentMetadataRecord record = new 
RemoteLogSegmentMetadataRecord()
+
.setRemoteLogSegmentId(createRemoteLogSegmentIdEntry(segmentMetadata))
+.setStartOffset(segmentMetadata.startOffset())
+.setEndOffset(segmentMetadata.endOffset())
+.setBrokerId(segmentMetadata.brokerId())
+.setEventTimestampMs(segmentMetadata.eventTimestampMs())
+.setMaxTimestampMs(segmentMetadata.maxTimestampMs())
+.setSegmentSizeInBytes(segmentMetadata.segmentSizeInBytes())
+
.setSegmentLeaderEpochs(createSegmentLeaderEpochsEntry(segmentMetadata))
+.setRemoteLogSegmentState(segmentMetadata.state().id());
+
+return new ApiMessageAndVersion(record, 
record.highestSupportedVersion());
+}
+
+private List 
createSegmentLeaderEpochsEntry(RemoteLogSegmentMetadata data) {
+return data.segmentLeaderEpochs().entrySet().stream()
+   .map(entry -> new 
RemoteLogSegmentMetadataRecord.SegmentLeaderEpochEntry()
+   .setLeaderEpoch(entry.getKey())
+   .setOffset(entry.getValue()))
+   .collect(Collectors.toList());
+}
+
+private RemoteLogSegmentMetadataRecord.RemoteLogSegmentIdEntry 
createRemoteLogSegmentIdEntry(RemoteLogSegmentMetadata data) {
+return new RemoteLogSegmentMetadataRecord.RemoteLogSegmentIdEntry()
+.setTopicIdPartition(
+new 
RemoteLogSegmentMetadataRecord.TopicIdPartitionEntry()
+
.setId(data.remoteLogSegmentId().topicIdPartition().topicId())
+
.setName(data.remoteLogSegmentId().topicIdPartition().topicPartition().topic())
+
.setPartition(data.remoteLogSegmentId().topicIdPartition().topicPartition().partition()))
+.setId(data.remoteLogSegmentId().id());
+}
+
+@Override
+public RemoteLogSegmentMetadata 
fromApiMessageAndVersion(ApiMessageAndVersion apiMessageAndVersion) {
+RemoteLogSegmentMetadataRecord record = 
(RemoteLogSegmentMetadataRecord) apiMessageAndVersion.message();
+RemoteLogSegmentId remoteLogSegmentId = 
buildRemoteLogSegmentId(record.remoteLogSegmentId());
+
+Map segmentLeaderEpochs = new HashMap<>();
+for (RemoteLogSegmentMetadataRecord.SegmentLeaderEpochEntry 
segmentLeaderEpoch : record.segmentLeaderEpochs()) {
+segmentLeaderEpochs.put(segmentLeaderEpoch.leaderEpoch(), 
segmentLeaderEpoch.offset());
+}
+
+RemoteLogSegmentMetadata remoteLogSegmentMetadata =
+new RemoteLogSegmentMetadata(remoteLogSegmentId, 
record.startOffset(), record.endOffset(),
+ record.maxTimestampMs(), 
record.brokerId(),
+