[jira] [Commented] (KAFKA-7304) memory leakage in org.apache.kafka.common.network.Selector

2020-07-07 Thread Stanislav Kozlovski (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7304?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17152523#comment-17152523
 ] 

Stanislav Kozlovski commented on KAFKA-7304:


Given that this Jira is for reporting memory issues and those were fixed, does 
it make sense to either close this or explicitly rename/redo the description to 
mention it's tracking high CPU with SSL?

cc [~rsivaram] [~yuyang08]

> memory leakage in org.apache.kafka.common.network.Selector
> --
>
> Key: KAFKA-7304
> URL: https://issues.apache.org/jira/browse/KAFKA-7304
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.1.0, 1.1.1
>Reporter: Yu Yang
>Priority: Major
> Attachments: 7304.v4.txt, 7304.v7.txt, Screen Shot 2018-08-16 at 
> 11.04.16 PM.png, Screen Shot 2018-08-16 at 11.06.38 PM.png, Screen Shot 
> 2018-08-16 at 12.41.26 PM.png, Screen Shot 2018-08-16 at 4.26.19 PM.png, 
> Screen Shot 2018-08-17 at 1.03.35 AM.png, Screen Shot 2018-08-17 at 1.04.32 
> AM.png, Screen Shot 2018-08-17 at 1.05.30 AM.png, Screen Shot 2018-08-28 at 
> 11.09.45 AM.png, Screen Shot 2018-08-29 at 10.49.03 AM.png, Screen Shot 
> 2018-08-29 at 10.50.47 AM.png, Screen Shot 2018-09-29 at 10.38.12 PM.png, 
> Screen Shot 2018-09-29 at 10.38.38 PM.png, Screen Shot 2018-09-29 at 8.34.50 
> PM.png
>
>
> We are testing secured writes to Kafka through SSL. At small scale, SSL 
> writes to Kafka were fine. However, when we enabled SSL writes at a larger 
> scale (>40k clients writing concurrently), the Kafka brokers soon hit an 
> OutOfMemory issue with a 4G memory setting. We tried increasing the heap 
> size to 10G, but encountered the same issue. 
> We took a few heap dumps and found that most of the heap memory is 
> referenced through org.apache.kafka.common.network.Selector objects. There 
> are two channel map fields in Selector. It seems that entries are somehow 
> not removed from these maps in a timely manner. 
> One observation is that the memory leak seems related to Kafka partition 
> leader changes. If a broker restart or similar event in the cluster causes 
> partition leadership changes, the brokers may hit the OOM issue faster. 
> {code}
> private final Map<String, KafkaChannel> channels;
> private final Map<String, KafkaChannel> closingChannels;
> {code}
> Please see the attached images and the following link for a sample GC 
> analysis. 
> http://gceasy.io/my-gc-report.jsp?p=c2hhcmVkLzIwMTgvMDgvMTcvLS1nYy5sb2cuMC5jdXJyZW50Lmd6LS0yLTM5LTM0
> the command line for running kafka: 
> {code}
> java -Xms10g -Xmx10g -XX:NewSize=512m -XX:MaxNewSize=512m 
> -Xbootclasspath/p:/usr/local/libs/bcp -XX:MetaspaceSize=128m -XX:+UseG1GC 
> -XX:MaxGCPauseMillis=25 -XX:InitiatingHeapOccupancyPercent=35 
> -XX:G1HeapRegionSize=16M -XX:MinMetaspaceFreeRatio=25 
> -XX:MaxMetaspaceFreeRatio=75 -XX:+PrintGCDetails -XX:+PrintGCDateStamps 
> -XX:+PrintTenuringDistribution -Xloggc:/var/log/kafka/gc.log 
> -XX:+UseGCLogFileRotation -XX:NumberOfGCLogFiles=40 -XX:GCLogFileSize=50M 
> -Djava.awt.headless=true 
> -Dlog4j.configuration=file:/etc/kafka/log4j.properties 
> -Dcom.sun.management.jmxremote 
> -Dcom.sun.management.jmxremote.authenticate=false 
> -Dcom.sun.management.jmxremote.ssl=false 
> -Dcom.sun.management.jmxremote.port= 
> -Dcom.sun.management.jmxremote.rmi.port= -cp /usr/local/libs/*  
> kafka.Kafka /etc/kafka/server.properties
> {code}
> We use Java 1.8.0_102, and have applied a TLS patch reducing the 
> X509Factory.certCache map size from 750 to 20. 
> {code}
> java -version
> java version "1.8.0_102"
> Java(TM) SE Runtime Environment (build 1.8.0_102-b14)
> Java HotSpot(TM) 64-Bit Server VM (build 25.102-b14, mixed mode)
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dajac commented on pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-07-07 Thread GitBox


dajac commented on pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#issuecomment-654671084


   Investigating this.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] rajinisivaram commented on a change in pull request #8683: KAFKA-9893: Configurable TCP connection timeout and improve the initial metadata fetch

2020-07-07 Thread GitBox


rajinisivaram commented on a change in pull request #8683:
URL: https://github.com/apache/kafka/pull/8683#discussion_r450706026



##
File path: clients/src/main/java/org/apache/kafka/clients/NetworkClient.java
##
@@ -786,6 +809,29 @@ private void handleAbortedSends(List<ClientResponse> responses) {
 abortedSends.clear();
 }
 
+/**
+ * Handle socket channel connection timeout. The timeout will hit iff a connection
+ * stays at the ConnectionState.CONNECTING state longer than the timeout value,
+ * as indicated by ClusterConnectionStates.NodeConnectionState.
+ *
+ * @param responses The list of responses to update
+ * @param now The current time
+ */
+private void handleTimedOutConnections(List<ClientResponse> responses, long now) {
+    Set<String> connectingNodes = connectionStates.connectingNodes();
+    for (String nodeId : connectingNodes) {
+        if (connectionStates.isConnectionSetupTimeout(nodeId, now)) {
+            this.selector.close(nodeId);
+            log.debug(
+                "Disconnecting from node {} due to socket connection setup timeout. " +
+                "The timeout value is {} ms.",
+                nodeId,
+                connectionStates.connectionSetupTimeoutMs(nodeId));
+            processDisconnection(responses, nodeId, now, ChannelState.LOCAL_CLOSE);

Review comment:
   @guozhangwang Thanks for reporting the exception in this code.
   @d8tltanc @dajac This code segment is unsafe: we are removing `node` from 
`connectingNodes` in `processDisconnection()` while iterating over the set. We 
must be missing a test too (or we have a test with only one connection).









[jira] [Created] (KAFKA-10243) ConcurrentModificationException while processing connection setup timeouts

2020-07-07 Thread Rajini Sivaram (Jira)
Rajini Sivaram created KAFKA-10243:
--

 Summary: ConcurrentModificationException while processing 
connection setup timeouts
 Key: KAFKA-10243
 URL: https://issues.apache.org/jira/browse/KAFKA-10243
 Project: Kafka
  Issue Type: Bug
  Components: network
Reporter: Rajini Sivaram
Assignee: David Jacot
 Fix For: 2.7


From [~guozhang] in [https://github.com/apache/kafka/pull/8683]:

{quote}
java.util.ConcurrentModificationException
at java.util.HashMap$HashIterator.nextNode(HashMap.java:1445)
at java.util.HashMap$KeyIterator.next(HashMap.java:1469)
at 
org.apache.kafka.clients.NetworkClient.handleTimedOutConnections(NetworkClient.java:822)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:574)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:419)
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:359)
{quote}

While processing connection setup timeouts, we iterate through the connecting 
nodes to process timeouts and disconnect within the loop, removing the entry 
from the set that the loop is iterating over:

{code}
for (String nodeId : connectingNodes) {
    if (connectionStates.isConnectionSetupTimeout(nodeId, now)) {
        this.selector.close(nodeId);
        log.debug(
            "Disconnecting from node {} due to socket connection setup timeout. " +
            "The timeout value is {} ms.",
            nodeId,
            connectionStates.connectionSetupTimeoutMs(nodeId));
        processDisconnection(responses, nodeId, now, ChannelState.LOCAL_CLOSE);
    }
}
{code}
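A minimal, self-contained sketch of the hazard and the usual remedy (iterate over a snapshot so the mutation does not invalidate the live iterator); the names below are illustrative stand-ins, not the actual Kafka classes:

```java
import java.util.HashSet;
import java.util.Set;

public class TimeoutSweep {
    // Hypothetical stand-in for connectionStates.connectingNodes()
    static Set<String> connectingNodes = new HashSet<>(Set.of("n1", "n2", "n3"));

    // Toy predicate standing in for isConnectionSetupTimeout(nodeId, now)
    static boolean isTimedOut(String nodeId) {
        return !nodeId.equals("n3"); // pretend n1 and n2 timed out
    }

    // Mutates the live set, like processDisconnection() does in the real code
    static void processDisconnection(String nodeId) {
        connectingNodes.remove(nodeId);
    }

    public static void main(String[] args) {
        // Iterating over a copy avoids the ConcurrentModificationException
        // that iterating directly over connectingNodes would raise here.
        for (String nodeId : new HashSet<>(connectingNodes)) {
            if (isTimedOut(nodeId))
                processDisconnection(nodeId);
        }
        System.out.println(connectingNodes); // [n3]
    }
}
```

The alternative, which PR #8990 ends up taking, is to compute the timed-out subset first and only then disconnect, so the loop never iterates over the set being mutated.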





[GitHub] [kafka] omkreddy commented on pull request #8885: KAFKA-8264: decrease the record size for flaky test

2020-07-07 Thread GitBox


omkreddy commented on pull request #8885:
URL: https://github.com/apache/kafka/pull/8885#issuecomment-654729182


   retest this please







[GitHub] [kafka] rajinisivaram commented on pull request #8979: KAFKA-10223; Make ReplicaNotAvailableException retriable metadata exception

2020-07-07 Thread GitBox


rajinisivaram commented on pull request #8979:
URL: https://github.com/apache/kafka/pull/8979#issuecomment-654729231


   @ijuma @hachikuji Thank you, I guess we need a KIP anyway, even for renaming 
`NOT_LEADER_FOR_PARTITION`. We can discuss the options during the KIP 
discussion. My only concern about renaming is that everyone knows 
`NOT_LEADER_FOR_PARTITION` since it has been there forever; it may take a while 
to get used to a different name. Perhaps not an issue.







[GitHub] [kafka] dongjinleekr commented on pull request #8150: KAFKA-9587: Producer configs are omitted in the documentation

2020-07-07 Thread GitBox


dongjinleekr commented on pull request #8150:
URL: https://github.com/apache/kafka/pull/8150#issuecomment-654736525


   Retest this please.







[GitHub] [kafka] dongjinleekr edited a comment on pull request #8150: KAFKA-9587: Producer configs are omitted in the documentation

2020-07-07 Thread GitBox


dongjinleekr edited a comment on pull request #8150:
URL: https://github.com/apache/kafka/pull/8150#issuecomment-649403477


   Rebased onto the latest trunk. I reviewed the problem more carefully, and 
here is the summary:
   
   1. As @kkonstantine stated, [the default value of the `retries` config is 
`Integer.MAX_VALUE`](https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/producer/ProducerConfig.java#L289);
 however, the documentation still states **'though since we have specified 
retries as 0 it won't.'** It seems `props.put("retries", 0);` 
was mistakenly removed from the example code snippet in commit c3e7c0b (2.1.0).
   
   2. The default value of the `linger.ms` config is 0; however, the current 
documentation states that **we set our linger time to 1 millisecond**. 
`props.put("linger.ms", 1);` was removed from the example code snippet in commit 
975b680 (2.2.0) because the example code of that time violated 
`delivery.timeout.ms` (30000) >= `linger.ms` (1) + `request.timeout.ms` (30000 
by default). However, it should not have been removed, since removing 
`delivery.timeout.ms` alone was enough: `delivery.timeout.ms` defaults to 
120000. In conclusion, it seems `props.put("linger.ms", 1);` was also 
mistakenly removed.
   
   cc the reviewers of commits c3e7c0b and 975b680: @ijuma @guozhangwang 
@hachikuji
   
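As a sanity check on the arithmetic above, the constraint can be verified with plain numbers; the values below are the Kafka producer defaults assumed in this summary, not read from a live broker:

```java
public class ProducerTimeoutCheck {
    public static void main(String[] args) {
        long deliveryTimeoutMs = 120_000; // assumed delivery.timeout.ms default
        long lingerMs = 1;                // the old example's linger.ms setting
        long requestTimeoutMs = 30_000;   // assumed request.timeout.ms default

        // The producer requires delivery.timeout.ms >= linger.ms + request.timeout.ms.
        // With the default delivery.timeout.ms, linger.ms=1 satisfies the constraint,
        // so only the explicit delivery.timeout.ms override needed removing.
        boolean valid = deliveryTimeoutMs >= lingerMs + requestTimeoutMs;
        System.out.println(valid); // true
    }
}
```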







[GitHub] [kafka] dajac opened a new pull request #8990: KAFKA-10243; ConcurrentModificationException while processing connection setup timeouts

2020-07-07 Thread GitBox


dajac opened a new pull request #8990:
URL: https://github.com/apache/kafka/pull/8990


   This PR fixes a bug introduced in https://github.com/apache/kafka/pull/8683/.
   
   While processing connection setup timeouts, we iterate through the 
connecting nodes to process timeouts and disconnect within the loop, removing 
the entry from the set that the loop is iterating over. That raises a 
`ConcurrentModificationException`. The current unit test did not catch this 
because it was using only one node.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   







[GitHub] [kafka] rajinisivaram commented on pull request #8990: KAFKA-10243; ConcurrentModificationException while processing connection setup timeouts

2020-07-07 Thread GitBox


rajinisivaram commented on pull request #8990:
URL: https://github.com/apache/kafka/pull/8990#issuecomment-654748250


   ok to test







[GitHub] [kafka] rajinisivaram commented on a change in pull request #8990: KAFKA-10243; ConcurrentModificationException while processing connection setup timeouts

2020-07-07 Thread GitBox


rajinisivaram commented on a change in pull request #8990:
URL: https://github.com/apache/kafka/pull/8990#discussion_r450752852



##
File path: 
clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
##
@@ -365,4 +367,48 @@ public void testExponentialConnectionSetupTimeout() {
 connectionSetupTimeoutMs * connectionSetupTimeoutJitter);
 assertTrue(connectionStates.connectingNodes().contains(nodeId1));
 }
+
+@Test
+public void testTimedOutConnections() {
+    // Initiate two connections
+    connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT);
+    connectionStates.connecting(nodeId2, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT);
+
+    // Expect no timed out connections
+    assertTrue(connectionStates.nodesWithConnectionSetupTimeout(time.milliseconds()).isEmpty());

Review comment:
   nit: `assertEquals(0, ...)` may be better here so we know how many in 
the case of failure?

##
File path: 
clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##
@@ -440,6 +441,20 @@ public boolean isConnectionSetupTimeout(String id, long now) {
 return now - lastConnectAttemptMs(id) > connectionSetupTimeoutMs(id);
 }
 
+/**
+ * Return the Set of nodes whose connection setup has timed out.
+ * @param now the current time in ms
+ */
+public Set<String> nodesWithConnectionSetupTimeout(long now) {
+    Set<String> nodes = new HashSet<>();
+    for (String nodeId : connectingNodes) {

Review comment:
   We can use `connectingNodes.stream().filter`?

##
File path: 
clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
##
@@ -365,4 +367,48 @@ public void testExponentialConnectionSetupTimeout() {
 connectionSetupTimeoutMs * connectionSetupTimeoutJitter);
 assertTrue(connectionStates.connectingNodes().contains(nodeId1));
 }
+
+@Test
+public void testTimedOutConnections() {
+    // Initiate two connections
+    connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT);
+    connectionStates.connecting(nodeId2, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT);
+
+    // Expect no timed out connections
+    assertTrue(connectionStates.nodesWithConnectionSetupTimeout(time.milliseconds()).isEmpty());
+
+// Advance time by half of the connection setup timeout
+time.sleep(connectionSetupTimeoutMs / 2);
+
+// Initiate a third connections

Review comment:
   nit: connections => connection

##
File path: 
clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
##
@@ -365,4 +367,48 @@ public void testExponentialConnectionSetupTimeout() {
 connectionSetupTimeoutMs * connectionSetupTimeoutJitter);
 assertTrue(connectionStates.connectingNodes().contains(nodeId1));
 }
+
+@Test
+public void testTimedOutConnections() {
+    // Initiate two connections
+    connectionStates.connecting(nodeId1, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT);
+    connectionStates.connecting(nodeId2, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT);
+
+    // Expect no timed out connections
+    assertTrue(connectionStates.nodesWithConnectionSetupTimeout(time.milliseconds()).isEmpty());
+
+    // Advance time by half of the connection setup timeout
+    time.sleep(connectionSetupTimeoutMs / 2);
+
+    // Initiate a third connections
+    connectionStates.connecting(nodeId3, time.milliseconds(), "localhost", ClientDnsLookup.DEFAULT);
+
+    // Advance time beyond the connection setup timeout (+ max jitter) for the first two connections
+    time.sleep((long) (connectionSetupTimeoutMs / 2 + connectionSetupTimeoutMs * connectionSetupTimeoutJitter));
+
+    // Expect two timed out connections.
+    Set<String> timedOutConnections = connectionStates.nodesWithConnectionSetupTimeout(time.milliseconds());
+    assertEquals(2, timedOutConnections.size());
+    assertTrue(timedOutConnections.contains(nodeId1));
+    assertTrue(timedOutConnections.contains(nodeId2));
+
+    // Disconnect the first two connections
+    connectionStates.disconnected(nodeId1, time.milliseconds());
+    connectionStates.disconnected(nodeId2, time.milliseconds());
+
+    // Advance time beyond the connection setup timeout (+ max jitter) for for the third connections

Review comment:
   typo: `for for`

##
File path: 
clients/src/test/java/org/apache/kafka/clients/ClusterConnectionStatesTest.java
##
@@ -365,4 +367,48 @@ public void testExponentialConnectionSetupTimeout() {
 connectionSetupTimeoutMs * connectionSetupTimeoutJitter);
 assertTrue(connectionStates.connectingNodes().cont

[GitHub] [kafka] dajac commented on a change in pull request #8990: KAFKA-10243; ConcurrentModificationException while processing connection setup timeouts

2020-07-07 Thread GitBox


dajac commented on a change in pull request #8990:
URL: https://github.com/apache/kafka/pull/8990#discussion_r450756672



##
File path: 
clients/src/main/java/org/apache/kafka/clients/ClusterConnectionStates.java
##
@@ -440,6 +441,20 @@ public boolean isConnectionSetupTimeout(String id, long now) {
 return now - lastConnectAttemptMs(id) > connectionSetupTimeoutMs(id);
 }
 
+/**
+ * Return the Set of nodes whose connection setup has timed out.
+ * @param now the current time in ms
+ */
+public Set<String> nodesWithConnectionSetupTimeout(long now) {
+    Set<String> nodes = new HashSet<>();
+    for (String nodeId : connectingNodes) {

Review comment:
   Sure thing.
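The stream-based form suggested above might look like the following sketch, with a toy predicate in place of `isConnectionSetupTimeout(id, now)` and literal node IDs standing in for the real connecting set:

```java
import java.util.Set;
import java.util.stream.Collectors;

public class StreamFilterSketch {
    public static void main(String[] args) {
        Set<String> connectingNodes = Set.of("n1", "n2", "n3");

        // filter + collect replaces the explicit for-loop that
        // accumulated timed-out node IDs into a HashSet.
        Set<String> timedOut = connectingNodes.stream()
                .filter(id -> !id.equals("n3")) // toy timeout predicate
                .collect(Collectors.toSet());

        System.out.println(timedOut.size()); // 2
    }
}
```

Collecting into a fresh set also keeps the later disconnect loop safely decoupled from the live `connectingNodes` set.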









[GitHub] [kafka] dajac commented on pull request #8990: KAFKA-10243; ConcurrentModificationException while processing connection setup timeouts

2020-07-07 Thread GitBox


dajac commented on pull request #8990:
URL: https://github.com/apache/kafka/pull/8990#issuecomment-654752598


   @rajinisivaram Thanks for the review. I just pushed an update that addresses 
your feedback.







[GitHub] [kafka] rajinisivaram commented on pull request #8990: KAFKA-10243; ConcurrentModificationException while processing connection setup timeouts

2020-07-07 Thread GitBox


rajinisivaram commented on pull request #8990:
URL: https://github.com/apache/kafka/pull/8990#issuecomment-654755355


   retest this please







[GitHub] [kafka] mimaison commented on pull request #8984: KAFKA-10227: Enforce cleanup policy to only contain compact or delete once

2020-07-07 Thread GitBox


mimaison commented on pull request #8984:
URL: https://github.com/apache/kafka/pull/8984#issuecomment-654782138


   @huxihx As @dajac mentioned, I also envisioned doing this check on the 
broker side.
   
   I actually initially hit this issue with a 3rd party client and just 
included the kafka-topics command in the ticket to easily reproduce it.







[jira] [Commented] (KAFKA-9168) Integrate JNI direct buffer support to RocksDBStore

2020-07-07 Thread Adam Bellemare (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9168?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17152701#comment-17152701
 ] 

Adam Bellemare commented on KAFKA-9168:
---

This would be incredibly useful for improving the performance of KTable-KTable 
foreign key joins.

> Integrate JNI direct buffer support to RocksDBStore
> ---
>
> Key: KAFKA-9168
> URL: https://issues.apache.org/jira/browse/KAFKA-9168
> Project: Kafka
>  Issue Type: Task
>  Components: streams
>Reporter: Sagar Rao
>Priority: Blocker
>  Labels: perfomance
> Fix For: 3.0.0
>
>
> There is a PR on the RocksDB Java client to support direct ByteBuffers in 
> Java. We can look at integrating it once it gets merged. 
> Link to PR: [https://github.com/facebook/rocksdb/pull/2283]





[GitHub] [kafka] ijuma commented on a change in pull request #8936: KAFKA-10207: Fixed padded timeindex causing premature data deletion

2020-07-07 Thread GitBox


ijuma commented on a change in pull request #8936:
URL: https://github.com/apache/kafka/pull/8936#discussion_r450843637



##
File path: core/src/test/scala/unit/kafka/log/TimeIndexTest.scala
##
@@ -148,5 +148,39 @@ class TimeIndexTest {
 idx.close()
   }
 
-}
 
+  /**
+   * In certain cases, index files fail to have their pre-allocated 0 bytes trimmed from the tail
+   * when a new segment is rolled. This causes a silent failure at the next startup where all retention
+   * windows are breached, purging out data whether or not the window was really breached.
+   * KAFKA-10207
+   */
+  @Test
+  def testLoadingUntrimmedIndex(): Unit = {
+    // A larger index size must be specified or the starting offset will round down,
+    // preventing this issue from being reproduced. Configs default to 10mb.
+    val max1MbEntryCount = 10
+    // Create a file that will exist on disk and be removed when we are done
+    val file = nonExistantTempFile()
+    file.deleteOnExit()
+    // create an index that can have up to 10 entries, about 1mb
+    var idx2 = new TimeIndex(file, baseOffset = 0, max1MbEntryCount * 12)
+    // Append less than the maximum number of entries, leaving 0 bytes padding the end
+    for (i <- 1 until max1MbEntryCount)
+      idx2.maybeAppend(i, i)
+
+    idx2.flush()
+    // jvm 1.8.0_191 fails to always flush shrinking resize to zfs disk

Review comment:
   Apache Kafka contributors were involved in the discussion for reverting 
that change, but we were only aware of the performance impact. It sounds like 
you're saying that it also resulted in incorrect behavior?









[GitHub] [kafka] ijuma commented on a change in pull request #8936: KAFKA-10207: Fixed padded timeindex causing premature data deletion

2020-07-07 Thread GitBox


ijuma commented on a change in pull request #8936:
URL: https://github.com/apache/kafka/pull/8936#discussion_r450844571



##
File path: core/src/main/scala/kafka/log/LogSegment.scala
##
@@ -87,6 +87,12 @@ class LogSegment private[log] (val log: FileRecords,
   // we will recover the segments above the recovery point in recoverLog()
   // in any case so sanity checking them here is redundant.
   txnIndex.sanityCheck()
+  // Failing to sanity check the timeIndex can result in a scenario where log segments are
+  // prematurely deleted (before breaching retention periods) if the index file was not resized
+  // to disk successfully.
+  // KAFKA-10207
+  timeIndex.sanityCheck()
+  offsetIndex.sanityCheck()

Review comment:
   As you say, these were originally removed for performance reasons. We'd 
want to verify the performance impact of adding them back. cc @junrao @efeg 









[GitHub] [kafka] ijuma commented on pull request #8979: KAFKA-10223; Make ReplicaNotAvailableException retriable metadata exception

2020-07-07 Thread GitBox


ijuma commented on pull request #8979:
URL: https://github.com/apache/kafka/pull/8979#issuecomment-654842200


   Renaming the error code name has no compatibility implications so in theory 
it could be done without a KIP. We could make this really boring and safe and 
call it `NOT_LEADER_OR_FOLLOWER`. I think this would not cause any confusion 
since the name is so similar and the length is similar since I trimmed the 
`FOR_PARTITION`.







[GitHub] [kafka] ijuma commented on pull request #8979: KAFKA-10223; Make ReplicaNotAvailableException retriable metadata exception

2020-07-07 Thread GitBox


ijuma commented on pull request #8979:
URL: https://github.com/apache/kafka/pull/8979#issuecomment-654843181


   If you both agree that the name above is reasonable, I would suggest sending 
a follow up to the KIP-320 thread covering what we've learned and how we're 
addressing it. It would be good to update the KIP text too so that we have 
something to refer to.







[GitHub] [kafka] ijuma commented on pull request #8979: KAFKA-10223; Make ReplicaNotAvailableException retriable metadata exception

2020-07-07 Thread GitBox


ijuma commented on pull request #8979:
URL: https://github.com/apache/kafka/pull/8979#issuecomment-654844522


   One issue is that the exception `NotLeaderForPartitionException` is in a 
public package. We could introduce a subclass, always throw the subclass, and 
deprecate `NotLeaderForPartitionException`.







[GitHub] [kafka] dongjinleekr removed a comment on pull request #7204: KAFKA-8794: Provide Javadoc on DescribeLogDirsResult

2020-07-07 Thread GitBox


dongjinleekr removed a comment on pull request #7204:
URL: https://github.com/apache/kafka/pull/7204#issuecomment-653769037


   Retest this please.







[GitHub] [kafka] rajinisivaram commented on pull request #8990: KAFKA-10243; ConcurrentModificationException while processing connection setup timeouts

2020-07-07 Thread GitBox


rajinisivaram commented on pull request #8990:
URL: https://github.com/apache/kafka/pull/8990#issuecomment-654860922


   retest this please







[GitHub] [kafka] dongjinleekr commented on pull request #7204: KAFKA-8794: Deprecate DescribeLogDirsResponse.[LogDirInfo, ReplicaInfo]

2020-07-07 Thread GitBox


dongjinleekr commented on pull request #7204:
URL: https://github.com/apache/kafka/pull/7204#issuecomment-654863820


   @mimaison Hi. I updated the PR and submitted the 
[KIP](https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866169).
 I know you must be very busy now, but if you have some spare time, please 
take a look and give some feedback.







[GitHub] [kafka] dongjinleekr commented on pull request #7204: KAFKA-8794: Deprecate DescribeLogDirsResponse.[LogDirInfo, ReplicaInfo]

2020-07-07 Thread GitBox


dongjinleekr commented on pull request #7204:
URL: https://github.com/apache/kafka/pull/7204#issuecomment-654864266


   Retest this please







[GitHub] [kafka] dongjinleekr removed a comment on pull request #7204: KAFKA-8794: Deprecate DescribeLogDirsResponse.[LogDirInfo, ReplicaInfo]

2020-07-07 Thread GitBox


dongjinleekr removed a comment on pull request #7204:
URL: https://github.com/apache/kafka/pull/7204#issuecomment-654864266


   Retest this please







[jira] [Assigned] (KAFKA-10241) Pursue a better way to cover ignorable RPC fields

2020-07-07 Thread Tom Bentley (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10241?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Tom Bentley reassigned KAFKA-10241:
---

Assignee: Tom Bentley

> Pursue a better way to cover ignorable RPC fields 
> --
>
> Key: KAFKA-10241
> URL: https://issues.apache.org/jira/browse/KAFKA-10241
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, core
>Reporter: Boyang Chen
>Assignee: Tom Bentley
>Priority: Major
>
> We have hit cases such as https://issues.apache.org/jira/browse/KAFKA-10239 
> where we accidentally included a non-ignorable field in the returned 
> response, eventually crashing older clients that don't support the field. 
> It would be good to add a generic test suite covering all existing and new 
> RPC changes, to ensure we never send a non-ignorable field to older client 
> versions.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-6453) Reconsider timestamp propagation semantics

2020-07-07 Thread Victoria Bialas (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Victoria Bialas reassigned KAFKA-6453:
--

Assignee: (was: Victoria Bialas)

> Reconsider timestamp propagation semantics
> --
>
> Key: KAFKA-6453
> URL: https://issues.apache.org/jira/browse/KAFKA-6453
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
>  Labels: needs-kip
>
> At the moment, Kafka Streams only defines a "contract" for timestamp 
> propagation at the Processor API level: all processors within a sub-topology 
> see the timestamp of the input topic record, and this timestamp is used for 
> all result records when writing them to a topic, too.
> The DSL currently inherits this "contract".
> From a DSL point of view, it would be desirable to provide a different 
> contract to the user. To allow this, we need to do the following:
>  - extend the Processor API to allow manipulating timestamps (i.e., a 
> Processor can set a new timestamp for downstream records)
>  - define a DSL "contract" for timestamp propagation for each DSL operator
>  - document the DSL "contract"
>  - implement the DSL "contract" using the new/extended Processor API





[GitHub] [kafka] mimaison commented on pull request #7204: KAFKA-8794: Deprecate DescribeLogDirsResponse.[LogDirInfo, ReplicaInfo]

2020-07-07 Thread GitBox


mimaison commented on pull request #7204:
URL: https://github.com/apache/kafka/pull/7204#issuecomment-654883342


   In the meantime, someone else had submitted a KIP for this change: 
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158862109. I 
see both KIPs have active threads on the mailing list at the moment.







[jira] [Commented] (KAFKA-6453) Reconsider timestamp propagation semantics

2020-07-07 Thread Victoria Bialas (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-6453?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17152765#comment-17152765
 ] 

Victoria Bialas commented on KAFKA-6453:


Hi [~mjsax], I tried to re-assign this to James Galasyn who is covering Streams 
Docs now, but he doesn't come up as an option to reassign. I'll forward the 
link to this to him on Slack, and you guys can figure out how to reassign. 
(Otherwise this will be perennially relegated to the back burner.)

> Reconsider timestamp propagation semantics
> --
>
> Key: KAFKA-6453
> URL: https://issues.apache.org/jira/browse/KAFKA-6453
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Priority: Major
>  Labels: needs-kip
>
> At the moment, Kafka Streams only defines a "contract" for timestamp 
> propagation at the Processor API level: all processors within a sub-topology 
> see the timestamp of the input topic record, and this timestamp is used for 
> all result records when writing them to a topic, too.
> The DSL currently inherits this "contract".
> From a DSL point of view, it would be desirable to provide a different 
> contract to the user. To allow this, we need to do the following:
>  - extend the Processor API to allow manipulating timestamps (i.e., a 
> Processor can set a new timestamp for downstream records)
>  - define a DSL "contract" for timestamp propagation for each DSL operator
>  - document the DSL "contract"
>  - implement the DSL "contract" using the new/extended Processor API





[jira] [Assigned] (KAFKA-6453) Reconsider timestamp propagation semantics

2020-07-07 Thread Victoria Bialas (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6453?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Victoria Bialas reassigned KAFKA-6453:
--

Assignee: Victoria Bialas

> Reconsider timestamp propagation semantics
> --
>
> Key: KAFKA-6453
> URL: https://issues.apache.org/jira/browse/KAFKA-6453
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Matthias J. Sax
>Assignee: Victoria Bialas
>Priority: Major
>  Labels: needs-kip
>
> At the moment, Kafka Streams only defines a "contract" for timestamp 
> propagation at the Processor API level: all processors within a sub-topology 
> see the timestamp of the input topic record, and this timestamp is used for 
> all result records when writing them to a topic, too.
> The DSL currently inherits this "contract".
> From a DSL point of view, it would be desirable to provide a different 
> contract to the user. To allow this, we need to do the following:
>  - extend the Processor API to allow manipulating timestamps (i.e., a 
> Processor can set a new timestamp for downstream records)
>  - define a DSL "contract" for timestamp propagation for each DSL operator
>  - document the DSL "contract"
>  - implement the DSL "contract" using the new/extended Processor API





[GitHub] [kafka] rajinisivaram commented on pull request #8979: KAFKA-10223; Make ReplicaNotAvailableException retriable metadata exception

2020-07-07 Thread GitBox


rajinisivaram commented on pull request #8979:
URL: https://github.com/apache/kafka/pull/8979#issuecomment-654897593


   @ijuma Sounds good to me. Will see what @hachikuji thinks as well.







[GitHub] [kafka] ijuma commented on pull request #8979: KAFKA-10223; Make ReplicaNotAvailableException retriable metadata exception

2020-07-07 Thread GitBox


ijuma commented on pull request #8979:
URL: https://github.com/apache/kafka/pull/8979#issuecomment-654900194


   Also cc @edenhill as this affects non Java clients too.







[GitHub] [kafka] vvcephei commented on a change in pull request #8962: KAFKA-10166: checkpoint recycled standbys and ignore empty rocksdb base directory

2020-07-07 Thread GitBox


vvcephei commented on a change in pull request #8962:
URL: https://github.com/apache/kafka/pull/8962#discussion_r450513288



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##
@@ -270,8 +270,11 @@ public void handleAssignment(final Map> activeTasks,
 if (oldTask.isActive()) {
 final Set partitions = 
standbyTasksToCreate.remove(oldTask.id());
 newTask = 
standbyTaskCreator.createStandbyTaskFromActive((StreamTask) oldTask, 
partitions);
+cleanUpTaskProducer(oldTask, taskCloseExceptions);
 } else {
 oldTask.suspend(); // Only need to suspend transitioning 
standbys, actives should be suspended already

Review comment:
   Thanks, all!
   
   Thanks for adding the check, @ableegoldman . To answer your question, I do 
think there may be some kind of misunderstanding. Almost certainly, I failed to 
make myself clear.
   
   I agree that the reason we enforce the valid transitions inside the task is 
so that the TM doesn't have to check every little thing. For example, that we 
call suspend in one place and close in another is not a big deal. As you 
pointed out, if we failed to call suspend, then the close call would let us 
know.
   
   However, in this case, we are checking something. But what we're checking is 
only vaguely related to what we do next. In other words, `if (! isSuspended() ) 
suspend()` makes way more sense than `if ( isStandby() ) suspend()`. How are we 
supposed to know that `standby == not suspended` at line 275 of a 700-line 
class? At least, with the log you added, if any quadrant of the implication is 
false, we'll have an error log telling us where it went wrong.
   
   Note, I think I was especially uneasy because we're now also committing the 
task in this block, and "committed" or "not committed" isn't a checked state, 
which is how we wound up with the subtle bug that you're fixing here. I think 
your fix is fine; I just felt the pre-existing structure of the code needed 
improvement.
   
   And thanks for the alternative proposals, @guozhangwang . I agree that 
either one of them would resolve my concern, and that we can take care of it in 
a follow-on PR.
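The `if (!isSuspended()) suspend()` point above can be made concrete with a toy task. This is a minimal illustration only; `Task` here is a stand-in, not the real Streams `Task` interface:

```java
public class SuspendGuardSketch {
    static class Task {
        private boolean suspended = false;

        boolean isSuspended() { return suspended; }

        void suspend() { suspended = true; }
    }

    public static void main(String[] args) {
        Task task = new Task();

        // Guard on the state the action changes: the condition is readable
        // at the call site without knowing that "standby implies not
        // suspended" happens to hold elsewhere in the class.
        if (!task.isSuspended()) {
            task.suspend();
        }
        System.out.println(task.isSuspended()); // prints "true"
    }
}
```

The guard names exactly the state the action changes, so no distant invariant has to be remembered when reading the call site.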









[GitHub] [kafka] huxihx commented on pull request #7711: KAFKA-9157: Avoid generating empty segments if all records are deleted after cleaning

2020-07-07 Thread GitBox


huxihx commented on pull request #7711:
URL: https://github.com/apache/kafka/pull/7711#issuecomment-654901088


   @junrao Sorry for the long delay. Please review this patch again.







[GitHub] [kafka] ijuma commented on pull request #8979: KAFKA-10223; Make ReplicaNotAvailableException retriable metadata exception

2020-07-07 Thread GitBox


ijuma commented on pull request #8979:
URL: https://github.com/apache/kafka/pull/8979#issuecomment-654904651


   We may also want to deprecate the error and exception for 
ReplicaNotAvailable since it would not serve any purpose, I think.







[GitHub] [kafka] ijuma edited a comment on pull request #8979: KAFKA-10223; Make ReplicaNotAvailableException retriable metadata exception

2020-07-07 Thread GitBox


ijuma edited a comment on pull request #8979:
URL: https://github.com/apache/kafka/pull/8979#issuecomment-654904651


   We may also want to deprecate the error and exception for 
ReplicaNotAvailable since it would not serve any purpose (apart from clients 
retaining compatibility with some brokers), I think.







[GitHub] [kafka] edenhill commented on pull request #8979: KAFKA-10223; Make ReplicaNotAvailableException retriable metadata exception

2020-07-07 Thread GitBox


edenhill commented on pull request #8979:
URL: https://github.com/apache/kafka/pull/8979#issuecomment-654911887


   Renaming to NOT_LEADER_OR_FOLLOWER and reusing that error code in place of 
REPLICA_NOT_AVAILABLE sounds like a good idea. :+1:







[GitHub] [kafka] cmccabe commented on pull request #8985: MINOR; KafkaAdminClient#alterReplicaLogDirs should not fail all the futures when only one call fails

2020-07-07 Thread GitBox


cmccabe commented on pull request #8985:
URL: https://github.com/apache/kafka/pull/8985#issuecomment-654912275


   good find, @dajac!  LGTM pending Jenkins







[GitHub] [kafka] huxihx commented on pull request #8984: KAFKA-10227: Enforce cleanup policy to only contain compact or delete once

2020-07-07 Thread GitBox


huxihx commented on pull request #8984:
URL: https://github.com/apache/kafka/pull/8984#issuecomment-654913649


   @mimaison @dajac If that's the case, I agree with moving the logic to the 
broker side. However, I suggest we keep the current refinement in `TopicCommand` and 
`ConfigCommand` since these tools should be capable of doing some 
pre-processing. What do you think?







[jira] [Commented] (KAFKA-10243) ConcurrentModificationException while processing connection setup timeouts

2020-07-07 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17152797#comment-17152797
 ] 

John Roesler commented on KAFKA-10243:
--

Hey all, it seems like this is fixing a regression in 2.5.1/2.6.0. Should it 
be a blocker and tagged for those releases?

> ConcurrentModificationException while processing connection setup timeouts
> --
>
> Key: KAFKA-10243
> URL: https://issues.apache.org/jira/browse/KAFKA-10243
> Project: Kafka
>  Issue Type: Bug
>  Components: network
>Reporter: Rajini Sivaram
>Assignee: David Jacot
>Priority: Major
> Fix For: 2.7
>
>
> From [~guozhang] in [https://github.com/apache/kafka/pull/8683:]
> {quote}
> java.util.ConcurrentModificationException
>   at java.util.HashMap$HashIterator.nextNode(HashMap.java:1445)
>   at java.util.HashMap$KeyIterator.next(HashMap.java:1469)
>   at 
> org.apache.kafka.clients.NetworkClient.handleTimedOutConnections(NetworkClient.java:822)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:574)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:419)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:359)
> {quote}
> While processing connection setup timeouts, we iterate through the 
> connecting nodes and disconnect within the loop, removing the entry from 
> the very set the loop is iterating over:
> {code}
> for (String nodeId : connectingNodes) {
> if (connectionStates.isConnectionSetupTimeout(nodeId, now)) {
> this.selector.close(nodeId);
> log.debug(
> "Disconnecting from node {} due to socket connection 
> setup timeout. " +
> "The timeout value is {} ms.",
> nodeId,
> connectionStates.connectionSetupTimeoutMs(nodeId));
> processDisconnection(responses, nodeId, now, 
> ChannelState.LOCAL_CLOSE);
> }
> }
> {code}
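One common fix for this pattern is to iterate over a snapshot copy so the disconnect path can mutate the original set freely. A minimal self-contained sketch (stand-in names, not the actual patch merged for this issue):

```java
import java.util.HashSet;
import java.util.Set;

public class ConnectionTimeoutSweep {
    // Stand-in for the per-node timeout check; always reports a timeout
    // here so every node is removed in the sweep below.
    static boolean isConnectionSetupTimeout(String nodeId) {
        return true;
    }

    public static void main(String[] args) {
        Set<String> connectingNodes = new HashSet<>();
        connectingNodes.add("node-1");
        connectingNodes.add("node-2");

        // Iterate over a snapshot copy: the disconnect path removes entries
        // from connectingNodes, which would otherwise throw
        // ConcurrentModificationException mid-iteration.
        for (String nodeId : new HashSet<>(connectingNodes)) {
            if (isConnectionSetupTimeout(nodeId)) {
                // stand-in for processDisconnection(...), which mutates the set
                connectingNodes.remove(nodeId);
            }
        }
        System.out.println(connectingNodes.isEmpty()); // prints "true"
    }
}
```

An explicit `Iterator` with `iterator.remove()` would avoid the copy, but only if the removal happens directly in the loop rather than inside a nested call.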





[GitHub] [kafka] cmccabe commented on a change in pull request #8863: KAFKA-8528: Expose Trogdor-specific JMX metrics for Tasks and Agents

2020-07-07 Thread GitBox


cmccabe commented on a change in pull request #8863:
URL: https://github.com/apache/kafka/pull/8863#discussion_r450921851



##
File path: tools/src/main/java/org/apache/kafka/trogdor/agent/Agent.java
##
@@ -92,6 +93,8 @@
 
 private final Time time;
 
+final TrogdorMetrics trogdorMetrics;

Review comment:
   There are a few ways you could do this.  One is to have separate classes 
for agent metrics and coordinator metrics.  Another is to have a boolean or 
something that you pass in to determine whether agent or coordinator metrics 
are created.
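A minimal sketch of the second option (all names are illustrative; the real Trogdor metrics code may differ):

```java
// One metrics class, with a role flag selecting whether agent or
// coordinator metrics are created.
public class TrogdorMetricsSketch {
    enum Role { AGENT, COORDINATOR }

    static String metricGroupFor(Role role) {
        // In real code this would register different sensors/gauges per
        // role; here we just derive the metric group name from it.
        return role == Role.AGENT ? "trogdor-agent-metrics"
                                  : "trogdor-coordinator-metrics";
    }

    public static void main(String[] args) {
        System.out.println(metricGroupFor(Role.AGENT));
        System.out.println(metricGroupFor(Role.COORDINATOR));
    }
}
```

Separate `AgentMetrics`/`CoordinatorMetrics` classes (the first option) trade a little duplication for not having role checks scattered through one class.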









[GitHub] [kafka] cmccabe commented on a change in pull request #8863: KAFKA-8528: Expose Trogdor-specific JMX metrics for Tasks and Agents

2020-07-07 Thread GitBox


cmccabe commented on a change in pull request #8863:
URL: https://github.com/apache/kafka/pull/8863#discussion_r450505811



##
File path: 
tools/src/main/java/org/apache/kafka/trogdor/coordinator/Coordinator.java
##
@@ -70,6 +70,8 @@
 
 private final Time time;
 
+final TrogdorMetrics trogdorMetrics;

Review comment:
   should be final and just "metrics"









[GitHub] [kafka] vvcephei commented on pull request #8902: KAFKA-10179: Pass correct changelog topic to state serdes

2020-07-07 Thread GitBox


vvcephei commented on pull request #8902:
URL: https://github.com/apache/kafka/pull/8902#issuecomment-654916732


   Test this please







[jira] [Commented] (KAFKA-10243) ConcurrentModificationException while processing connection setup timeouts

2020-07-07 Thread David Jacot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17152805#comment-17152805
 ] 

David Jacot commented on KAFKA-10243:
-

[~vvcephei] This only affects trunk AFAIK.

> ConcurrentModificationException while processing connection setup timeouts
> --
>
> Key: KAFKA-10243
> URL: https://issues.apache.org/jira/browse/KAFKA-10243
> Project: Kafka
>  Issue Type: Bug
>  Components: network
>Reporter: Rajini Sivaram
>Assignee: David Jacot
>Priority: Major
> Fix For: 2.7
>
>
> From [~guozhang] in [https://github.com/apache/kafka/pull/8683:]
> {quote}
> java.util.ConcurrentModificationException
>   at java.util.HashMap$HashIterator.nextNode(HashMap.java:1445)
>   at java.util.HashMap$KeyIterator.next(HashMap.java:1469)
>   at 
> org.apache.kafka.clients.NetworkClient.handleTimedOutConnections(NetworkClient.java:822)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:574)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:419)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:359)
> {quote}
> While processing connection setup timeouts, we iterate through the 
> connecting nodes and disconnect within the loop, removing the entry from 
> the very set the loop is iterating over:
> {code}
> for (String nodeId : connectingNodes) {
> if (connectionStates.isConnectionSetupTimeout(nodeId, now)) {
> this.selector.close(nodeId);
> log.debug(
> "Disconnecting from node {} due to socket connection 
> setup timeout. " +
> "The timeout value is {} ms.",
> nodeId,
> connectionStates.connectionSetupTimeoutMs(nodeId));
> processDisconnection(responses, nodeId, now, 
> ChannelState.LOCAL_CLOSE);
> }
> }
> {code}





[GitHub] [kafka] akatona84 commented on a change in pull request #8019: KAFKA-8164: Improve test passing rate by rerunning flaky tests

2020-07-07 Thread GitBox


akatona84 commented on a change in pull request #8019:
URL: https://github.com/apache/kafka/pull/8019#discussion_r450931353



##
File path: README.md
##
@@ -48,6 +48,13 @@ Change the log4j setting in either 
`clients/src/test/resources/log4j.properties`
 
 ./gradlew clients:test --tests RequestResponseTest
 
+### Specifying test retries ###
+By default, each failed test is retried once up to a maximum of five retries 
per test run. Tests are retried at the end of the test task. Adjust these 
parameters in the following way:

Review comment:
   Maybe we should update this line to say that, by default, tests are not retried.









[jira] [Created] (KAFKA-10244) A new Java interface to replace 'kafka.common.MessageReader'

2020-07-07 Thread Chia-Ping Tsai (Jira)
Chia-Ping Tsai created KAFKA-10244:
--

 Summary: A new Java interface to replace 
'kafka.common.MessageReader'
 Key: KAFKA-10244
 URL: https://issues.apache.org/jira/browse/KAFKA-10244
 Project: Kafka
  Issue Type: Improvement
Reporter: Chia-Ping Tsai
Assignee: Chia-Ping Tsai


inspired by 
https://github.com/apache/kafka/commit/caa806cd82fb9fa88510c81de53e69ac9846311d.

kafka.common.MessageReader is a pure Scala trait, and we should offer a Java 
replacement to users.





[GitHub] [kafka] vvcephei commented on pull request #8987: KAFKA-10221: Backport fix for KAFKA-9603 to 2.5

2020-07-07 Thread GitBox


vvcephei commented on pull request #8987:
URL: https://github.com/apache/kafka/pull/8987#issuecomment-654946672


   Test this please







[GitHub] [kafka] hachikuji commented on a change in pull request #8934: KAFKA-10134: Use long poll if we do not have fetchable partitions

2020-07-07 Thread GitBox


hachikuji commented on a change in pull request #8934:
URL: https://github.com/apache/kafka/pull/8934#discussion_r450960971



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
##
@@ -1218,17 +1218,20 @@ public void assign(Collection 
partitions) {
 throw new IllegalStateException("Consumer is not subscribed to 
any topics or assigned any partitions");
 }
 
-// poll for new data until the timeout expires
 do {
 client.maybeTriggerWakeup();
 
 if (includeMetadataInTimeout) {
-// try to update assignment metadata BUT do not need to 
block on the timer,
-// since even if we are 1) in the middle of a rebalance or 
2) have partitions
-// with unknown starting positions we may still want to 
return some data
-// as long as there are some partitions fetchable; NOTE we 
always use a timer with 0ms
-// to never block on completing the rebalance procedure if 
there's any
-updateAssignmentMetadataIfNeeded(time.timer(0L));
+// try to update assignment metadata BUT do not need to 
block on the timer if we still have

Review comment:
   Hmm, not sure I buy that. `trySend` would only return a value less 
than 5s if there was a pending request that needed to be sent. However, as 
soon as it got sent, it would go back to 5s. So I'm not sure it alone can 
account for a tight poll loop.









[GitHub] [kafka] vvcephei commented on a change in pull request #8987: KAFKA-10221: Backport fix for KAFKA-9603 to 2.5

2020-07-07 Thread GitBox


vvcephei commented on a change in pull request #8987:
URL: https://github.com/apache/kafka/pull/8987#discussion_r450961919



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
##
@@ -251,7 +252,7 @@ void restoreAllInternal(final Collection> records) {
 // This handles the case that state store is moved to a new 
client and does not
 // have the local RocksDB instance for the segment. In this 
case, toggleDBForBulkLoading
 // will only close the database and open it again with bulk 
loading enabled.
-if (!bulkLoadSegments.contains(segment)) {
+if (!bulkLoadSegments.contains(segment) && context instanceof 
ProcessorContextImpl) {

Review comment:
   Woah, this is subtle. IIUC, the fix works by asserting that we should 
only enable bulk loading if the provided context is a ProcessorContextImpl, 
which is the kind of context that is only provided when adding the store to an 
active task.
   
   This seems correct to me, and although it's very subtle, it also seems ok as 
a patch for an older codebase that won't need to be maintained much. But maybe 
we can have a comment, or an internal method for the check, like 
   
   ```suggestion
   if (!bulkLoadSegments.contains(segment) && 
isStoreForActiveTask(context)) {
   ```
   
   so that it'll be more obvious what's going on here?
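A runnable sketch of that suggestion, using stub types in place of the real Streams interfaces (`isStoreForActiveTask` is the hypothetical helper name from the comment, not existing Kafka API):

```java
interface ProcessorContext { }

// Stand-ins: in Streams, ProcessorContextImpl is the context supplied to
// stores registered with active tasks; standbys get a different type.
class ProcessorContextImpl implements ProcessorContext { }
class StandbyContextImpl implements ProcessorContext { }

public class BulkLoadGuard {
    // Hypothetical helper: names the intent of the instanceof check, so a
    // reader of the restore path sees "active task" rather than a class name.
    static boolean isStoreForActiveTask(ProcessorContext context) {
        return context instanceof ProcessorContextImpl;
    }

    public static void main(String[] args) {
        System.out.println(isStoreForActiveTask(new ProcessorContextImpl())); // true
        System.out.println(isStoreForActiveTask(new StandbyContextImpl()));   // false
    }
}
```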









[GitHub] [kafka] vvcephei commented on pull request #8987: KAFKA-10221: Backport fix for KAFKA-9603 to 2.5

2020-07-07 Thread GitBox


vvcephei commented on pull request #8987:
URL: https://github.com/apache/kafka/pull/8987#issuecomment-654951589


   Retest this please







[jira] [Commented] (KAFKA-9630) Replace OffsetsForLeaderEpoch request/response with automated protocol

2020-07-07 Thread David Jacot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17152831#comment-17152831
 ] 

David Jacot commented on KAFKA-9630:


[~mimaison] Are you working on this one? If not, do you mind if I take it over?

> Replace OffsetsForLeaderEpoch request/response with automated protocol
> --
>
> Key: KAFKA-9630
> URL: https://issues.apache.org/jira/browse/KAFKA-9630
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Mickael Maison
>Assignee: Mickael Maison
>Priority: Major
>






[GitHub] [kafka] rajinisivaram commented on pull request #8990: KAFKA-10243; ConcurrentModificationException while processing connection setup timeouts

2020-07-07 Thread GitBox


rajinisivaram commented on pull request #8990:
URL: https://github.com/apache/kafka/pull/8990#issuecomment-654954129


   @dajac Thanks for the fix - wow, green build, haven't seen that in a while! 
Merging to trunk.







[GitHub] [kafka] rajinisivaram merged pull request #8990: KAFKA-10243; ConcurrentModificationException while processing connection setup timeouts

2020-07-07 Thread GitBox


rajinisivaram merged pull request #8990:
URL: https://github.com/apache/kafka/pull/8990


   







[jira] [Resolved] (KAFKA-10243) ConcurrentModificationException while processing connection setup timeouts

2020-07-07 Thread Rajini Sivaram (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10243?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rajini Sivaram resolved KAFKA-10243.

  Reviewer: Rajini Sivaram
Resolution: Fixed

> ConcurrentModificationException while processing connection setup timeouts
> --
>
> Key: KAFKA-10243
> URL: https://issues.apache.org/jira/browse/KAFKA-10243
> Project: Kafka
>  Issue Type: Bug
>  Components: network
>Reporter: Rajini Sivaram
>Assignee: David Jacot
>Priority: Major
> Fix For: 2.7
>
>
> From [~guozhang] in [https://github.com/apache/kafka/pull/8683:]
> {quote}
> java.util.ConcurrentModificationException
>   at java.util.HashMap$HashIterator.nextNode(HashMap.java:1445)
>   at java.util.HashMap$KeyIterator.next(HashMap.java:1469)
>   at 
> org.apache.kafka.clients.NetworkClient.handleTimedOutConnections(NetworkClient.java:822)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:574)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:419)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:359)
> {quote}
> While processing connection setup timeouts, we iterate through the 
> connecting nodes and disconnect within the loop, removing the entry from 
> the very set the loop is iterating over:
> {code}
> for (String nodeId : connectingNodes) {
> if (connectionStates.isConnectionSetupTimeout(nodeId, now)) {
> this.selector.close(nodeId);
> log.debug(
> "Disconnecting from node {} due to socket connection 
> setup timeout. " +
> "The timeout value is {} ms.",
> nodeId,
> connectionStates.connectionSetupTimeoutMs(nodeId));
> processDisconnection(responses, nodeId, now, 
> ChannelState.LOCAL_CLOSE);
> }
> }
> {code}





[jira] [Commented] (KAFKA-9630) Replace OffsetsForLeaderEpoch request/response with automated protocol

2020-07-07 Thread Mickael Maison (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9630?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17152837#comment-17152837
 ] 

Mickael Maison commented on KAFKA-9630:
---

No I have not started yet, so go for it!

> Replace OffsetsForLeaderEpoch request/response with automated protocol
> --
>
> Key: KAFKA-9630
> URL: https://issues.apache.org/jira/browse/KAFKA-9630
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Mickael Maison
>Assignee: Mickael Maison
>Priority: Major
>






[jira] [Assigned] (KAFKA-9630) Replace OffsetsForLeaderEpoch request/response with automated protocol

2020-07-07 Thread David Jacot (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9630?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Jacot reassigned KAFKA-9630:
--

Assignee: David Jacot  (was: Mickael Maison)

> Replace OffsetsForLeaderEpoch request/response with automated protocol
> --
>
> Key: KAFKA-9630
> URL: https://issues.apache.org/jira/browse/KAFKA-9630
> Project: Kafka
>  Issue Type: Sub-task
>Reporter: Mickael Maison
>Assignee: David Jacot
>Priority: Major
>






[GitHub] [kafka] hachikuji commented on pull request #8979: KAFKA-10223; Make ReplicaNotAvailableException retriable metadata exception

2020-07-07 Thread GitBox


hachikuji commented on pull request #8979:
URL: https://github.com/apache/kafka/pull/8979#issuecomment-654959200


   `NOT_LEADER_OR_FOLLOWER` sounds good to me. If we're looking to attach the 
change to a KIP, then KIP-392 might be a better option.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10017) Flaky Test EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta

2020-07-07 Thread Dima R (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17152848#comment-17152848
 ] 

Dima R commented on KAFKA-10017:


Hi [~ableegoldman],

I just found that 

shouldUpgradeFromEosAlphaToEosBeta always fails when the KStreams instances are 
created with more than one thread (StreamsConfig.NUM_STREAM_THREADS_CONFIG=2).

Could this indicate an actual problem with EOS Beta? Is it worth creating a new ticket?
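For reference, the configuration the comment refers to can be set as below; the application id and bootstrap address are placeholders, and the string key is the value behind StreamsConfig.NUM_STREAM_THREADS_CONFIG:

```java
import java.util.Properties;

public class StreamsThreadsConfig {
    // Builds properties for a Streams app running two stream threads.
    // "test-app" and "localhost:9092" are placeholder values.
    public static Properties props() {
        Properties p = new Properties();
        p.put("application.id", "test-app");
        p.put("bootstrap.servers", "localhost:9092");
        p.put("num.stream.threads", "2"); // StreamsConfig.NUM_STREAM_THREADS_CONFIG
        return p;
    }
}
```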

 

> Flaky Test EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta
> ---
>
> Key: KAFKA-10017
> URL: https://issues.apache.org/jira/browse/KAFKA-10017
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Sophie Blee-Goldman
>Assignee: Sophie Blee-Goldman
>Priority: Blocker
>  Labels: flaky-test, unit-test
> Fix For: 2.6.0
>
>
> Creating a new ticket for this since the root cause is different than 
> https://issues.apache.org/jira/browse/KAFKA-9966
> With injectError = true:
> h3. Stacktrace
> java.lang.AssertionError: Did not receive all 20 records from topic 
> multiPartitionOutputTopic within 6 ms Expected: is a value equal to or 
> greater than <20> but: <15> was less than <20> at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:563)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:559)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:530)
>  at 
> org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.readResult(EosBetaUpgradeIntegrationTest.java:973)
>  at 
> org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.verifyCommitted(EosBetaUpgradeIntegrationTest.java:961)
>  at 
> org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta(EosBetaUpgradeIntegrationTest.java:427)





[GitHub] [kafka] hachikuji commented on a change in pull request #8934: KAFKA-10134: Use long poll if we do not have fetchable partitions

2020-07-07 Thread GitBox


hachikuji commented on a change in pull request #8934:
URL: https://github.com/apache/kafka/pull/8934#discussion_r450978217



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java
##
@@ -1218,17 +1218,20 @@ public void assign(Collection<TopicPartition> 
partitions) {
 throw new IllegalStateException("Consumer is not subscribed to 
any topics or assigned any partitions");
 }
 
-// poll for new data until the timeout expires
 do {
 client.maybeTriggerWakeup();
 
 if (includeMetadataInTimeout) {
-// try to update assignment metadata BUT do not need to 
block on the timer,
-// since even if we are 1) in the middle of a rebalance or 
2) have partitions
-// with unknown starting positions we may still want to 
return some data
-// as long as there are some partitions fetchable; NOTE we 
always use a timer with 0ms
-// to never block on completing the rebalance procedure if 
there's any
-updateAssignmentMetadataIfNeeded(time.timer(0L));
+// try to update assignment metadata BUT do not need to 
block on the timer if we still have

Review comment:
   I did a simple experiment with one broker and one consumer. After 
joining the group, I did a kill -9 to stop the broker. I had the consumer log 
the poll timeout that was used in `pollForFetches`. What I saw is this:
   ```
   [2020-07-07 08:53:10,825] INFO [Consumer clientId=consumer-foo-1, 
groupId=foo] Polling for fetches with timeout 0 
(org.apache.kafka.clients.consumer.KafkaConsumer)
   [2020-07-07 08:53:10,825] INFO [Consumer clientId=consumer-foo-1, 
groupId=foo] Polling for fetches with timeout 0 
(org.apache.kafka.clients.consumer.KafkaConsumer)
   [2020-07-07 08:53:10,825] INFO [Consumer clientId=consumer-foo-1, 
groupId=foo] Polling for fetches with timeout 0 
(org.apache.kafka.clients.consumer.KafkaConsumer)
   [2020-07-07 08:53:10,825] INFO [Consumer clientId=consumer-foo-1, 
groupId=foo] Polling for fetches with timeout 0 
(org.apache.kafka.clients.consumer.KafkaConsumer)
   [2020-07-07 08:53:10,839] INFO [Consumer clientId=consumer-foo-1, 
groupId=foo] Polling for fetches with timeout 0 
(org.apache.kafka.clients.consumer.KafkaConsumer)
   [2020-07-07 08:53:10,839] INFO [Consumer clientId=consumer-foo-1, 
groupId=foo] Polling for fetches with timeout 0 
(org.apache.kafka.clients.consumer.KafkaConsumer)
   [2020-07-07 08:53:10,839] INFO [Consumer clientId=consumer-foo-1, 
groupId=foo] Polling for fetches with timeout 0 
(org.apache.kafka.clients.consumer.KafkaConsumer)
   [2020-07-07 08:53:10,839] INFO [Consumer clientId=consumer-foo-1, 
groupId=foo] Polling for fetches with timeout 0 
(org.apache.kafka.clients.consumer.KafkaConsumer)
   ```
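The tight loop of "timeout 0" polls in the log above illustrates the issue this PR addresses: while the broker is unreachable, every iteration returns immediately and burns CPU. Roughly, the idea is to block for the caller's remaining time when no partition is fetchable. A hedged sketch of that decision (all names are illustrative, not the consumer's actual fields):

```java
public class PollTimeoutChoice {
    // How long to block while assignment metadata is still pending: don't
    // block (0 ms) when some partition is fetchable and could return data;
    // otherwise use the caller's remaining time instead of spinning with
    // zero-timeout polls.
    public static long pollTimeoutMs(boolean hasFetchablePartitions, long remainingMs) {
        return hasFetchablePartitions ? 0L : remainingMs;
    }
}
```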
   









[GitHub] [kafka] ijuma commented on pull request #8979: KAFKA-10223; Make ReplicaNotAvailableException retriable metadata exception

2020-07-07 Thread GitBox


ijuma commented on pull request #8979:
URL: https://github.com/apache/kafka/pull/8979#issuecomment-654965817


   KIP-392 sounds good.







[jira] [Comment Edited] (KAFKA-10017) Flaky Test EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta

2020-07-07 Thread Dima R (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17152848#comment-17152848
 ] 

Dima R edited comment on KAFKA-10017 at 7/7/20, 4:09 PM:
-

Hi [~ableegoldman],

I just found that 

shouldUpgradeFromEosAlphaToEosBeta always fails when the KStream instances are 
created with more than one thread (StreamsConfig.NUM_STREAM_THREADS_CONFIG=2).

Could this indicate an actual problem with EOS Beta? Is it worth creating a new ticket?

 


was (Author: dima5rr):
Hi [~ableegoldman],

I just found that 

shouldUpgradeFromEosAlphaToEosBeta always fail when create KStreams instances 
with more 1 threads  (StreamsConfig.NUM_STREAM_THREADS_CONFIG=2)

May it indicate actual problem with EOS Beta, worth to create new ticket?

 

> Flaky Test EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta
> ---
>
> Key: KAFKA-10017
> URL: https://issues.apache.org/jira/browse/KAFKA-10017
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Sophie Blee-Goldman
>Assignee: Sophie Blee-Goldman
>Priority: Blocker
>  Labels: flaky-test, unit-test
> Fix For: 2.6.0
>
>
> Creating a new ticket for this since the root cause is different than 
> https://issues.apache.org/jira/browse/KAFKA-9966
> With injectError = true:
> h3. Stacktrace
> java.lang.AssertionError: Did not receive all 20 records from topic 
> multiPartitionOutputTopic within 6 ms Expected: is a value equal to or 
> greater than <20> but: <15> was less than <20> at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:563)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:559)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:530)
>  at 
> org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.readResult(EosBetaUpgradeIntegrationTest.java:973)
>  at 
> org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.verifyCommitted(EosBetaUpgradeIntegrationTest.java:961)
>  at 
> org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta(EosBetaUpgradeIntegrationTest.java:427)





[GitHub] [kafka] vvcephei commented on a change in pull request #8902: KAFKA-10179: Pass correct changelog topic to state serdes

2020-07-07 Thread GitBox


vvcephei commented on a change in pull request #8902:
URL: https://github.com/apache/kafka/pull/8902#discussion_r450979875



##
File path: 
streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java
##
@@ -193,6 +194,11 @@ public InternalMockProcessorContext(final File stateDir,
 this.metrics().setRocksDBMetricsRecordingTrigger(new 
RocksDBMetricsRecordingTrigger(new SystemTime()));
 }
 
+@Override
+protected StateManagerStub stateManager() {

Review comment:
   Should this be:
   
   ```suggestion
   protected StateManager stateManager() {
   ```
   
   ?

##
File path: streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java
##
@@ -55,6 +58,11 @@ private static StreamsConfig streamsConfig() {
 return new StreamsConfig(props);
 }
 
+@Override
+protected StateManagerStub stateManager() {

Review comment:
   and here?
   ```suggestion
   protected StateManager stateManager() {
   ```









[jira] [Commented] (KAFKA-10243) ConcurrentModificationException while processing connection setup timeouts

2020-07-07 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10243?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17152856#comment-17152856
 ] 

John Roesler commented on KAFKA-10243:
--

Thanks, [~dajac] !

> ConcurrentModificationException while processing connection setup timeouts
> --
>
> Key: KAFKA-10243
> URL: https://issues.apache.org/jira/browse/KAFKA-10243
> Project: Kafka
>  Issue Type: Bug
>  Components: network
>Reporter: Rajini Sivaram
>Assignee: David Jacot
>Priority: Major
> Fix For: 2.7
>
>
> From [~guozhang] in [https://github.com/apache/kafka/pull/8683:]
> {quote}
> java.util.ConcurrentModificationException
>   at java.util.HashMap$HashIterator.nextNode(HashMap.java:1445)
>   at java.util.HashMap$KeyIterator.next(HashMap.java:1469)
>   at 
> org.apache.kafka.clients.NetworkClient.handleTimedOutConnections(NetworkClient.java:822)
>   at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:574)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
>   at 
> org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:419)
>   at 
> org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:359)
> {quote}
> While processing connection setup timeouts, we iterate through the 
> connecting nodes and disconnect within the loop, removing entries from 
> the very set the loop is iterating over:
> {code}
> for (String nodeId : connectingNodes) {
> if (connectionStates.isConnectionSetupTimeout(nodeId, now)) {
> this.selector.close(nodeId);
> log.debug(
> "Disconnecting from node {} due to socket connection 
> setup timeout. " +
> "The timeout value is {} ms.",
> nodeId,
> connectionStates.connectionSetupTimeoutMs(nodeId));
> processDisconnection(responses, nodeId, now, 
> ChannelState.LOCAL_CLOSE);
> }
> }
> {code}





[GitHub] [kafka] vvcephei commented on pull request #8913: KAFKA-10191 fix flaky StreamsOptimizedTest

2020-07-07 Thread GitBox


vvcephei commented on pull request #8913:
URL: https://github.com/apache/kafka/pull/8913#issuecomment-654971292


   Thanks, @chia7712 , your response makes sense.








[GitHub] [kafka] vvcephei commented on pull request #8913: KAFKA-10191 fix flaky StreamsOptimizedTest

2020-07-07 Thread GitBox


vvcephei commented on pull request #8913:
URL: https://github.com/apache/kafka/pull/8913#issuecomment-654972949


   I'm sure you did this locally, but I'm running the system tests one more 
time, just to be on the safe side before merging:
   https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4013/







[GitHub] [kafka] cadonna commented on a change in pull request #8987: KAFKA-10221: Backport fix for KAFKA-9603 to 2.5

2020-07-07 Thread GitBox


cadonna commented on a change in pull request #8987:
URL: https://github.com/apache/kafka/pull/8987#discussion_r450989016



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractRocksDBSegmentedBytesStore.java
##
@@ -251,7 +252,7 @@ void restoreAllInternal(final Collection<KeyValue<byte[], byte[]>> records) {
 // This handles the case that state store is moved to a new 
client and does not
 // have the local RocksDB instance for the segment. In this 
case, toggleDBForBulkLoading
 // will only close the database and open it again with bulk 
loading enabled.
-if (!bulkLoadSegments.contains(segment)) {
+if (!bulkLoadSegments.contains(segment) && context instanceof 
ProcessorContextImpl) {

Review comment:
   Good point! Will do!









[jira] [Comment Edited] (KAFKA-4273) Streams DSL - Add TTL / retention period support for intermediate topics and state stores

2020-07-07 Thread Di Campo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17152866#comment-17152866
 ] 

Di Campo edited comment on KAFKA-4273 at 7/7/20, 4:27 PM:
--

Thanks Matthias. 
In my case I want an emit-on-change behaviour, so keeping just the latest 
element works for me. 
I am trying a windowed store. However, WindowStoreIterator doesn't have the best 
semantics for holding a single element. 
I will try to overcome that by removing the element before updating. Is that 
performant and safe enough? 
Code example: 

{{
def transform(key: K, value: V): V = {
if(value == null) {
  null.asInstanceOf[V]
} else {
  val eventTime = context.timestamp()
  var timeIterator : WindowStoreIterator[V] = null
  try {
timeIterator = eventIdStore.fetch(
  key,
  eventTime - leftDurationMs,
  eventTime + rightDurationMs)
val exists = timeIterator.hasNext()
if (exists) {
  val old = timeIterator.next()
  if( old != null && value.equalsRelation(old.value)) {
return null.asInstanceOf[V]
  } else {
// update
timeIterator.remove()
insert(key, value)
value
  }
} else {
  insert(key, value)
  value
}
  } finally {
if(timeIterator != null) {
  timeIterator.close()
}
  }
}
  }
}}




was (Author: xmar):
Thanks Matthias. 
In my case I want an emit-on-change behaviour, so keeping just the latest 
element works for me. 
I am trying a windowed-store. However WindowIterator doesn't have the best 
semantics for having one element only. 
I will try to overcome that by removing the element before updating. Is that 
performant, and safe enough? 
Code example: 

{{def transform(key: K, value: V): V = {
if(value == null) {
  null.asInstanceOf[V]
} else {
  val eventTime = context.timestamp()
  var timeIterator : WindowStoreIterator[V] = null
  try {
var timeIterator = eventIdStore.fetch(
  key,
  eventTime - leftDurationMs,
  eventTime + rightDurationMs)
val exists = timeIterator.hasNext()
if (exists) {
  val old = timeIterator.next()
  if( old != null && value.equalsRelation(old.value)) {
return null.asInstanceOf[V]
  } else {
// update
timeIterator.remove()
insert(key, value)
value
  }
} else {
  insert(key, value)
  value
}
  } finally {
if(timeIterator != null) {
  timeIterator.close()
}
  }
}
  }}}



> Streams DSL - Add TTL / retention period support for intermediate topics and 
> state stores
> -
>
> Key: KAFKA-4273
> URL: https://issues.apache.org/jira/browse/KAFKA-4273
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Davor Poldrugo
>Priority: Major
>
> Hi!
> I'm using the Streams DSL (0.10.0.1), which can only use RocksDB for local state 
> as far as I know - it's not configurable.
> In my use case my data has a TTL / retention period. It's 48 hours. After that, 
> the data can be discarded.
> I join two topics: "messages" and "prices" using a windowed inner join.
> The two intermediate Kafka topics for this join are named:
>  * messages-prices-join-this-changelog
>  * messages-prices-join-other-changelog
> Since these topics are created as compacted by Kafka Streams, and I don't 
> want to keep data forever, I have altered them to not use compaction. Right 
> now my RocksDB state stores grow indefinitely, and I don't have any options 
> to define a TTL or somehow periodically clean out the older data.
> A "hack" that I use to keep my disk usage low: I have scheduled a job to 
> periodically stop Kafka Streams instances - one at a time. This triggers a 
> rebalance, and partitions migrate to other instances. When the instance is 
> started again, there's another rebalance, and sometimes this instance starts 
> processing partitions that it wasn't processing before the stop - which leads 
> to deletion of the RocksDB state stores for those partitions 
> (state.cleanup.delay.ms). In the next rebalance the local store is recreated 
> with a restore consumer - which reads data from - as previously mentioned - a 
> non-compacted topic. And this effectively leads to "hacked TTL support" in 
> Kafka Streams DSL.
> Questions:
>  * Do you think it would be reasonable to add support in the DSL API to define 
> a TTL for the local store?
>  * Which opens another question - there are use cases which don't need the 
> intermediate topics to be created as 

[jira] [Commented] (KAFKA-4273) Streams DSL - Add TTL / retention period support for intermediate topics and state stores

2020-07-07 Thread Di Campo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17152866#comment-17152866
 ] 

Di Campo commented on KAFKA-4273:
-


Thanks Matthias. 
In my case I want an emit-on-change behaviour, so keeping just the latest 
element works for me. 
I am trying a windowed store. However, WindowStoreIterator doesn't have the best 
semantics for holding a single element. 
I will try to overcome that by removing the element before updating. Is that 
performant and safe enough? 
Code example: 

{{  def transform(key: K, value: V): V = {
if(value == null) {
  null.asInstanceOf[V]
} else {
  val eventTime = context.timestamp()
  var timeIterator : WindowStoreIterator[V] = null
  try {
timeIterator = eventIdStore.fetch(
  key,
  eventTime - leftDurationMs,
  eventTime + rightDurationMs)
val exists = timeIterator.hasNext()
if (exists) {
  val old = timeIterator.next()
  if( old != null && value.equalsRelation(old.value)) {
return null.asInstanceOf[V]
  } else {
// update
timeIterator.remove()
insert(key, value)
value
  }
} else {
  insert(key, value)
  value
}
  } finally {
if(timeIterator != null) {
  timeIterator.close()
}
  }
}
  }}}
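Stripped of the store plumbing, the logic above is plain emit-on-change: fetch the previous value for the key, suppress the update when it is unchanged, otherwise replace and forward it. A minimal Java sketch with a HashMap standing in for the window store (illustrative only, not the Kafka Streams API; equalsRelation is replaced by Objects.equals):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

public class EmitOnChange<K, V> {
    private final Map<K, V> store = new HashMap<>();

    // Returns the value to forward downstream, or null to emit nothing.
    public V transform(K key, V value) {
        if (value == null) {
            return null;                 // tombstones pass through as no-ops here
        }
        V old = store.get(key);
        if (Objects.equals(old, value)) {
            return null;                 // unchanged: suppress the update
        }
        store.put(key, value);           // new or changed: remember and emit
        return value;
    }
}
```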



> Streams DSL - Add TTL / retention period support for intermediate topics and 
> state stores
> -
>
> Key: KAFKA-4273
> URL: https://issues.apache.org/jira/browse/KAFKA-4273
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Davor Poldrugo
>Priority: Major
>
> Hi!
> I'm using the Streams DSL (0.10.0.1), which can only use RocksDB for local state 
> as far as I know - it's not configurable.
> In my use case my data has a TTL / retention period. It's 48 hours. After that, 
> the data can be discarded.
> I join two topics: "messages" and "prices" using a windowed inner join.
> The two intermediate Kafka topics for this join are named:
>  * messages-prices-join-this-changelog
>  * messages-prices-join-other-changelog
> Since these topics are created as compacted by Kafka Streams, and I don't 
> want to keep data forever, I have altered them to not use compaction. Right 
> now my RocksDB state stores grow indefinitely, and I don't have any options 
> to define a TTL or somehow periodically clean out the older data.
> A "hack" that I use to keep my disk usage low: I have scheduled a job to 
> periodically stop Kafka Streams instances - one at a time. This triggers a 
> rebalance, and partitions migrate to other instances. When the instance is 
> started again, there's another rebalance, and sometimes this instance starts 
> processing partitions that it wasn't processing before the stop - which leads 
> to deletion of the RocksDB state stores for those partitions 
> (state.cleanup.delay.ms). In the next rebalance the local store is recreated 
> with a restore consumer - which reads data from - as previously mentioned - a 
> non-compacted topic. And this effectively leads to "hacked TTL support" in 
> Kafka Streams DSL.
> Questions:
>  * Do you think it would be reasonable to add support in the DSL API to define 
> a TTL for the local store?
>  * Which opens another question - there are use cases which don't need the 
> intermediate topics to be created as "compact". Could this also be added to 
> the DSL API? Maybe only this could be added, and this flag should also be 
> used for the RocksDB TTL. Of course in this case another config would be 
> mandatory - the retention period or TTL for the intermediate topics and the 
> state stores. I saw there is a new cleanup.policy - compact_and_delete - 
> added with KAFKA-4015.
>  * Which also leads to another question: maybe some intermediate topics / 
> state stores need different TTLs, so it's not as simple as that. But after 
> KAFKA-3870, it will be easier.
> RocksDB supports TTL:
>  * 
> https://github.com/apache/kafka/blob/0.10.0.1/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L166
>  * https://github.com/facebook/rocksdb/wiki/Time-to-Live
>  * 
> https://github.com/facebook/rocksdb/blob/master/java/src/main/java/org/rocksdb/TtlDB.java
> A somewhat similar issue: KAFKA-4212





[jira] [Comment Edited] (KAFKA-4273) Streams DSL - Add TTL / retention period support for intermediate topics and state stores

2020-07-07 Thread Di Campo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17152866#comment-17152866
 ] 

Di Campo edited comment on KAFKA-4273 at 7/7/20, 4:27 PM:
--

Thanks Matthias. 
In my case I want an emit-on-change behaviour, so keeping just the latest 
element works for me. 
I am trying a windowed store. However, WindowStoreIterator doesn't have the best 
semantics for holding a single element. 
I will try to overcome that by removing the element before updating. Is that 
performant and safe enough? 
Code example: 

{{def transform(key: K, value: V): V = {
if(value == null) {
  null.asInstanceOf[V]
} else {
  val eventTime = context.timestamp()
  var timeIterator : WindowStoreIterator[V] = null
  try {
timeIterator = eventIdStore.fetch(
  key,
  eventTime - leftDurationMs,
  eventTime + rightDurationMs)
val exists = timeIterator.hasNext()
if (exists) {
  val old = timeIterator.next()
  if( old != null && value.equalsRelation(old.value)) {
return null.asInstanceOf[V]
  } else {
// update
timeIterator.remove()
insert(key, value)
value
  }
} else {
  insert(key, value)
  value
}
  } finally {
if(timeIterator != null) {
  timeIterator.close()
}
  }
}
  }}}




was (Author: xmar):

Thanks Matthias. 
In my case I want an emit-on-change behaviour, so keeping just the latest 
element works for me. 
I am trying a windowed-store. However WindowIterator doesn't have the best 
semantics for having one element only. 
I will try to overcome that by removing the element before updating. Is that 
performant, and safe enough? 
Code example: 

{{  def transform(key: K, value: V): V = {
if(value == null) {
  null.asInstanceOf[V]
} else {
  val eventTime = context.timestamp()
  var timeIterator : WindowStoreIterator[V] = null
  try {
var timeIterator = eventIdStore.fetch(
  key,
  eventTime - leftDurationMs,
  eventTime + rightDurationMs)
val exists = timeIterator.hasNext()
if (exists) {
  val old = timeIterator.next()
  if( old != null && value.equalsRelation(old.value)) {
return null.asInstanceOf[V]
  } else {
// update
timeIterator.remove()
insert(key, value)
value
  }
} else {
  insert(key, value)
  value
}
  } finally {
if(timeIterator != null) {
  timeIterator.close()
}
  }
}
  }}}



> Streams DSL - Add TTL / retention period support for intermediate topics and 
> state stores
> -
>
> Key: KAFKA-4273
> URL: https://issues.apache.org/jira/browse/KAFKA-4273
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Davor Poldrugo
>Priority: Major
>
> Hi!
> I'm using Streams DSL (0.10.0.1), which can only use RocksDB for local state 
> as far as I know - it's not configurable.
> In my use case my data has TTL / retnetion period. It's 48 hours. After that 
> - data can be discarded.
> I join two topics: "messages" and "prices" using windowed inner join.
> The two intermediate Kafka topics for this join are named:
>  * messages-prices-join-this-changelog
>  * messages-prices-join-other-changelog
> Since these topics are created as compacted by Kafka Streams, and I don't 
> want to keep data forever, I have altered them to not use compaction. Right 
> now my RocksDB state stores grow indefinitely, and I don't have any options 
> to define TTL, or somehow periodically clean the older data.
> A "hack" that I use to keep my disk usage low - I have scheduled a job to 
> periodically stop Kafka Streams instances - one at a time. This triggers a 
> rebalance, and partitions migrate to other instances. When the instance is 
> started again, there's another rebalance, and sometimes this instance starts 
> processing partitions that it wasn't processing before the stop - which leads to 
> deletion of the RocksDB state store for those partitions 
> (state.cleanup.delay.ms). In the next rebalance the local store is recreated 
> with a restore consumer - which reads data from - as previously mentioned - a 
> non compacted topic. And this effectively leads to a "hacked TTL support" in 
> Kafka Streams DSL.
> Questions:
> Do you think it would be reasonable to add support in the DSL API to define 
> TTL for local store?
>  * Which opens another question - there are use cases which don't need the 
> intermediate topics to be created as

[jira] [Comment Edited] (KAFKA-4273) Streams DSL - Add TTL / retention period support for intermediate topics and state stores

2020-07-07 Thread Di Campo (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17152866#comment-17152866
 ] 

Di Campo edited comment on KAFKA-4273 at 7/7/20, 4:28 PM:
--

Thanks Matthias. 
In my case I want an emit-on-change behaviour, so keeping just the latest 
element works for me. 
I am trying a windowed store. However, WindowStoreIterator doesn't have the best 
semantics for holding only one element. 
I will try to overcome that by removing the element before updating. Is that 
performant and safe enough? 
Code example: 


def transform(key: K, value: V): V = {
  if (value == null) {
    null.asInstanceOf[V]
  } else {
    val eventTime = context.timestamp()
    var timeIterator: WindowStoreIterator[V] = null
    try {
      // assign the outer var; declaring a second `var` here would shadow it
      // and leave the iterator unclosed in the finally block
      timeIterator = eventIdStore.fetch(
        key,
        eventTime - leftDurationMs,
        eventTime + rightDurationMs)
      if (timeIterator.hasNext()) {
        val old = timeIterator.next()
        if (old != null && value.equalsRelation(old.value)) {
          // unchanged relative to the stored element: emit nothing
          null.asInstanceOf[V]
        } else {
          // changed: replace the stored element and emit
          timeIterator.remove()
          insert(key, value)
          value
        }
      } else {
        insert(key, value)
        value
      }
    } finally {
      if (timeIterator != null) {
        timeIterator.close()
      }
    }
  }
}








[jira] [Assigned] (KAFKA-10215) MockProcessorContext doesn't work with SessionStores

2020-07-07 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10215?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler reassigned KAFKA-10215:


Assignee: John Roesler

> MockProcessorContext doesn't work with SessionStores
> 
>
> Key: KAFKA-10215
> URL: https://issues.apache.org/jira/browse/KAFKA-10215
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
>
> The recommended pattern for testing custom Processor implementations is to 
> use the test-utils MockProcessorContext. If a Processor implementation needs 
> a store, the store also has to be initialized with the same context. However, 
> the existing (in-memory and persistent) Session store implementations perform 
> internal casts that result in class cast exceptions if you attempt to 
> initialize them with the MockProcessorContext.
> A workaround is to instead embed the processor in an application and use the 
> TopologyTestDriver instead.
> The fix is the same as for KAFKA-10200



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10226) KStream without SASL information should return error in confluent cloud

2020-07-07 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17152888#comment-17152888
 ] 

John Roesler commented on KAFKA-10226:
--

Hi [~wdaehn] ,

Thanks for the report, and sorry for the trouble.

As far as I know, we should log an error in this case. Can you confirm that you 
got no error or warning logs in addition to no exceptions?

Thanks,

-John

> KStream without SASL information should return error in confluent cloud
> ---
>
> Key: KAFKA-10226
> URL: https://issues.apache.org/jira/browse/KAFKA-10226
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, streams
>Affects Versions: 2.5.0
>Reporter: Werner Daehn
>Priority: Minor
>
> I have created a KStream against Confluent Cloud and wondered why no data 
> was received from the source. The reason was that I forgot to add the SASL 
> API keys and secrets.
>  
> For end users this might lead to usability issues. If the KStream wants to 
> read from a topic and is not allowed to, this should raise an error, not be 
> silently ignored.
>  
> How do producer/consumer clients handle that situation?
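
For reference, the client-side settings missing in this scenario look roughly like the following for a SASL_SSL endpoint (the bootstrap address and credentials are placeholders, not values from this report):

```properties
bootstrap.servers=<cluster>.confluent.cloud:9092
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="<api-key>" \
  password="<api-secret>";
```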



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] C0urante commented on a change in pull request #8928: KAFKA-10192: Wait for REST API to become available before testing blocked connectors

2020-07-07 Thread GitBox


C0urante commented on a change in pull request #8928:
URL: https://github.com/apache/kafka/pull/8928#discussion_r451008207



##
File path: 
connect/runtime/src/test/java/org/apache/kafka/connect/integration/BlockingConnectorTest.java
##
@@ -76,6 +78,15 @@ public void setup() {
 
 // start the clusters
 connect.start();
+
+// wait for the Connect REST API to become available. necessary 
because of the reduced REST
+// request timeout; otherwise, we may get an unexpected 500 with our 
first real REST request
+// if the worker is still getting on its feet.
+waitForCondition(

Review comment:
   That hits the in-memory config state and doesn't require a herder 
request, so the availability of that endpoint doesn't guarantee that the herder 
has finished startup, unfortunately.
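
For context, the poll-until-ready pattern discussed here can be sketched as a standalone helper (illustrative only; `WaitUtil` and its signature are assumptions, not the Connect test-utils `waitForCondition` API):

```java
import java.util.function.Supplier;

public class WaitUtil {
    /**
     * Polls `condition` every pollMs until it returns true or timeoutMs
     * elapses; returns whether the condition was ever satisfied.
     */
    public static boolean waitForCondition(Supplier<Boolean> condition,
                                           long timeoutMs, long pollMs) {
        long deadline = System.currentTimeMillis() + timeoutMs;
        while (!condition.get()) {
            if (System.currentTimeMillis() >= deadline) {
                return false; // timed out before the service became ready
            }
            try {
                Thread.sleep(pollMs);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        long start = System.currentTimeMillis();
        // becomes true after ~50 ms, well within the 1 s timeout
        boolean ready = waitForCondition(
                () -> System.currentTimeMillis() - start >= 50, 1000, 10);
        System.out.println(ready); // prints true
    }
}
```

As the review thread notes, the condition itself matters: it must exercise a code path that proves the herder has finished startup, not just an endpoint served from in-memory config state.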






This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dajac commented on pull request #8984: KAFKA-10227: Enforce cleanup policy to only contain compact or delete once

2020-07-07 Thread GitBox


dajac commented on pull request #8984:
URL: https://github.com/apache/kafka/pull/8984#issuecomment-654993589


   @huxihx I am not against improving the command line but I think that the 
approach that you are proposing in this PR is a bit risky for the future. 
Please, take a look at my previous comment and let me know what you think.



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] Johnny-Malizia commented on a change in pull request #8936: KAFKA-10207: Fixed padded timeindex causing premature data deletion

2020-07-07 Thread GitBox


Johnny-Malizia commented on a change in pull request #8936:
URL: https://github.com/apache/kafka/pull/8936#discussion_r451015408



##
File path: core/src/main/scala/kafka/log/LogSegment.scala
##
@@ -87,6 +87,12 @@ class LogSegment private[log] (val log: FileRecords,
   // we will recover the segments above the recovery point in recoverLog()
   // in any case so sanity checking them here is redundant.
   txnIndex.sanityCheck()
+  // Failing to sanity check the timeIndex can result in a scenario where 
log segments are
+  // prematurely deleted (before breaching retention periods) if the index 
file was not resized
+  // to disk successfully.
+  // KAFKA-10207
+  timeIndex.sanityCheck()
+  offsetIndex.sanityCheck()

Review comment:
   If the concern here is startup performance, we could instead fall back to 
using the mtime of the file when the last offset and timestamp are both 0.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10017) Flaky Test EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta

2020-07-07 Thread Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10017?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17152909#comment-17152909
 ] 

Sophie Blee-Goldman commented on KAFKA-10017:
-

Hey [~dima5rr]

This test assumes only one thread per instance in order to hook into Streams 
and force a commit or a crash, for example. I would not expect it to pass 
if you change the number of threads.

It could be extended to work with/test a multithreaded upgrade if you'd be 
interested in submitting a PR, though :) 

> Flaky Test EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta
> ---
>
> Key: KAFKA-10017
> URL: https://issues.apache.org/jira/browse/KAFKA-10017
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Sophie Blee-Goldman
>Assignee: Sophie Blee-Goldman
>Priority: Blocker
>  Labels: flaky-test, unit-test
> Fix For: 2.6.0
>
>
> Creating a new ticket for this since the root cause is different than 
> https://issues.apache.org/jira/browse/KAFKA-9966
> With injectError = true:
> h3. Stacktrace
> java.lang.AssertionError: Did not receive all 20 records from topic 
> multiPartitionOutputTopic within 6 ms Expected: is a value equal to or 
> greater than <20> but: <15> was less than <20> at 
> org.hamcrest.MatcherAssert.assertThat(MatcherAssert.java:20) at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.lambda$waitUntilMinKeyValueRecordsReceived$1(IntegrationTestUtils.java:563)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:429)
>  at 
> org.apache.kafka.test.TestUtils.retryOnExceptionWithTimeout(TestUtils.java:397)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:559)
>  at 
> org.apache.kafka.streams.integration.utils.IntegrationTestUtils.waitUntilMinKeyValueRecordsReceived(IntegrationTestUtils.java:530)
>  at 
> org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.readResult(EosBetaUpgradeIntegrationTest.java:973)
>  at 
> org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.verifyCommitted(EosBetaUpgradeIntegrationTest.java:961)
>  at 
> org.apache.kafka.streams.integration.EosBetaUpgradeIntegrationTest.shouldUpgradeFromEosAlphaToEosBeta(EosBetaUpgradeIntegrationTest.java:427)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] guozhangwang commented on a change in pull request #8964: KAFKA-9450: Decouple flushing state from commiting

2020-07-07 Thread GitBox


guozhangwang commented on a change in pull request #8964:
URL: https://github.com/apache/kafka/pull/8964#discussion_r451021225



##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/TaskManager.java
##
@@ -243,18 +242,24 @@ public void handleAssignment(final Map> activeTasks,
 
 for (final Task task : tasksToClose) {
 try {
-if (task.isActive()) {

Review comment:
   As we discussed in the other PR, I'm removing the logic for TM to check 
on task state and replaced it with checking that there should be nothing to 
commit (since suspend / pre-commit / post-commit are not all idempotent).





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] abbccdda commented on a change in pull request #8986: KAFKA-10233; Add backoff after AuthorizationExceptions in consumer

2020-07-07 Thread GitBox


abbccdda commented on a change in pull request #8986:
URL: https://github.com/apache/kafka/pull/8986#discussion_r451022144



##
File path: 
clients/src/main/java/org/apache/kafka/clients/consumer/internals/AbstractCoordinator.java
##
@@ -460,15 +463,21 @@ boolean joinGroupIfNeeded(final Timer timer) {
 exception instanceof IllegalGenerationException ||
 exception instanceof MemberIdRequiredException)
 continue;
-else if (!future.isRetriable())
-throw exception;
-
-timer.sleep(rebalanceConfig.retryBackoffMs);
+else {
+handleFailure(future, timer);
+}
 }
 }
 return true;
 }
 
+protected void handleFailure(RequestFuture future, Timer timer) {
+if (future.isRetriable() || future.exception() instanceof 
AuthorizationException)

Review comment:
   GroupAuthorizationException is the only retriable exception defined in 
the ticket; maybe we should just scope the fix to that. Agree with No. 2 to 
back off for all retriable exceptions.
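
The backoff behavior under discussion can be sketched generically (a hypothetical standalone helper, not the actual `AbstractCoordinator` API; for illustration, every RuntimeException is treated as retriable):

```java
import java.util.function.Supplier;

public class BackoffRetry {
    /** Retries `call` up to maxAttempts, sleeping backoffMs between failures. */
    public static <T> T callWithBackoff(Supplier<T> call, int maxAttempts, long backoffMs) {
        RuntimeException last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return call.get();
            } catch (RuntimeException e) {
                last = e; // treated as retriable in this sketch
                try {
                    Thread.sleep(backoffMs); // back off instead of a tight retry loop
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
        }
        throw last; // attempts exhausted
    }

    public static void main(String[] args) {
        int[] attempts = {0};
        String result = callWithBackoff(() -> {
            if (++attempts[0] < 3) throw new RuntimeException("transient");
            return "ok";
        }, 5, 1);
        System.out.println(result + " after " + attempts[0] + " attempts"); // ok after 3 attempts
    }
}
```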





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10229) Kafka stream dies for no apparent reason, no errors logged on client or server

2020-07-07 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10229?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17152912#comment-17152912
 ] 

Guozhang Wang commented on KAFKA-10229:
---

Thanks, please feel free to update this ticket if you have new findings.

> Kafka stream dies for no apparent reason, no errors logged on client or server
> --
>
> Key: KAFKA-10229
> URL: https://issues.apache.org/jira/browse/KAFKA-10229
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.4.1
>Reporter: Raman Gupta
>Priority: Major
>
> My broker and clients are 2.4.1. I'm currently running a single broker. I 
> have a Kafka stream with exactly once processing turned on. I also have an 
> uncaught exception handler defined on the client. I have a stream which I 
> noticed was lagging. Upon investigation, I see that the consumer group was 
> empty.
> On restarting the consumers, the consumer group re-established itself, but 
> after about 8 minutes, the group became empty again. There is nothing logged 
> on the client side about any stream errors, despite the existence of an 
> uncaught exception handler.
> In the broker logs, I see that about 8 minutes after the clients restart / 
> the stream goes to RUNNING state:
> ```
> [2020-07-02 17:34:47,033] INFO [GroupCoordinator 0]: Member 
> cis-d7fb64c95-kl9wl-1-630af77f-138e-49d1-b76a-6034801ee359 in group 
> produs-cisFileIndexer-stream has failed, removing it from the group 
> (kafka.coordinator.group.GroupCoordinator)
> [2020-07-02 17:34:47,033] INFO [GroupCoordinator 0]: Preparing to rebalance 
> group produs-cisFileIndexer-stream in state PreparingRebalance with old 
> generation 228 (__consumer_offsets-3) (reason: removing member 
> cis-d7fb64c95-kl9wl-1-630af77f-138e-49d1-b76a-6034801ee359 on heartbeat 
> expiration) (kafka.coordinator.group.GroupCoordinator)
> ```
> so according to this the consumer heartbeat has expired. I don't know why 
> this would be, logging shows that the stream was running and processing 
> messages normally and then just stopped processing anything about 4 minutes 
> before it dies, with no apparent errors or issues or anything logged via the 
> uncaught exception handler.
> It doesn't appear to be related to any specific poison pill type messages: 
> restarting the stream causes it to reprocess a bunch more messages from the 
> backlog, and then die again approximately 8 minutes later. At the time of the 
> last message consumed by the stream, there are no `INFO`-level or above logs 
> either in the client or the broker, or any errors whatsoever. The stream 
> consumption simply stops.
> There are two consumers -- even if I limit consumption to only a single 
> consumer, the same thing happens.
> The runtime environment is Kubernetes.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] dhruvilshah3 commented on a change in pull request #8936: KAFKA-10207: Fixed padded timeindex causing premature data deletion

2020-07-07 Thread GitBox


dhruvilshah3 commented on a change in pull request #8936:
URL: https://github.com/apache/kafka/pull/8936#discussion_r451024024



##
File path: core/src/main/scala/kafka/log/LogSegment.scala
##
@@ -87,6 +87,12 @@ class LogSegment private[log] (val log: FileRecords,
   // we will recover the segments above the recovery point in recoverLog()
   // in any case so sanity checking them here is redundant.
   txnIndex.sanityCheck()
+  // Failing to sanity check the timeIndex can result in a scenario where 
log segments are
+  // prematurely deleted (before breaching retention periods) if the index 
file was not resized
+  // to disk successfully.
+  // KAFKA-10207
+  timeIndex.sanityCheck()
+  offsetIndex.sanityCheck()

Review comment:
   Did we consider performing the sanity check lazily, eg. when the index 
is loaded for the first time?





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] junrao commented on a change in pull request #8936: KAFKA-10207: Fixed padded timeindex causing premature data deletion

2020-07-07 Thread GitBox


junrao commented on a change in pull request #8936:
URL: https://github.com/apache/kafka/pull/8936#discussion_r451024109



##
File path: core/src/main/scala/kafka/log/LogSegment.scala
##
@@ -87,6 +87,12 @@ class LogSegment private[log] (val log: FileRecords,
   // we will recover the segments above the recovery point in recoverLog()
   // in any case so sanity checking them here is redundant.
   txnIndex.sanityCheck()
+  // Failing to sanity check the timeIndex can result in a scenario where 
log segments are
+  // prematurely deleted (before breaching retention periods) if the index 
file was not resized
+  // to disk successfully.
+  // KAFKA-10207
+  timeIndex.sanityCheck()
+  offsetIndex.sanityCheck()

Review comment:
   @Johnny-Malizia : Thanks for reporting this. A couple of comments on 
this.
   
   (1) It seems that there are cases where the index file is not trimmed 
properly. Every rolled segment calls LogSegment.onBecomeInactiveSegment(), 
which does trimming. So, we probably should just fix the trim logic there.
   
   (2) The intention of KIP-263 is to avoid unnecessary loading (and the sanity 
check) of indexes never used. This is mostly achieved through lazy indexes 
(actual index file only opened lazily on first use). Since the biggest cost of 
loading an index file is probably the I/O part, and the sanity check is 
computational only and relatively cheap, we can probably just do the index 
sanity check on index opening. This way, in the common case, we only pay the 
index sanity checking cost on active segments.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-10207) Untrimmed Index files cause premature log segment deletions on startup

2020-07-07 Thread Dhruvil Shah (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17152915#comment-17152915
 ] 

Dhruvil Shah commented on KAFKA-10207:
--

Hmm, this seems pretty strange. Do you know if there was a specific jvm issue 
that was fixed to address the problem with `RandomAccessFile#setLength`?

> Untrimmed Index files cause premature log segment deletions on startup
> --
>
> Key: KAFKA-10207
> URL: https://issues.apache.org/jira/browse/KAFKA-10207
> Project: Kafka
>  Issue Type: Bug
>  Components: log
>Affects Versions: 2.4.0, 2.3.1, 2.4.1
>Reporter: Johnny Malizia
>Priority: Major
>
> [KIP-263|https://cwiki.apache.org/confluence/display/KAFKA/KIP-263%3A+Allow+broker+to+skip+sanity+check+of+inactive+segments+on+broker+startup#KIP263:Allowbrokertoskipsanitycheckofinactivesegmentsonbrokerstartup-Evaluation]
>  appears to have introduced a change explicitly deciding to not call the 
> sanityCheck method on the time or offset index files that are loaded by Kafka 
> at startup. I found a particularly nasty bug using the following configuration
> {code:java}
> jvm=1.8.0_191 zfs=0.6.5.6 kernel=4.4.0-1013-aws kafka=2.4.1{code}
> The bug was that the retention period for a topic or even the broker level 
> configuration seemed to not be respected, no matter what, when the broker 
> started up it would decide that all log segments on disk were breaching the 
> retention window and the data would be purged away.
>  
> {code:java}
> Found deletable segments with base offsets [11610665,12130396,12650133] due 
> to retention time 8640ms breach {code}
> {code:java}
> Rolled new log segment at offset 12764291 in 1 ms. (kafka.log.Log)
> Scheduling segments for deletion List(LogSegment(baseOffset=11610665, 
> size=1073731621, lastModifiedTime=1592532125000, largestTime=0), 
> LogSegment(baseOffset=12130396, size=1073727967, 
> lastModifiedTime=1592532462000, largestTime=0), 
> LogSegment(baseOffset=12650133, size=235891971, 
> lastModifiedTime=1592532531000, largestTime=0)) {code}
> Further logging showed that this issue was happening when loading the files, 
> indicating the final writes to trim the index were not successful
> {code:java}
> DEBUG Loaded index file 
> /mnt/kafka-logs/test_topic-0/17221277.timeindex with maxEntries = 
> 873813, maxIndexSize = 10485760, entries = 873813, lastOffset = 
> TimestampOffset(0,17221277), file position = 10485756 
> (kafka.log.TimeIndex){code}
> It looks like the file is initially preallocated (10MB by default) and index 
> entries are added over time. When it's time to roll to a new log segment, the 
> index file is supposed to be trimmed, removing any 0 bytes left at the tail 
> from the initial allocation. But, in some cases that doesn't seem to happen 
> successfully. Because 0 bytes at the tail may not have been removed, when the 
> index is loaded again after restarting Kafka, the buffer seeks the position 
> to the end and the next timestamp is 0 and this leads to a premature TTL 
> deletion of the log segments.
>  
> I tracked the issue down to being caused by the jvm version being used as 
> upgrading resolved this issue, but I think that Kafka should never delete 
> data by mistake like this as doing a rolling restart with this bug in place 
> would cause complete data-loss across the cluster.
>  
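
The failure mode described above can be illustrated with a toy buffer (this is not Kafka's actual `TimeIndex` code; the 12-byte entry layout follows the time-index entries in the logs): with untrimmed zero padding at the tail, the entry the loader treats as last reads back timestamp 0, so `largestTime=0` and the retention check deletes the segment.

```java
import java.nio.ByteBuffer;

public class PaddedIndexDemo {
    static final int ENTRY_SIZE = 12; // 8-byte timestamp + 4-byte relative offset

    /** Timestamp of the entry a loader would treat as last, given an entry count. */
    static long lastTimestamp(ByteBuffer index, int entries) {
        return index.getLong((entries - 1) * ENTRY_SIZE);
    }

    public static void main(String[] args) {
        // preallocated (zero-filled) space for 10 entries, only 1 actually written
        ByteBuffer index = ByteBuffer.allocate(10 * ENTRY_SIZE);
        index.putLong(1592532125000L).putInt(100);
        // if trimming failed, the file size implies 10 entries; the tail is zeros
        System.out.println(lastTimestamp(index, 10)); // prints 0
        System.out.println(lastTimestamp(index, 1));  // prints 1592532125000
    }
}
```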



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] junrao commented on a change in pull request #8936: KAFKA-10207: Fixed padded timeindex causing premature data deletion

2020-07-07 Thread GitBox


junrao commented on a change in pull request #8936:
URL: https://github.com/apache/kafka/pull/8936#discussion_r451024109



##
File path: core/src/main/scala/kafka/log/LogSegment.scala
##
@@ -87,6 +87,12 @@ class LogSegment private[log] (val log: FileRecords,
   // we will recover the segments above the recovery point in recoverLog()
   // in any case so sanity checking them here is redundant.
   txnIndex.sanityCheck()
+  // Failing to sanity check the timeIndex can result in a scenario where 
log segments are
+  // prematurely deleted (before breaching retention periods) if the index 
file was not resized
+  // to disk successfully.
+  // KAFKA-10207
+  timeIndex.sanityCheck()
+  offsetIndex.sanityCheck()

Review comment:
   @Johnny-Malizia : Thanks for reporting this. A couple of comments on 
this.
   
   (1) It seems that there are cases where the index file is not trimmed 
properly. Every rolled segment calls LogSegment.onBecomeInactiveSegment(), 
which does trimming. So, we probably should just fix the trim logic there.
   
   (2) The sanity checking of index may have merit itself if it's cheap. The 
intention of KIP-263 is to avoid unnecessary loading (and the sanity check) of 
indexes never used. This is mostly achieved through lazy indexes (actual index 
file only opened lazily on first use). Since the biggest cost of loading an 
index file is probably the I/O part, and the sanity check is computational only 
and relatively cheap, we can probably just do the index sanity check on index 
opening. This way, in the common case, we only pay the index sanity checking 
cost on active segments.





This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #8902: KAFKA-10179: Pass correct changelog topic to state serdes

2020-07-07 Thread GitBox


vvcephei commented on pull request #8902:
URL: https://github.com/apache/kafka/pull/8902#issuecomment-655012979


   Test this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] vvcephei commented on pull request #8902: KAFKA-10179: Pass correct changelog topic to state serdes

2020-07-07 Thread GitBox


vvcephei commented on pull request #8902:
URL: https://github.com/apache/kafka/pull/8902#issuecomment-655013091


   Test this please



This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-4273) Streams DSL - Add TTL / retention period support for intermediate topics and state stores

2020-07-07 Thread Matthias J. Sax (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-4273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17152928#comment-17152928
 ] 

Matthias J. Sax commented on KAFKA-4273:


I see. I guess it's safe. Not sure how performant it is. But it's a good point 
that we might want to add TTL support for plain kv-stores, too.

> Streams DSL - Add TTL / retention period support for intermediate topics and 
> state stores
> -
>
> Key: KAFKA-4273
> URL: https://issues.apache.org/jira/browse/KAFKA-4273
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.0.1
>Reporter: Davor Poldrugo
>Priority: Major
>
> Hi!
> I'm using Streams DSL (0.10.0.1), which can only use RocksDB for local state 
> as far as I know - it's not configurable.
> In my use case my data has TTL / retention period. It's 48 hours. After that 
> - data can be discarded.
> I join two topics: "messages" and "prices" using windowed inner join.
> The two intermediate Kafka topics for this join are named:
>  * messages-prices-join-this-changelog
>  * messages-prices-join-other-changelog
> Since these topics are created as compacted by Kafka Streams, and I don't 
> want to keep data forever, I have altered them to not use compaction. Right 
> now my RocksDB state stores grow indefinitely, and I don't have any options 
> to define TTL, or somehow periodically clean the older data.
> A "hack" that I use to keep my disk usage low - I have scheduled a job to 
> periodically stop Kafka Streams instances - one at a time. This triggers a 
> rebalance, and partitions migrate to other instances. When the instance is 
> started again, there's another rebalance, and sometimes this instance starts 
> processing partitions that wasn't processing before the stop - which leads to 
> deletion of the RocksDB state store for those partitions 
> (state.cleanup.delay.ms). In the next rebalance the local store is recreated 
> with a restore consumer - which reads data from - as previously mentioned - a 
> non compacted topic. And this effectively leads to a "hacked TTL support" in 
> Kafka Streams DSL.
> Questions:
>  * Do you think would be reasonable to add support in the DSL api to define 
> TTL for local store?
>  * Which opens another question - there are use cases which don't need the 
> intermediate topics to be created as "compact". Could also this be added to 
> the DSL api? Maybe only this could be added, and this flag should also be 
> used for the RocksDB TTL. Of course in this case another config would be 
> mandatory - the retention period or TTL for the intermediate topics and the 
> state stores. I saw there is a new cleanup.policy - compact_and_delete - 
> added with KAFKA-4015.
>  * Which also leads to another question, maybe some intermediate topics / 
> state stores need different TTL, so a it's not as simple as that. But after 
> KAFKA-3870, it will be easier.
> RocksDB supports TTL:
>  * 
> https://github.com/apache/kafka/blob/0.10.0.1/streams/src/main/java/org/apache/kafka/streams/state/internals/RocksDBStore.java#L166
>  * https://github.com/facebook/rocksdb/wiki/Time-to-Live
>  * 
> https://github.com/facebook/rocksdb/blob/master/java/src/main/java/org/rocksdb/TtlDB.java
> A somehow similar issue: KAFKA-4212
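The eviction behavior those RocksDB TTL links describe can be illustrated with a small, self-contained sketch. This is a hypothetical illustration only — the class and method names below are invented, not Kafka or RocksDB API — showing the semantics TtlDB documents: stale entries are dropped lazily when next touched, not eagerly on a timer.

```java
import java.util.HashMap;
import java.util.Map;

/**
 * Hypothetical sketch (invented names, not Kafka or RocksDB API) of the TTL
 * semantics RocksDB's TtlDB provides: stale entries are evicted lazily when
 * they are next touched, not eagerly on a timer.
 */
public class TtlStoreSketch {
    private static final class Entry {
        final String value;
        final long insertedAtMillis;
        Entry(String value, long insertedAtMillis) {
            this.value = value;
            this.insertedAtMillis = insertedAtMillis;
        }
    }

    private final long ttlMillis;
    private final Map<String, Entry> store = new HashMap<>();

    public TtlStoreSketch(long ttlMillis) {
        this.ttlMillis = ttlMillis;
    }

    // Record the insertion time alongside the value so expiry can be checked later.
    public void put(String key, String value, long nowMillis) {
        store.put(key, new Entry(value, nowMillis));
    }

    // Return null for (and drop) entries whose age exceeds the TTL.
    public String get(String key, long nowMillis) {
        Entry e = store.get(key);
        if (e == null) {
            return null;
        }
        if (nowMillis - e.insertedAtMillis > ttlMillis) {
            store.remove(key); // lazy eviction, analogous to TtlDB dropping stale keys at compaction
            return null;
        }
        return e.value;
    }

    public static void main(String[] args) {
        TtlStoreSketch store = new TtlStoreSketch(48L * 60 * 60 * 1000); // 48h, as in this issue
        store.put("msg-1", "price=10", 0L);
        System.out.println(store.get("msg-1", 1_000L));               // still fresh: price=10
        System.out.println(store.get("msg-1", 49L * 60 * 60 * 1000)); // expired: null
    }
}
```

Because eviction is lazy, disk usage can still exceed the TTL window until the store is next compacted or read — the same caveat TtlDB's documentation notes.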



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-10226) KStream without SASL information should return error in confluent cloud

2020-07-07 Thread Werner Daehn (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10226?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17152932#comment-17152932
 ] 

Werner Daehn commented on KAFKA-10226:
--

As I am using the Kafka client, there are no logs on the client side. I haven't 
checked the server.

> KStream without SASL information should return error in confluent cloud
> ---
>
> Key: KAFKA-10226
> URL: https://issues.apache.org/jira/browse/KAFKA-10226
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, streams
>Affects Versions: 2.5.0
>Reporter: Werner Daehn
>Priority: Minor
>
> I have created a KStream against Confluent Cloud and wondered why no data 
> was received from the source. The reason was that I had forgotten to add the 
> SASL API key and secret.
>  
> For end users this might lead to usability issues. If the KStream wants to 
> read from a topic and is not allowed to, this should raise an error, not be 
> silently ignored.
>  
> How do producer/consumer clients handle that situation?
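For reference, the client configuration that was missing looks roughly like the following. The property keys are standard Kafka client SASL settings; the placeholder values are illustrative and must be replaced with the cluster's actual endpoint and credentials:

```properties
# Standard Kafka client SASL settings; placeholder values are illustrative.
bootstrap.servers=<broker-endpoint>:9092
security.protocol=SASL_SSL
sasl.mechanism=PLAIN
sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="<api-key>" \
  password="<api-secret>";
```

Without `security.protocol` and `sasl.jaas.config`, the client attempts a plaintext connection, which the broker rejects at a level the application may never surface as an error.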





[GitHub] [kafka] vvcephei commented on pull request #8913: KAFKA-10191 fix flaky StreamsOptimizedTest

2020-07-07 Thread GitBox


vvcephei commented on pull request #8913:
URL: https://github.com/apache/kafka/pull/8913#issuecomment-655019753


   Looks good! 
http://confluent-kafka-branch-builder-system-test-results.s3-us-west-2.amazonaws.com/2020-07-07--001.1594143314--chia7712--KAFKA-10191--4268ab25b/report.html
   







[jira] [Commented] (KAFKA-4273) Streams DSL - Add TTL / retention period support for intermediate topics and state stores

2020-07-07 Thread Jira


[ 
https://issues.apache.org/jira/browse/KAFKA-4273?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17152944#comment-17152944
 ] 

Clément MATHIEU commented on KAFKA-4273:


It would be useful. Incidentally, I have several use cases where adding TTL 
support to global tables, or supporting windowed global tables with TTL, would 
be super useful. 






[GitHub] [kafka] mjsax merged pull request #8980: KAFKA-10222:Incorrect methods show up in 0.10 Kafka Streams docs

2020-07-07 Thread GitBox


mjsax merged pull request #8980:
URL: https://github.com/apache/kafka/pull/8980


   







[jira] [Commented] (KAFKA-10222) Incorrect methods show up in 0.10 Kafka Streams docs

2020-07-07 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17152947#comment-17152947
 ] 

ASF GitHub Bot commented on KAFKA-10222:


huxihx opened a new pull request #272:
URL: https://github.com/apache/kafka-site/pull/272


   https://issues.apache.org/jira/browse/KAFKA-10222
   
   Non-existent methods show up in the doc:
   _builder.from("my-input-topic").mapValue(value -> 
value.length().toString()).to("my-output-topic");_
   
   There is no method named `from` or `mapValue`; they should be `stream` and 
`mapValues`, respectively.
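A corrected version of the snippet could look like the following. The DSL chain is shown in a comment (it needs kafka-streams on the classpath to compile), while the value mapper itself is runnable stand-alone. Note that the original `value.length().toString()` is itself shorthand — `String.length()` returns a primitive `int` in Java, so the mapper below uses `Integer.toString`:

```java
import java.util.function.Function;

public class CorrectedDslSnippet {
    // Corrected 0.10 DSL chain (shape only; requires kafka-streams on the classpath):
    //   builder.stream("my-input-topic")
    //          .mapValues(value -> Integer.toString(value.length()))
    //          .to("my-output-topic");

    // The value mapper from the chain above, runnable without Kafka:
    static final Function<String, String> LENGTH_AS_STRING =
            value -> Integer.toString(value.length());

    public static void main(String[] args) {
        System.out.println(LENGTH_AS_STRING.apply("hello")); // 5
    }
}
```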





> Incorrect methods show up in 0.10 Kafka Streams docs
> 
>
> Key: KAFKA-10222
> URL: https://issues.apache.org/jira/browse/KAFKA-10222
> Project: Kafka
>  Issue Type: Bug
>  Components: documentation
>Affects Versions: 0.10.0.0
>Reporter: huxihx
>Assignee: huxihx
>Priority: Major
>
> In the 0.10 Kafka Streams 
> doc ([http://kafka.apache.org/0100/javadoc/index.html?org/apache/kafka/streams/KafkaStreams.html]),
> two wrong methods show up, as shown below:
> _builder.from("my-input-topic").mapValue(value -> 
> value.length().toString()).to("my-output-topic");_
>  
> There is no method named `from` or `mapValue`; they should be `stream` and 
> `mapValues`, respectively.
>  
>  





[jira] [Commented] (KAFKA-10222) Incorrect methods show up in 0.10 Kafka Streams docs

2020-07-07 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17152949#comment-17152949
 ] 

ASF GitHub Bot commented on KAFKA-10222:


mjsax merged pull request #272:
URL: https://github.com/apache/kafka-site/pull/272


   










[jira] [Commented] (KAFKA-10222) Incorrect methods show up in 0.10 Kafka Streams docs

2020-07-07 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10222?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17152948#comment-17152948
 ] 

ASF GitHub Bot commented on KAFKA-10222:


mjsax commented on pull request #272:
URL: https://github.com/apache/kafka-site/pull/272#issuecomment-655021969


   We can still merge this PR, because otherwise the web page would not be 
updated.










[GitHub] [kafka] vvcephei commented on pull request #8913: KAFKA-10191 fix flaky StreamsOptimizedTest

2020-07-07 Thread GitBox


vvcephei commented on pull request #8913:
URL: https://github.com/apache/kafka/pull/8913#issuecomment-655022914


   Merged. Thanks, @chia7712 !







[GitHub] [kafka] vvcephei merged pull request #8913: KAFKA-10191 fix flaky StreamsOptimizedTest

2020-07-07 Thread GitBox


vvcephei merged pull request #8913:
URL: https://github.com/apache/kafka/pull/8913


   







[GitHub] [kafka] Johnny-Malizia commented on a change in pull request #8936: KAFKA-10207: Fixed padded timeindex causing premature data deletion

2020-07-07 Thread GitBox


Johnny-Malizia commented on a change in pull request #8936:
URL: https://github.com/apache/kafka/pull/8936#discussion_r451040852



##
File path: core/src/test/scala/unit/kafka/log/TimeIndexTest.scala
##
@@ -148,5 +148,39 @@ class TimeIndexTest {
 idx.close()
   }
 
-}
 
+  /**
+   * In certain cases, index files fail to have their pre-allocated 0 bytes trimmed from the tail
+   * when a new segment is rolled. This causes a silent failure at the next startup where all
+   * retention windows are breached, purging out data whether or not the window was really breached.
+   * KAFKA-10207
+   */
+  @Test
+  def testLoadingUntrimmedIndex(): Unit = {
+    // A larger index size must be specified or the starting offset will round down,
+    // preventing this issue from being reproduced. Configs default to 10mb.
+    val max1MbEntryCount = 10
+    // Create a file that will exist on disk and be removed when we are done
+    val file = nonExistantTempFile()
+    file.deleteOnExit()
+    // create an index that can have up to 10 entries, about 1mb
+    var idx2 = new TimeIndex(file, baseOffset = 0, max1MbEntryCount * 12)
+    // Append less than the maximum number of entries, leaving 0 bytes padding the end
+    for (i <- 1 until max1MbEntryCount)
+      idx2.maybeAppend(i, i)
+
+    idx2.flush()
+    // jvm 1.8.0_191 fails to always flush a shrinking resize to a zfs disk

Review comment:
   I think this incorrect behavior only happens with certain JVM versions, and 
in my investigation it seemed to happen only on a ZFS mount.
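The failure mode under discussion — pre-allocated zero bytes left untrimmed at the tail of a time index — can be sketched in isolation. The following is an illustrative stand-alone model, not Kafka's actual TimeIndex code; it assumes only the real 12-byte entry layout (an 8-byte timestamp followed by a 4-byte relative offset) and shows how a load-time check can tell real entries from untrimmed padding:

```java
import java.nio.ByteBuffer;

/** Illustrative model of an untrimmed time index, not Kafka's TimeIndex. */
public class UntrimmedIndexSketch {
    // A Kafka time-index entry is 12 bytes: 8-byte timestamp + 4-byte relative offset.
    static final int ENTRY_SIZE = 12;

    // Count valid entries, treating the first all-zero slot (after slot 0)
    // as the start of pre-allocated padding that was never trimmed.
    static int validEntries(ByteBuffer buf) {
        int slots = buf.capacity() / ENTRY_SIZE;
        for (int i = 0; i < slots; i++) {
            long timestamp = buf.getLong(i * ENTRY_SIZE);
            int relativeOffset = buf.getInt(i * ENTRY_SIZE + 8);
            if (i > 0 && timestamp == 0 && relativeOffset == 0) {
                return i; // hit untrimmed zero padding
            }
        }
        return slots;
    }

    public static void main(String[] args) {
        // Simulate an index pre-allocated for 10 entries with only 4 written,
        // as happens when the shrinking resize silently fails to flush.
        ByteBuffer buf = ByteBuffer.allocate(10 * ENTRY_SIZE);
        for (int i = 0; i < 4; i++) {
            buf.putLong(i * ENTRY_SIZE, 1000L + i); // fake timestamps
            buf.putInt(i * ENTRY_SIZE + 8, i);      // relative offsets
        }
        System.out.println(validEntries(buf)); // 4, not 10
    }
}
```

Without such a check, the padding's zero timestamps read as "very old" entries, which is what makes every retention window appear breached at startup.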









[GitHub] [kafka] Johnny-Malizia commented on a change in pull request #8936: KAFKA-10207: Fixed padded timeindex causing premature data deletion

2020-07-07 Thread GitBox


Johnny-Malizia commented on a change in pull request #8936:
URL: https://github.com/apache/kafka/pull/8936#discussion_r451043580



##
File path: core/src/main/scala/kafka/log/LogSegment.scala
##
@@ -87,6 +87,12 @@ class LogSegment private[log] (val log: FileRecords,
   // we will recover the segments above the recovery point in recoverLog()
   // in any case so sanity checking them here is redundant.
   txnIndex.sanityCheck()
+  // Failing to sanity check the timeIndex can result in a scenario where log segments are
+  // prematurely deleted (before breaching retention periods) if the index file was not resized
+  // to disk successfully.
+  // KAFKA-10207
+  timeIndex.sanityCheck()
+  offsetIndex.sanityCheck()

Review comment:
   Thank you for the feedback here. 
   
   While I agree that the trimming logic should be corrected if possible, an 
issue like this is out of our hands to some degree. The current logic seems 
sound and works on a newer version of the JVM. The problem was that shrinking 
the mmapped file's length did not actually happen and failed silently. I'm open 
to suggestions for working around this, but even if we work around it for this 
specific JVM and this specific version of ZFS, it seems plausible the same 
issue could crop up again with another JVM or storage driver, whereas checking 
at read time catches this particularly nasty failure.
   
   I think a good compromise here would be to follow through as you and 
@dhruvilshah3 have suggested and apply the sanity check when the index is first 
loaded, since the check itself is cheap enough and we would avoid loading 
*every* segment at startup.
   









[GitHub] [kafka] vvcephei commented on pull request #8902: KAFKA-10179: Pass correct changelog topic to state serdes

2020-07-07 Thread GitBox


vvcephei commented on pull request #8902:
URL: https://github.com/apache/kafka/pull/8902#issuecomment-655026893


   ```
   00:05:04.438 > Task :streams:compileTestJava
   00:05:04.438 
/home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.13@2/streams/src/test/java/org/apache/kafka/test/NoOpProcessorContext.java:62:
 error: cannot find symbol
   00:05:04.438 protected StateManager stateManager() {
   00:05:04.439   ^
   00:05:04.439   symbol:   class StateManager
   00:05:04.439   location: class NoOpProcessorContext
   00:05:04.439 
/home/jenkins/jenkins-slave/workspace/kafka-pr-jdk11-scala2.13@2/streams/src/test/java/org/apache/kafka/test/InternalMockProcessorContext.java:198:
 error: cannot find symbol
   00:05:04.439 protected StateManager stateManager() {
   00:05:04.439   ^
   00:05:04.440   symbol:   class StateManager
   00:05:04.440   location: class InternalMockProcessorContext
   00:05:13.935 2 errors
   ```






