[jira] [Updated] (KAFKA-13358) Not able to replicate groups in MirrorMaker 2.0

2021-10-06 Thread Hemanth Savasere (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13358?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hemanth Savasere updated KAFKA-13358:
-
Priority: Critical  (was: Major)

> Not able to replicate groups in MirrorMaker 2.0 
> 
>
> Key: KAFKA-13358
> URL: https://issues.apache.org/jira/browse/KAFKA-13358
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.5.0
> Environment: RHEL
>Reporter: Hemanth Savasere
>Priority: Critical
> Attachments: connect-mirror-maker.properties
>
>
> I created a *consumer group PizzaGroup* with *kafka-console-consumer* so that I
> could read the *Pizza* topic in the *source* cluster.
> Now I want to replicate the *same group to the destination* cluster.
> Even though I have left *groups.blacklist* empty in
> *connect-mirror-maker.properties*, the groups are not getting replicated. The
> properties file is attached for reference.
> We are using the SSL protocol.
> *The topics and their offsets are getting replicated, but the groups are not.*
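For context, MirrorMaker 2 does not copy consumer groups to the target cluster as such: the MirrorCheckpointConnector emits checkpoints to a "source.checkpoints.internal" topic on the target, and automatic syncing of those checkpoints into the target's __consumer_offsets (sync.group.offsets.enabled, KIP-545) is only available from Kafka 2.7 onwards. On 2.5 the checkpointed offsets have to be translated client-side. A minimal sketch of that translation, assuming the source cluster alias is "source", the group is PizzaGroup, and hypothetical target-cluster SSL client properties:

{code:java}
import java.time.Duration;
import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.connect.mirror.RemoteClusterUtils;

public class PizzaGroupOffsetTranslation {

    public static void main(String[] args) throws Exception {
        // Hypothetical client properties for the target cluster; adjust
        // bootstrap.servers and the SSL settings to the real environment.
        Map<String, Object> targetProps = new HashMap<>();
        targetProps.put("bootstrap.servers", "target-broker:9093");
        targetProps.put("security.protocol", "SSL");

        // Translate PizzaGroup's committed offsets from the cluster aliased
        // "source" into target-cluster offsets, using MM2's checkpoint topic.
        Map<TopicPartition, OffsetAndMetadata> translated =
                RemoteClusterUtils.translateOffsets(targetProps, "source", "PizzaGroup",
                        Duration.ofSeconds(30));

        // These offsets can then be committed on the target cluster (e.g. via
        // AdminClient), which is what makes the group appear there.
        translated.forEach((tp, offset) ->
                System.out.println(tp + " -> " + offset.offset()));
    }
}
{code}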





[jira] [Updated] (KAFKA-13358) Not able to replicate groups in MirrorMaker 2.0

2021-10-06 Thread Hemanth Savasere (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13358?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hemanth Savasere updated KAFKA-13358:
-
Description: 
I created a *consumer group PizzaGroup* with *kafka-console-consumer* so that I 
could read the *Pizza* topic in the *source* cluster.

Now I want to replicate the *same group to the destination* cluster.

Even though I have left *groups.blacklist* empty in 
*connect-mirror-maker.properties*, the groups are not getting replicated. The 
properties file is attached for reference.

We are using the SSL protocol.

The topics and their offsets are getting replicated, but the groups are not.



  was:
I had created a *group PizzaGroup from kafka-console-consumer* so that I can 
read the content of the *Pizza* topic in the *source* cluster. 

Now I want to replicate the *same group to the destination* cluster.

Even though I have left the *groups.blacklist as empty* in the 
*connect-mirror-maker.properties*, even then the groups are not getting 
replicated. The properties file has been attached for reference.

We are using SSL protocol.




> Not able to replicate groups in MirrorMaker 2.0 
> 
>
> Key: KAFKA-13358
> URL: https://issues.apache.org/jira/browse/KAFKA-13358
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.5.0
> Environment: RHEL
>Reporter: Hemanth Savasere
>Priority: Major
> Attachments: connect-mirror-maker.properties
>
>
> I created a *consumer group PizzaGroup* with *kafka-console-consumer* so that I
> could read the *Pizza* topic in the *source* cluster.
> Now I want to replicate the *same group to the destination* cluster.
> Even though I have left *groups.blacklist* empty in
> *connect-mirror-maker.properties*, the groups are not getting replicated. The
> properties file is attached for reference.
> We are using the SSL protocol.
> The topics and their offsets are getting replicated, but the groups are not.





[jira] [Created] (KAFKA-13358) Not able to replicate groups in MirrorMaker 2.0

2021-10-06 Thread Hemanth Savasere (Jira)
Hemanth Savasere created KAFKA-13358:


 Summary: Not able to replicate groups in MirrorMaker 2.0 
 Key: KAFKA-13358
 URL: https://issues.apache.org/jira/browse/KAFKA-13358
 Project: Kafka
  Issue Type: Bug
  Components: mirrormaker
Affects Versions: 2.5.0
 Environment: RHEL
Reporter: Hemanth Savasere
 Attachments: connect-mirror-maker.properties

I created a *consumer group PizzaGroup* with *kafka-console-consumer* so that I 
could read the *Pizza* topic in the *source* cluster.

Now I want to replicate the *same group to the destination* cluster.

Even though I have left *groups.blacklist* empty in 
*connect-mirror-maker.properties*, the groups are not getting replicated. The 
properties file is attached for reference.

We are using the SSL protocol.







[jira] [Updated] (KAFKA-9613) CorruptRecordException: Found record size 0 smaller than minimum record overhead

2021-10-06 Thread Nandini (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9613?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Nandini updated KAFKA-9613:
---
Summary: CorruptRecordException: Found record size 0 smaller than minimum 
record overhead  (was: orruptRecordException: Found record size 0 smaller than 
minimum record overhead)

> CorruptRecordException: Found record size 0 smaller than minimum record 
> overhead
> 
>
> Key: KAFKA-9613
> URL: https://issues.apache.org/jira/browse/KAFKA-9613
> Project: Kafka
>  Issue Type: Bug
>Reporter: Amit Khandelwal
>Priority: Major
>
> 20200224;21:01:38: [2020-02-24 21:01:38,615] ERROR [ReplicaManager broker=0] 
> Error processing fetch with max size 1048576 from consumer on partition 
> SANDBOX.BROKER.NEWORDER-0: (fetchOffset=211886, logStartOffset=-1, 
> maxBytes=1048576, currentLeaderEpoch=Optional.empty) 
> (kafka.server.ReplicaManager)
> 20200224;21:01:38: org.apache.kafka.common.errors.CorruptRecordException: 
> Found record size 0 smaller than minimum record overhead (14) in file 
> /data/tmp/kafka-topic-logs/SANDBOX.BROKER.NEWORDER-0/.log.
> 20200224;21:05:48: [2020-02-24 21:05:48,711] INFO [GroupMetadataManager 
> brokerId=0] Removed 0 expired offsets in 1 milliseconds. 
> (kafka.coordinator.group.GroupMetadataManager)
> 20200224;21:10:22: [2020-02-24 21:10:22,204] INFO [GroupCoordinator 0]: 
> Member 
> _011-9e61d2c9-ce5a-4231-bda1-f04e6c260dc0-StreamThread-1-consumer-27768816-ee87-498f-8896-191912282d4f
>  in group y_011 has failed, removing it from the group 
> (kafka.coordinator.group.GroupCoordinator)
>  
> [https://stackoverflow.com/questions/60404510/kafka-broker-issue-replica-manager-with-max-size#]
>  
>  





[GitHub] [kafka] hojongs commented on a change in pull request #11249: Fix wrong link to "Guarantees" part in introduction.html documentation

2021-10-06 Thread GitBox


hojongs commented on a change in pull request #11249:
URL: https://github.com/apache/kafka/pull/11249#discussion_r723837977



##
File path: docs/introduction.html
##
@@ -144,13 +144,13 @@ 
 
   
   
-Producers are those client applications that publish 
(write) events to Kafka, and consumers are those that 
subscribe to (read and process) these events. In Kafka, producers and consumers 
are fully decoupled and agnostic of each other, which is a key design element 
to achieve the high scalability that Kafka is known for. For example, producers 
never need to wait for consumers. Kafka provides various guarantees such as the ability to 
process events exactly-once.
+Producers are those client applications that publish 
(write) events to Kafka, and consumers are those that 
subscribe to (read and process) these events. In Kafka, producers and consumers 
are fully decoupled and agnostic of each other, which is a key design element 
to achieve the high scalability that Kafka is known for. For example, producers 
never need to wait for consumers. Kafka provides various guarantees such as the ability to 
process events exactly-once.

Review comment:
   You're right. That makes more sense.
   
   Applied at f017014 
   










[jira] [Resolved] (KAFKA-13356) Use "delete" retention policy only for stream-stream join windowed stores

2021-10-06 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13356?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax resolved KAFKA-13356.
-
Resolution: Duplicate

> Use "delete" retention policy only for stream-stream join windowed stores
> -
>
> Key: KAFKA-13356
> URL: https://issues.apache.org/jira/browse/KAFKA-13356
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Matthias J. Sax
>Priority: Major
>
> Today, the window stores associated with stream-stream joins, like any other
> window stores, use "delete,compact" as their retention policy. However, since
> we currently add a sequence number to the keys to disable de-duplication,
> "compaction" can never actually compact any keys; it only results in 1) wasted
> CPU on the brokers' cleaner threads, and 2) broker features that rely on the
> "delete" policy not being applicable.
> Until we potentially change the store format in the future to stop using a
> sequence number for disabling de-duplication, we could consider simply changing
> the policy to "delete" for stream-stream join window stores for now.





[GitHub] [kafka] C0urante commented on pull request #10563: KAFKA-12487: Add support for cooperative consumer protocol with sink connectors

2021-10-06 Thread GitBox


C0urante commented on pull request #10563:
URL: https://github.com/apache/kafka/pull/10563#issuecomment-937395625


   @kkonstantine Would you mind giving this another pass? It's been over two 
months.






[GitHub] [kafka] C0urante commented on pull request #10528: KAFKA-12497: Skip unnecessary source task offset commits

2021-10-06 Thread GitBox


C0urante commented on pull request #10528:
URL: https://github.com/apache/kafka/pull/10528#issuecomment-937395176


   @rhauch @tombentley could either of you take a look? It'd be nice to get 
this merged in time for the upcoming 3.1 release; I know I've seen plenty of 
people led astray by continued offset commit messages for failed tasks and it'd 
be great if we could improve their experience.






[GitHub] [kafka] C0urante commented on pull request #10907: KAFKA-10000: Exactly-once support for source connectors (KIP-618)

2021-10-06 Thread GitBox


C0urante commented on pull request #10907:
URL: https://github.com/apache/kafka/pull/10907#issuecomment-937394815


   @rhauch Is that second pass coming any time soon? It'd be nice if we could 
get this merged in time for the upcoming 3.1 release.
   
   I plan to address the existing comments next week.






[GitHub] [kafka] ableegoldman commented on pull request #11332: MINOR: re-add removed test coverage for 'KAFKA-12983: reset needsJoinPrepare flag'

2021-10-06 Thread GitBox


ableegoldman commented on pull request #11332:
URL: https://github.com/apache/kafka/pull/11332#issuecomment-937385978


   Merged to trunk and cherrypicked to 3.0 & 2.8






[GitHub] [kafka] ableegoldman merged pull request #11332: MINOR: re-add removed test coverage for 'KAFKA-12983: reset needsJoinPrepare flag'

2021-10-06 Thread GitBox


ableegoldman merged pull request #11332:
URL: https://github.com/apache/kafka/pull/11332


   






[GitHub] [kafka] ableegoldman commented on pull request #11332: MINOR: re-add removed test coverage for 'KAFKA-12983: reset needsJoinPrepare flag'

2021-10-06 Thread GitBox


ableegoldman commented on pull request #11332:
URL: https://github.com/apache/kafka/pull/11332#issuecomment-937381455


   Failures are unrelated:
   
   `kafka.admin.LeaderElectionCommandTest.testElectionResultOutput()`
   
`kafka.api.PlaintextAdminIntegrationTest.testReplicaCanFetchFromLogStartOffsetAfterDeleteRecords()`
   
   






[GitHub] [kafka] C0urante commented on pull request #11046: KAFKA-12980: Return empty record batch from Consumer::poll when position advances due to aborted transactions

2021-10-06 Thread GitBox


C0urante commented on pull request #11046:
URL: https://github.com/apache/kafka/pull/11046#issuecomment-937378866


   @hachikuji could you take a look at this when you have a chance?






[GitHub] [kafka] mjsax commented on pull request #11370: MINOR: remove unneeded size and add lock coarsening to inMemoryKeyValueStore

2021-10-06 Thread GitBox


mjsax commented on pull request #11370:
URL: https://github.com/apache/kafka/pull/11370#issuecomment-937373122


   Thanks for the PR @showuon! Merged to `trunk`.






[GitHub] [kafka] mjsax merged pull request #11370: MINOR: remove unneeded size and add lock coarsening to inMemoryKeyValueStore

2021-10-06 Thread GitBox


mjsax merged pull request #11370:
URL: https://github.com/apache/kafka/pull/11370


   






[jira] [Created] (KAFKA-13357) Controller snapshot contains producer ids records but broker does not

2021-10-06 Thread Jose Armando Garcia Sancio (Jira)
Jose Armando Garcia Sancio created KAFKA-13357:
--

 Summary: Controller snapshot contains producer ids records but 
broker does not
 Key: KAFKA-13357
 URL: https://issues.apache.org/jira/browse/KAFKA-13357
 Project: Kafka
  Issue Type: Sub-task
  Components: kraft
Affects Versions: 3.0.0
Reporter: Jose Armando Garcia Sancio
Assignee: Jose Armando Garcia Sancio


MetadataDelta ignores PRODUCER_IDS_RECORDS. A broker doesn't need this state 
for its operation, but it needs to handle these records if we want to maintain 
the invariant that controller snapshots are equivalent to broker snapshots.





[GitHub] [kafka] kowshik commented on a change in pull request #11058: KAFKA-12802 Added a file based cache for consumed remote log metadata for each partition to avoid consuming again incase of broke

2021-10-06 Thread GitBox


kowshik commented on a change in pull request #11058:
URL: https://github.com/apache/kafka/pull/11058#discussion_r723625488



##
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/CommittedOffsetsFile.java
##
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.common.utils.Utils;
+import org.apache.kafka.server.common.CheckpointFile;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.regex.Pattern;
+
+/**
+ * This class represents a file containing the committed offsets of remote log 
metadata partitions.
+ */
+public class CommittedOffsetsFile {
+    private static final int CURRENT_VERSION = 0;
+    private static final String SEPARATOR = " ";
+
+    private static final Pattern MINIMUM_ONE_WHITESPACE = Pattern.compile("\\s+");
+    private final CheckpointFile<Map.Entry<Integer, Long>> checkpointFile;
+
+    CommittedOffsetsFile(File offsetsFile) throws IOException {
+        CheckpointFile.EntryFormatter<Map.Entry<Integer, Long>> formatter = new EntryFormatter();
+        checkpointFile = new CheckpointFile<>(offsetsFile, CURRENT_VERSION, formatter);
+    }
+
+    private static class EntryFormatter implements CheckpointFile.EntryFormatter<Map.Entry<Integer, Long>> {
+
+        @Override
+        public String toString(Map.Entry<Integer, Long> entry) {
+            // Each entry is stored on a new line as: <partition> <offset>
+            return entry.getKey() + SEPARATOR + entry.getValue();
+        }
+
+        @Override
+        public Optional<Map.Entry<Integer, Long>> fromString(String line) {
+            String[] strings = MINIMUM_ONE_WHITESPACE.split(line);

Review comment:
   It seems performance optimization is not a concern here, since 
deserialization of committed offsets file from disk happens only during 
initialization.








[jira] [Commented] (KAFKA-7996) KafkaStreams does not pass timeout when closing Producer

2021-10-06 Thread Manjunath Shettar (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17425153#comment-17425153
 ] 

Manjunath Shettar commented on KAFKA-7996:
--

Hello,

Our Kafka Streams application sometimes gets into a hung state, preventing the 
main process from exiting. I stumbled upon this ticket while trying to root-cause 
the issue. It looks like what we are seeing is similar to what is described 
above and also in 
[https://github.com/apache/kafka/pull/7814].

We are currently running on Kafka 2.4.1. It appears that the streams.close() API 
implementation has changed quite a bit in the latest Kafka releases, especially 
2.8 onwards. However, a closer look at the {{close()}} implementation in 
[StreamThread.java|https://github.com/apache/kafka/blob/2.8.0/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java#L1064]
 reveals that the close APIs of the producer, consumer, etc. can still block the 
stream thread and hence block the Streams app from closing gracefully.
 # Could someone confirm whether this issue still exists or has been fixed?
 ## If it has been fixed, could you please let us know the version containing the 
corresponding fix?
 ## If not, is there a plan to look into this issue in the near future?

 

Thanks,
Manjunath
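For reference, a minimal sketch of the bounded shutdown being discussed (not from this ticket): KafkaStreams#close(Duration) returns false if the instance could not be stopped within the timeout, and the concern above is that on older releases this timeout is not propagated to the embedded clients' own close() calls.

{code:java}
import java.time.Duration;
import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;

public class BoundedShutdown {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "bounded-shutdown-example");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input-topic").to("output-topic");

        KafkaStreams streams = new KafkaStreams(builder.build(), props);

        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            // Ask for a bounded shutdown instead of the default "wait forever".
            boolean stopped = streams.close(Duration.ofSeconds(30));
            if (!stopped) {
                System.err.println("Streams instance did not shut down within 30s");
            }
        }));

        streams.start();
    }
}
{code}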

> KafkaStreams does not pass timeout when closing Producer
> 
>
> Key: KAFKA-7996
> URL: https://issues.apache.org/jira/browse/KAFKA-7996
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.0
>Reporter: Patrik Kleindl
>Priority: Major
>  Labels: needs-kip
>
> KIP WIP: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-459%3A+Improve+KafkaStreams%23close]
> [https://confluentcommunity.slack.com/messages/C48AHTCUQ/convo/C48AHTCUQ-1550831721.026100/]
> We are running 2.1 and have a case where the shutdown of a streams 
> application takes several minutes
>  I noticed that although we call streams.close with a timeout of 30 seconds 
> the log says
>  [Producer 
> clientId=…-8be49feb-8a2e-4088-bdd7-3c197f6107bb-StreamThread-1-producer] 
> Closing the Kafka producer with timeoutMillis = 9223372036854775807 ms.
> Matthias J. Sax [3 days ago]
>  I just checked the code, and yes, we don't provide a timeout for the 
> producer on close()...





[jira] [Assigned] (KAFKA-13356) Use "delete" retention policy only for stream-stream join windowed stores

2021-10-06 Thread Matthias J. Sax (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13356?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Matthias J. Sax reassigned KAFKA-13356:
---

Assignee: Matthias J. Sax

> Use "delete" retention policy only for stream-stream join windowed stores
> -
>
> Key: KAFKA-13356
> URL: https://issues.apache.org/jira/browse/KAFKA-13356
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Guozhang Wang
>Assignee: Matthias J. Sax
>Priority: Major
>
> Today, the window stores associated with stream-stream joins, like any other
> window stores, use "delete,compact" as their retention policy. However, since
> we currently add a sequence number to the keys to disable de-duplication,
> "compaction" can never actually compact any keys; it only results in 1) wasted
> CPU on the brokers' cleaner threads, and 2) broker features that rely on the
> "delete" policy not being applicable.
> Until we potentially change the store format in the future to stop using a
> sequence number for disabling de-duplication, we could consider simply changing
> the policy to "delete" for stream-stream join window stores for now.





[jira] [Created] (KAFKA-13356) Use "delete" retention policy only for stream-stream join windowed stores

2021-10-06 Thread Guozhang Wang (Jira)
Guozhang Wang created KAFKA-13356:
-

 Summary: Use "delete" retention policy only for stream-stream join 
windowed stores
 Key: KAFKA-13356
 URL: https://issues.apache.org/jira/browse/KAFKA-13356
 Project: Kafka
  Issue Type: Improvement
  Components: streams
Reporter: Guozhang Wang


Today, the window stores associated with stream-stream joins, like any other 
window stores, use "delete,compact" as their retention policy. However, since we 
currently add a sequence number to the keys to disable de-duplication, 
"compaction" can never actually compact any keys; it only results in 1) wasted 
CPU on the brokers' cleaner threads, and 2) broker features that rely on the 
"delete" policy not being applicable.

Until we potentially change the store format in the future to stop using a 
sequence number for disabling de-duplication, we could consider simply changing 
the policy to "delete" for stream-stream join window stores for now.





[GitHub] [kafka] satishd commented on a change in pull request #11058: KAFKA-12802 Added a file based cache for consumed remote log metadata for each partition to avoid consuming again incase of broke

2021-10-06 Thread GitBox


satishd commented on a change in pull request #11058:
URL: https://github.com/apache/kafka/pull/11058#discussion_r723456248



##
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
##
@@ -82,44 +87,148 @@
 // User topic partitions that this broker is a leader/follower for.
 private Set<TopicIdPartition> assignedTopicPartitions = Collections.emptySet();
 
-// Map of remote log metadata topic partition to consumed offsets.
+// Map of remote log metadata topic partition to consumed offsets. 
Received consumer records
+// may or may not have been processed based on the assigned topic 
partitions.
 private final Map<Integer, Long> partitionToConsumedOffsets = new ConcurrentHashMap<>();
 
+// Map of remote log metadata topic partition to processed offsets. 
Received consumer record is
+// processed as the remote log metadata record's topic partition exists in 
assigned topic partitions.
+private final Map<Integer, Long> partitionToProcessedOffsets = new ConcurrentHashMap<>();
+
+// Map of remote log metadata topic partition to processed offsets that 
were synced in committedOffsetsFile.
+private Map<Integer, Long> lastSyncedPartitionToConsumedOffsets = Collections.emptyMap();
+
+private final long committedOffsetSyncIntervalMs;
+private CommittedOffsetsFile committedOffsetsFile;
+private long lastSyncedTimeMs;
+
 public ConsumerTask(KafkaConsumer<byte[], byte[]> consumer,
 RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler,
-RemoteLogMetadataTopicPartitioner topicPartitioner) {
-Objects.requireNonNull(consumer);
-Objects.requireNonNull(remotePartitionMetadataEventHandler);
-Objects.requireNonNull(topicPartitioner);
-
-this.consumer = consumer;
-this.remotePartitionMetadataEventHandler = 
remotePartitionMetadataEventHandler;
-this.topicPartitioner = topicPartitioner;
+RemoteLogMetadataTopicPartitioner topicPartitioner,
+Path committedOffsetsPath,
+Time time,
+long committedOffsetSyncIntervalMs) {
+this.consumer = Objects.requireNonNull(consumer);
+this.remotePartitionMetadataEventHandler = 
Objects.requireNonNull(remotePartitionMetadataEventHandler);
+this.topicPartitioner = Objects.requireNonNull(topicPartitioner);
+this.time = Objects.requireNonNull(time);
+this.committedOffsetSyncIntervalMs = committedOffsetSyncIntervalMs;
+
+initializeConsumerAssignment(committedOffsetsPath);
+}
+
+private void initializeConsumerAssignment(Path committedOffsetsPath) {
+try {
+committedOffsetsFile = new 
CommittedOffsetsFile(committedOffsetsPath.toFile());
+} catch (IOException e) {
+throw new KafkaException(e);
+}
+
+Map<Integer, Long> committedOffsets = Collections.emptyMap();
+try {
+// Load committed offset and assign them in the consumer.
+committedOffsets = committedOffsetsFile.readEntries();
+} catch (IOException e) {
+// Ignore the error and consumer consumes from the earliest offset.
+log.error("Encountered error while building committed offsets from 
the file", e);
+}
+
+if (!committedOffsets.isEmpty()) {
+// Assign topic partitions from the earlier committed offsets file.
+Set<Integer> earlierAssignedPartitions = committedOffsets.keySet();
+assignedMetaPartitions = 
Collections.unmodifiableSet(earlierAssignedPartitions);
+Set<TopicPartition> metadataTopicPartitions = earlierAssignedPartitions.stream()
+   
.map(x -> new TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, x))
+   
.collect(Collectors.toSet());
+consumer.assign(metadataTopicPartitions);
+
+// Seek to the committed offsets
+for (Map.Entry<Integer, Long> entry : committedOffsets.entrySet()) {
+partitionToConsumedOffsets.put(entry.getKey(), 
entry.getValue());
+partitionToProcessedOffsets.put(entry.getKey(), 
entry.getValue());
+consumer.seek(new 
TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, entry.getKey()), 
entry.getValue());
+}
+
+lastSyncedPartitionToConsumedOffsets = 
Collections.unmodifiableMap(committedOffsets);
+}
 }
 
 @Override
 public void run() {
 log.info("Started Consumer task thread.");
+lastSyncedTimeMs = time.milliseconds();
 try {
 while (!closing) {
 maybeWaitForPartitionsAssignment();
 
 log.info("Polling consumer to receive remote log metadata 
topic records");
-ConsumerRecords consumerRecords
-= con

[jira] [Created] (KAFKA-13355) Shutdown broker eventually when unrecoverable exceptions like IOException encountered in RLMM.

2021-10-06 Thread Satish Duggana (Jira)
Satish Duggana created KAFKA-13355:
--

 Summary: Shutdown broker eventually when unrecoverable exceptions 
like IOException encountered in RLMM. 
 Key: KAFKA-13355
 URL: https://issues.apache.org/jira/browse/KAFKA-13355
 Project: Kafka
  Issue Type: Sub-task
Reporter: Satish Duggana
Assignee: Satish Duggana


Have a mechanism to catch unrecoverable exceptions like IOException from RLMM and 
shut down the broker, as is done in the log layer.





[jira] [Updated] (KAFKA-13354) Topic metrics count request rate inconsistently with other request metrics

2021-10-06 Thread David Mao (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13354?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

David Mao updated KAFKA-13354:
--
Description: 
The request rate metrics in BrokerTopicMetrics are incremented per partition in 
a Produce request. If a produce request has multiple partitions for the same 
topic in the request, then the request gets counted multiple times.

This is inconsistent with how we count request rate metrics elsewhere.

The same applies to the TotalFetchRequest rate metric.

  was:
The request rate metrics in BrokerTopicMetrics are incremented per partition in 
a Produce request. If a produce requests has multiple partitions for the same 
topic in the request, then the request will get counted multiple times.

This is inconsistent with how we count request rate metrics elsewhere.

The same applies to the TotalFetchRequest rate


> Topic metrics count request rate inconsistently with other request metrics
> --
>
> Key: KAFKA-13354
> URL: https://issues.apache.org/jira/browse/KAFKA-13354
> Project: Kafka
>  Issue Type: Bug
>  Components: core, metrics
>Reporter: David Mao
>Priority: Minor
>
> The request rate metrics in BrokerTopicMetrics are incremented per partition
> in a Produce request. If a produce request has multiple partitions for the
> same topic in the request, then the request gets counted multiple times.
> This is inconsistent with how we count request rate metrics elsewhere.
> The same applies to the TotalFetchRequest rate metric.





[jira] [Created] (KAFKA-13354) Topic metrics count request rate inconsistently with other request metrics

2021-10-06 Thread David Mao (Jira)
David Mao created KAFKA-13354:
-

 Summary: Topic metrics count request rate inconsistently with 
other request metrics
 Key: KAFKA-13354
 URL: https://issues.apache.org/jira/browse/KAFKA-13354
 Project: Kafka
  Issue Type: Bug
  Components: core, metrics
Reporter: David Mao


The request rate metrics in BrokerTopicMetrics are incremented per partition in 
a Produce request. If a produce request has multiple partitions for the same 
topic in the request, then the request gets counted multiple times.

This is inconsistent with how we count request rate metrics elsewhere.

The same applies to the TotalFetchRequest rate.





[GitHub] [kafka] iakunin edited a comment on pull request #10775: KAFKA-12668: Making MockScheduler.schedule safe to use in concurrent code

2021-10-06 Thread GitBox


iakunin edited a comment on pull request #10775:
URL: https://github.com/apache/kafka/pull/10775#issuecomment-936453432


   @chia7712 @huxihx @jsancio hey guys! Any updates on this pull request? 
   
   Maybe you have some time to have a look and give me any feedback?







[GitHub] [kafka] C0urante opened a new pull request #11384: MINOR: Improve error message for scale mismatch in Connect logical Decimal types

2021-10-06 Thread GitBox


C0urante opened a new pull request #11384:
URL: https://github.com/apache/kafka/pull/11384


   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[jira] [Created] (KAFKA-13353) Device or resource busy error while deleting topic

2021-10-06 Thread Albert Lozano (Jira)
Albert Lozano created KAFKA-13353:
-

 Summary: Device or resource busy error while deleting topic 
 Key: KAFKA-13353
 URL: https://issues.apache.org/jira/browse/KAFKA-13353
 Project: Kafka
  Issue Type: Bug
  Components: core
Affects Versions: 2.5.0
Reporter: Albert Lozano
 Attachments: NoSuchFileException.txt, ResourceBusyError.txt

We are getting the errors _"Device or resource busy"_ and 
_"java.nio.file.NoSuchFileException"_ while deleting a topic.

The brokers are running in k8s and using NFS 4.1 as storage.

Due to this error, the brokers are restarting.





[GitHub] [kafka] mimaison commented on a change in pull request #11220: KAFKA-10777: Add additional configuration to control MirrorMaker 2 internal topics naming convention

2021-10-06 Thread GitBox


mimaison commented on a change in pull request #11220:
URL: https://github.com/apache/kafka/pull/11220#discussion_r723328617



##
File path: 
connect/mirror-client/src/main/java/org/apache/kafka/connect/mirror/ReplicationPolicy.java
##
@@ -52,9 +52,39 @@ default String originalTopic(String topic) {
 }
 }
 
+/** Returns heartbeats topic name.*/
+default String heartbeatsTopic() {
+return "heartbeats";
+}
+
+/** Returns the offset-syncs topic for given cluster alias. */
+default String offsetSyncTopic(String clusterAlias) {
+return "mm2-offset-syncs." + clusterAlias + ".internal";
+}
+
+/** Returns the name checkpoint topic for given cluster alias. */
+default String checkpointTopic(String clusterAlias) {
+return clusterAlias + ".checkpoint.internal";

Review comment:
   Can you also send an update to the VOTE thread so all participants are 
notified? Thanks








[GitHub] [kafka] satishd commented on pull request #11058: KAFKA-12802 Added a file based cache for consumed remote log metadata for each partition to avoid consuming again incase of broker restarts.

2021-10-06 Thread GitBox


satishd commented on pull request #11058:
URL: https://github.com/apache/kafka/pull/11058#issuecomment-935967523


   Thanks @kowshik for the review comments. Added inline replies and addressed 
with the latest commit. 






[jira] [Updated] (KAFKA-13352) Kafka Client does not support passwords starting with number in jaas config

2021-10-06 Thread Vyacheslav Boyko (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13352?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vyacheslav Boyko updated KAFKA-13352:
-
Component/s: clients

> Kafka Client does not support passwords starting with number in jaas config
> ---
>
> Key: KAFKA-13352
> URL: https://issues.apache.org/jira/browse/KAFKA-13352
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.7.1
>Reporter: Vyacheslav Boyko
>Priority: Trivial
>
> I'm trying to connect to Kafka with Apache Camel's component.
> I have SASL JAAS CONFIG param as:
> {code:java}
> "org.apache.kafka.common.security.plain.PlainLoginModule required 
> username=pf_kafka_card-products password=8GMf0yWkLHrI4cNYYoyHGxclkXCLSCGJ;" 
> {code}
> And I faced an issue when my application starts:
> {code:java}
> Caused by: java.lang.IllegalArgumentException: Value not specified for key 
> 'password' in JAAS config {code}
> I have tried to inspect this issue. I prepared a block of code to reproduce 
> it (Original code is in JaasConfig.java in kafka-clients-...jar). Here it is:
> {code:java}
> public static void main(String[] args) {
> String test = "org.apache.kafka.common.security.plain.PlainLoginModule 
> required username=pf_kafka_card-products 
> password=8GMf0yWkLHrI4cNYYoyHGxclkXCLSCGJ;";
> testJaasConfig(test);
> //SpringApplication.run(CardApplication.class, args);
> }
> private static void testJaasConfig(String config) {
> StreamTokenizer tokenizer = new StreamTokenizer(new StringReader(config));
> tokenizer.slashSlashComments(true);
> tokenizer.slashStarComments(true);
> tokenizer.wordChars('-', '-');
> tokenizer.wordChars('_', '_');
> tokenizer.wordChars('$', '$');
> tokenizer.wordChars('0', '9');
> List<AppConfigurationEntry> configEntries;
> try {
> configEntries = new ArrayList<>();
> while (tokenizer.nextToken() != StreamTokenizer.TT_EOF) {
> configEntries.add(parseAppConfigurationEntry(tokenizer));
> }
> if (configEntries.isEmpty())
> throw new IllegalArgumentException("Login module not specified in 
> JAAS config");
> } catch (IOException e) {
> throw new KafkaException("Unexpected exception while parsing JAAS 
> config");
> }
> }
> private static AppConfigurationEntry 
> parseAppConfigurationEntry(StreamTokenizer tokenizer) throws IOException {
> String loginModule = tokenizer.sval;
> if (tokenizer.nextToken() == StreamTokenizer.TT_EOF)
> throw new IllegalArgumentException("Login module control flag not 
> specified in JAAS config");
> AppConfigurationEntry.LoginModuleControlFlag controlFlag = 
> loginModuleControlFlag(tokenizer.sval);
> Map<String, Object> options = new HashMap<>();
> while (tokenizer.nextToken() != StreamTokenizer.TT_EOF && tokenizer.ttype 
> != ';') {
> String key = tokenizer.sval;
> if (tokenizer.nextToken() != '=' || tokenizer.nextToken() == 
> StreamTokenizer.TT_EOF || tokenizer.sval == null)
> throw new IllegalArgumentException("Value not specified for key 
> '" + key + "' in JAAS config");
> String value = tokenizer.sval;
> options.put(key, value);
> }
> if (tokenizer.ttype != ';')
> throw new IllegalArgumentException("JAAS config entry not terminated 
> by semi-colon");
> return new AppConfigurationEntry(loginModule, controlFlag, options);
> }
> private static AppConfigurationEntry.LoginModuleControlFlag 
> loginModuleControlFlag(String flag) {
> if (flag == null)
> throw new IllegalArgumentException("Login module control flag is not 
> available in the JAAS config");
> AppConfigurationEntry.LoginModuleControlFlag controlFlag;
> switch (flag.toUpperCase(Locale.ROOT)) {
> case "REQUIRED":
> controlFlag = 
> AppConfigurationEntry.LoginModuleControlFlag.REQUIRED;
> break;
> case "REQUISITE":
> controlFlag = 
> AppConfigurationEntry.LoginModuleControlFlag.REQUISITE;
> break;
> case "SUFFICIENT":
> controlFlag = 
> AppConfigurationEntry.LoginModuleControlFlag.SUFFICIENT;
> break;
> case "OPTIONAL":
> controlFlag = 
> AppConfigurationEntry.LoginModuleControlFlag.OPTIONAL;
> break;
> default:
> throw new IllegalArgumentException("Invalid login module control 
> flag '" + flag + "' in JAAS config");
> }
> return controlFlag;
> }
>  {code}
> I have solved this issue by changing my password from
> {code:java}
> 8GMf0yWkLHrI4cNYYoyHGxclkXCLSCGJ {code}
> to
> {code:java}
> aaa {code}
> This leads me to the suspicion that the tokenizer interprets a leading digit as
> a 'bad' symbol, which breaks parsing of the whole line.
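A likely workaround (my suggestion, not confirmed by the reporter): StreamTokenizer parses an unquoted token that starts with a digit as a number, so sval ends up null, but it returns a double-quoted token as a single string. Quoting the values in the JAAS entry therefore avoids the problem:

{code:java}
import java.util.Properties;

public class QuotedJaasConfig {

    public static void main(String[] args) {
        // Same login module entry, but with the username and password values
        // double-quoted; StreamTokenizer then returns each value as one token,
        // even when it starts with a digit.
        String jaasConfig =
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username=\"pf_kafka_card-products\" "
                        + "password=\"8GMf0yWkLHrI4cNYYoyHGxclkXCLSCGJ\";";

        Properties clientProps = new Properties();
        clientProps.put("sasl.jaas.config", jaasConfig);
        System.out.println(clientProps.getProperty("sasl.jaas.config"));
    }
}
{code}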




[jira] [Updated] (KAFKA-13352) Kafka Client does not support passwords starting with number in jaas config

2021-10-06 Thread Vyacheslav Boyko (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13352?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vyacheslav Boyko updated KAFKA-13352:
-
Description: 
I'm trying to connect to Kafka with Apache Camel's component.

I have SASL JAAS CONFIG param as:
{code:java}
"org.apache.kafka.common.security.plain.PlainLoginModule required 
username=pf_kafka_card-products password=8GMf0yWkLHrI4cNYYoyHGxclkXCLSCGJ;" 
{code}
And I faced an issue when my application starts:
{code:java}
Caused by: java.lang.IllegalArgumentException: Value not specified for key 
'password' in JAAS config {code}
I have tried to inspect this issue. I prepared a block of code to reproduce it 
(Original code is in JaasConfig.java in kafka-clients-...jar). Here it is:
{code:java}
public static void main(String[] args) {
String test = "org.apache.kafka.common.security.plain.PlainLoginModule 
required username=pf_kafka_card-products 
password=8GMf0yWkLHrI4cNYYoyHGxclkXCLSCGJ;";
testJaasConfig(test);

//SpringApplication.run(CardApplication.class, args);
}

private static void testJaasConfig(String config) {

StreamTokenizer tokenizer = new StreamTokenizer(new StringReader(config));
tokenizer.slashSlashComments(true);
tokenizer.slashStarComments(true);
tokenizer.wordChars('-', '-');
tokenizer.wordChars('_', '_');
tokenizer.wordChars('$', '$');
tokenizer.wordChars('0', '9');

List<AppConfigurationEntry> configEntries;

try {
configEntries = new ArrayList<>();
while (tokenizer.nextToken() != StreamTokenizer.TT_EOF) {
configEntries.add(parseAppConfigurationEntry(tokenizer));
}
if (configEntries.isEmpty())
throw new IllegalArgumentException("Login module not specified in 
JAAS config");

} catch (IOException e) {
throw new KafkaException("Unexpected exception while parsing JAAS 
config");
}

}

private static AppConfigurationEntry parseAppConfigurationEntry(StreamTokenizer 
tokenizer) throws IOException {
String loginModule = tokenizer.sval;
if (tokenizer.nextToken() == StreamTokenizer.TT_EOF)
throw new IllegalArgumentException("Login module control flag not 
specified in JAAS config");
AppConfigurationEntry.LoginModuleControlFlag controlFlag = 
loginModuleControlFlag(tokenizer.sval);
Map<String, Object> options = new HashMap<>();
while (tokenizer.nextToken() != StreamTokenizer.TT_EOF && tokenizer.ttype 
!= ';') {
String key = tokenizer.sval;
if (tokenizer.nextToken() != '=' || tokenizer.nextToken() == 
StreamTokenizer.TT_EOF || tokenizer.sval == null)
throw new IllegalArgumentException("Value not specified for key '" 
+ key + "' in JAAS config");
String value = tokenizer.sval;
options.put(key, value);
}
if (tokenizer.ttype != ';')
throw new IllegalArgumentException("JAAS config entry not terminated by 
semi-colon");
return new AppConfigurationEntry(loginModule, controlFlag, options);
}

private static AppConfigurationEntry.LoginModuleControlFlag 
loginModuleControlFlag(String flag) {
if (flag == null)
throw new IllegalArgumentException("Login module control flag is not 
available in the JAAS config");

AppConfigurationEntry.LoginModuleControlFlag controlFlag;
switch (flag.toUpperCase(Locale.ROOT)) {
case "REQUIRED":
controlFlag = AppConfigurationEntry.LoginModuleControlFlag.REQUIRED;
break;
case "REQUISITE":
controlFlag = 
AppConfigurationEntry.LoginModuleControlFlag.REQUISITE;
break;
case "SUFFICIENT":
controlFlag = 
AppConfigurationEntry.LoginModuleControlFlag.SUFFICIENT;
break;
case "OPTIONAL":
controlFlag = AppConfigurationEntry.LoginModuleControlFlag.OPTIONAL;
break;
default:
throw new IllegalArgumentException("Invalid login module control 
flag '" + flag + "' in JAAS config");
}
return controlFlag;
}
 {code}
I have solved this issue by changing my password from
{code:java}
8GMf0yWkLHrI4cNYYoyHGxclkXCLSCGJ {code}
to
{code:java}
aaa {code}
This leads me to the suspicion that the tokenizer interprets a leading digit as 
a 'bad' symbol, which breaks parsing of the whole line.

  was:
I'm trying to connect to Kafka with Apache Camel's component.

I have SASL JAAS CONFIG param as:
{code:java}
"org.apache.kafka.common.security.plain.PlainLoginModule required 
username=pf_kafka_card-products password=8GMf0yWkLHrI4cNYYoyHGxclkXCLSCGJ;" 
{code}
And I faced an issue during my application starts:
{code:java}
Caused by: java.lang.IllegalArgumentException: Value not specified for key 
'password' in JAAS config {code}
I have tried to inspect this issue. I prepared a block of code to reproduce it 
(Original code is in JaasConfig.java in kafka-client-*.*.jar). Here it is:
{code:java}
public static void main(String[] args) {
String test = "org.apache.kafk

[jira] [Updated] (KAFKA-13352) Kafka Client does not support passwords starting with number in jaas config

2021-10-06 Thread Vyacheslav Boyko (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13352?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vyacheslav Boyko updated KAFKA-13352:
-
Description: 
I'm trying to connect to Kafka with Apache Camel's component.

I have SASL JAAS CONFIG param as:
{code:java}
"org.apache.kafka.common.security.plain.PlainLoginModule required 
username=pf_kafka_card-products password=8GMf0yWkLHrI4cNYYoyHGxclkXCLSCGJ;" 
{code}
And I faced an issue during my application starts:
{code:java}
Caused by: java.lang.IllegalArgumentException: Value not specified for key 
'password' in JAAS config {code}
I have tried to inspect this issue. I prepared a block of code to reproduce it 
(Original code is in JaasConfig.java in kafka-client-*.*.jar). Here it is:
{code:java}
public static void main(String[] args) {
String test = "org.apache.kafka.common.security.plain.PlainLoginModule 
required username=pf_kafka_card-products 
password=8GMf0yWkLHrI4cNYYoyHGxclkXCLSCGJ;";
testJaasConfig(test);

//SpringApplication.run(CardApplication.class, args);
}

private static void testJaasConfig(String config) {

StreamTokenizer tokenizer = new StreamTokenizer(new StringReader(config));
tokenizer.slashSlashComments(true);
tokenizer.slashStarComments(true);
tokenizer.wordChars('-', '-');
tokenizer.wordChars('_', '_');
tokenizer.wordChars('$', '$');
tokenizer.wordChars('0', '9');

List configEntries;

try {
configEntries = new ArrayList<>();
while (tokenizer.nextToken() != StreamTokenizer.TT_EOF) {
configEntries.add(parseAppConfigurationEntry(tokenizer));
}
if (configEntries.isEmpty())
throw new IllegalArgumentException("Login module not specified in 
JAAS config");

} catch (IOException e) {
throw new KafkaException("Unexpected exception while parsing JAAS 
config");
}

}

private static AppConfigurationEntry parseAppConfigurationEntry(StreamTokenizer 
tokenizer) throws IOException {
String loginModule = tokenizer.sval;
if (tokenizer.nextToken() == StreamTokenizer.TT_EOF)
throw new IllegalArgumentException("Login module control flag not 
specified in JAAS config");
AppConfigurationEntry.LoginModuleControlFlag controlFlag = 
loginModuleControlFlag(tokenizer.sval);
Map<String, Object> options = new HashMap<>();
while (tokenizer.nextToken() != StreamTokenizer.TT_EOF && tokenizer.ttype 
!= ';') {
String key = tokenizer.sval;
if (tokenizer.nextToken() != '=' || tokenizer.nextToken() == 
StreamTokenizer.TT_EOF || tokenizer.sval == null)
throw new IllegalArgumentException("Value not specified for key '" 
+ key + "' in JAAS config");
String value = tokenizer.sval;
options.put(key, value);
}
if (tokenizer.ttype != ';')
throw new IllegalArgumentException("JAAS config entry not terminated by 
semi-colon");
return new AppConfigurationEntry(loginModule, controlFlag, options);
}

private static AppConfigurationEntry.LoginModuleControlFlag 
loginModuleControlFlag(String flag) {
if (flag == null)
throw new IllegalArgumentException("Login module control flag is not 
available in the JAAS config");

AppConfigurationEntry.LoginModuleControlFlag controlFlag;
switch (flag.toUpperCase(Locale.ROOT)) {
case "REQUIRED":
controlFlag = AppConfigurationEntry.LoginModuleControlFlag.REQUIRED;
break;
case "REQUISITE":
controlFlag = 
AppConfigurationEntry.LoginModuleControlFlag.REQUISITE;
break;
case "SUFFICIENT":
controlFlag = 
AppConfigurationEntry.LoginModuleControlFlag.SUFFICIENT;
break;
case "OPTIONAL":
controlFlag = AppConfigurationEntry.LoginModuleControlFlag.OPTIONAL;
break;
default:
throw new IllegalArgumentException("Invalid login module control 
flag '" + flag + "' in JAAS config");
}
return controlFlag;
}
 {code}
I have solved this issue by changing my password from
{code:java}
8GMf0yWkLHrI4cNYYoyHGxclkXCLSCGJ {code}
to
{code:java}
aaa {code}
This leads me to the suspicion that the Tokenizer treats a value starting with a digit as a 
'bad' symbol and then fails to parse the whole line.

  was:
I'm trying to connect to Kafka with Apache Camel's component.

I have SASL JAAS CONFIG param as:
{code:java}
"org.apache.kafka.common.security.plain.PlainLoginModule required 
username=pf_kafka_card-products password=8GMf0yWkLHrI4cNYYoyHGxclkXCLSCGJ;" 
{code}
And I faced an issue when my application starts:
{code:java}
Caused by: java.lang.IllegalArgumentException: Value not specified for key 
'password' in JAAS config {code}
I have tried to inspect this issue. I prepared a block of code to reproduce it. 
Here it is:
{code:java}
public static void main(String[] args) {
String test = "org.apache.kafka.common.security.plain.PlainLoginModule 
required username=pf

[jira] [Created] (KAFKA-13352) Kafka Client does not support passwords starting with number in jaas config

2021-10-06 Thread Vyacheslav Boyko (Jira)
Vyacheslav Boyko created KAFKA-13352:


 Summary: Kafka Client does not support passwords starting with 
number in jaas config
 Key: KAFKA-13352
 URL: https://issues.apache.org/jira/browse/KAFKA-13352
 Project: Kafka
  Issue Type: Bug
Affects Versions: 2.7.1
Reporter: Vyacheslav Boyko


I'm trying to connect to Kafka with Apache Camel's component.

I have SASL JAAS CONFIG param as:
{code:java}
"org.apache.kafka.common.security.plain.PlainLoginModule required 
username=pf_kafka_card-products password=8GMf0yWkLHrI4cNYYoyHGxclkXCLSCGJ;" 
{code}
And I faced an issue when my application starts:
{code:java}
Caused by: java.lang.IllegalArgumentException: Value not specified for key 
'password' in JAAS config {code}
I have tried to inspect this issue. I prepared a block of code to reproduce it. 
Here it is:
{code:java}
public static void main(String[] args) {
String test = "org.apache.kafka.common.security.plain.PlainLoginModule 
required username=pf_kafka_card-products 
password=8GMf0yWkLHrI4cNYYoyHGxclkXCLSCGJ;";
testJaasConfig(test);

//SpringApplication.run(CardApplication.class, args);
}

private static void testJaasConfig(String config) {

StreamTokenizer tokenizer = new StreamTokenizer(new StringReader(config));
tokenizer.slashSlashComments(true);
tokenizer.slashStarComments(true);
tokenizer.wordChars('-', '-');
tokenizer.wordChars('_', '_');
tokenizer.wordChars('$', '$');
tokenizer.wordChars('0', '9');

List<AppConfigurationEntry> configEntries;

try {
configEntries = new ArrayList<>();
while (tokenizer.nextToken() != StreamTokenizer.TT_EOF) {
configEntries.add(parseAppConfigurationEntry(tokenizer));
}
if (configEntries.isEmpty())
throw new IllegalArgumentException("Login module not specified in 
JAAS config");

} catch (IOException e) {
throw new KafkaException("Unexpected exception while parsing JAAS 
config");
}

}

private static AppConfigurationEntry parseAppConfigurationEntry(StreamTokenizer 
tokenizer) throws IOException {
String loginModule = tokenizer.sval;
if (tokenizer.nextToken() == StreamTokenizer.TT_EOF)
throw new IllegalArgumentException("Login module control flag not 
specified in JAAS config");
AppConfigurationEntry.LoginModuleControlFlag controlFlag = 
loginModuleControlFlag(tokenizer.sval);
Map<String, Object> options = new HashMap<>();
while (tokenizer.nextToken() != StreamTokenizer.TT_EOF && tokenizer.ttype 
!= ';') {
String key = tokenizer.sval;
if (tokenizer.nextToken() != '=' || tokenizer.nextToken() == 
StreamTokenizer.TT_EOF || tokenizer.sval == null)
throw new IllegalArgumentException("Value not specified for key '" 
+ key + "' in JAAS config");
String value = tokenizer.sval;
options.put(key, value);
}
if (tokenizer.ttype != ';')
throw new IllegalArgumentException("JAAS config entry not terminated by 
semi-colon");
return new AppConfigurationEntry(loginModule, controlFlag, options);
}

private static AppConfigurationEntry.LoginModuleControlFlag 
loginModuleControlFlag(String flag) {
if (flag == null)
throw new IllegalArgumentException("Login module control flag is not 
available in the JAAS config");

AppConfigurationEntry.LoginModuleControlFlag controlFlag;
switch (flag.toUpperCase(Locale.ROOT)) {
case "REQUIRED":
controlFlag = AppConfigurationEntry.LoginModuleControlFlag.REQUIRED;
break;
case "REQUISITE":
controlFlag = 
AppConfigurationEntry.LoginModuleControlFlag.REQUISITE;
break;
case "SUFFICIENT":
controlFlag = 
AppConfigurationEntry.LoginModuleControlFlag.SUFFICIENT;
break;
case "OPTIONAL":
controlFlag = AppConfigurationEntry.LoginModuleControlFlag.OPTIONAL;
break;
default:
throw new IllegalArgumentException("Invalid login module control 
flag '" + flag + "' in JAAS config");
}
return controlFlag;
}
 {code}
I have solved this issue by changing my password from
{code:java}
8GMf0yWkLHrI4cNYYoyHGxclkXCLSCGJ {code}
to
{code:java}
aaa {code}
This leads me to the suspicion that the Tokenizer treats a value starting with a digit as a 
'bad' symbol and then fails to parse the whole line.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9106) metrics exposed via JMX should be configurable

2021-10-06 Thread Andras Katona (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17424893#comment-17424893
 ] 

Andras Katona commented on KAFKA-9106:
--

Fix Version was set to 2.5.0, but 2.6.0 was the earliest released Kafka version this 
made it into.
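
For anyone picking this up on 2.6.0 or later, usage is roughly as sketched below. The property names are taken from KIP-544 and should be double-checked against the release notes (treat them as an assumption here); the regexes are matched against the metric MBean names:
{code:java}
Properties clientProps = new Properties();
// Register only the consumer fetch metrics as JMX MBeans; everything else stays out of JMX.
clientProps.put("metrics.jmx.include", "kafka\\.consumer:type=consumer-fetch-manager-metrics.*");
// Alternatively, keep everything except the most voluminous per-partition metrics.
// clientProps.put("metrics.jmx.exclude", ".*partition.*");
{code}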

> metrics exposed via JMX should be configurable
> -
>
> Key: KAFKA-9106
> URL: https://issues.apache.org/jira/browse/KAFKA-9106
> Project: Kafka
>  Issue Type: Improvement
>  Components: metrics
>Reporter: Xavier Léauté
>Assignee: Xavier Léauté
>Priority: Major
> Fix For: 2.6.0
>
>
> Kafka exposes a very large number of metrics, all of which are always visible 
> in JMX by default. On large clusters with many partitions, this may result in 
> tens of thousands of mbeans to be registered, which can lead to timeouts with 
> some popular monitoring agents that rely on listing JMX metrics via RMI.
> Making the set of JMX-visible metrics configurable would allow operators to 
> decide on the set of critical metrics to collect and work around the limitations of 
> JMX in those cases.
> corresponding KIP
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-544%3A+Make+metrics+exposed+via+JMX+configurable



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9106) metrics exposed via JMX should be configurable

2021-10-06 Thread Andras Katona (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9106?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Andras Katona updated KAFKA-9106:
-
Fix Version/s: (was: 2.5.0)
   2.6.0

> metrics exposed via JMX should be configurable
> -
>
> Key: KAFKA-9106
> URL: https://issues.apache.org/jira/browse/KAFKA-9106
> Project: Kafka
>  Issue Type: Improvement
>  Components: metrics
>Reporter: Xavier Léauté
>Assignee: Xavier Léauté
>Priority: Major
> Fix For: 2.6.0
>
>
> Kafka exposes a very large number of metrics, all of which are always visible 
> in JMX by default. On large clusters with many partitions, this may result in 
> tens of thousands of mbeans to be registered, which can lead to timeouts with 
> some popular monitoring agents that rely on listing JMX metrics via RMI.
> Making the set of JMX-visible metrics configurable would allow operators to 
> decide on the set of critical metrics to collect and work around the limitations of 
> JMX in those cases.
> corresponding KIP
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-544%3A+Make+metrics+exposed+via+JMX+configurable



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] mimaison commented on a change in pull request #11249: Fix wrong link to "Guarantees" part in introduction.html documentation

2021-10-06 Thread GitBox


mimaison commented on a change in pull request #11249:
URL: https://github.com/apache/kafka/pull/11249#discussion_r723050185



##
File path: docs/introduction.html
##
@@ -144,13 +144,13 @@ 
 
   
   
-Producers are those client applications that publish 
(write) events to Kafka, and consumers are those that 
subscribe to (read and process) these events. In Kafka, producers and consumers 
are fully decoupled and agnostic of each other, which is a key design element 
to achieve the high scalability that Kafka is known for. For example, producers 
never need to wait for consumers. Kafka provides various guarantees such as the ability to 
process events exactly-once.
+Producers are those client applications that publish 
(write) events to Kafka, and consumers are those that 
subscribe to (read and process) these events. In Kafka, producers and consumers 
are fully decoupled and agnostic of each other, which is a key design element 
to achieve the high scalability that Kafka is known for. For example, producers 
never need to wait for consumers. Kafka provides various guarantees such as the ability to 
process events exactly-once.

Review comment:
   I wonder if pointing to 
https://kafka.apache.org/documentation/#semantics would actually make more 
sense. WDYT?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13346) Kafka Streams fails due to RocksDB Locks Not Available Exception

2021-10-06 Thread Bruno Cadonna (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13346?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17424845#comment-17424845
 ] 

Bruno Cadonna commented on KAFKA-13346:
---

[~ableegoldman] "org.rocksdb.RocksDBException: No locks available" could be a 
consequence of "Too many open files". Let's first see if the situation improves 
by increasing the open files limit.
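
As a hedged sketch of the Streams-side knob (in addition to raising the OS limit with ulimit -n), a RocksDBConfigSetter can cap the number of open files per store; the value 500 below is illustrative, not a recommendation from this thread:
{code:java}
import java.util.Map;
import org.apache.kafka.streams.state.RocksDBConfigSetter;
import org.rocksdb.Options;

public class BoundedOpenFilesConfigSetter implements RocksDBConfigSetter {
    @Override
    public void setConfig(final String storeName, final Options options, final Map<String, Object> configs) {
        // RocksDB's default of -1 keeps every SST file handle open; cap it per store instead.
        options.setMaxOpenFiles(500);
    }

    @Override
    public void close(final String storeName, final Options options) {
        // nothing to release here
    }
}
{code}
The class is registered through the {{rocksdb.config.setter}} Streams config (StreamsConfig.ROCKSDB_CONFIG_SETTER_CLASS_CONFIG).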

> Kafka Streams fails due to RocksDB Locks Not Available Exception
> 
>
> Key: KAFKA-13346
> URL: https://issues.apache.org/jira/browse/KAFKA-13346
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Amit Gupta
>Priority: Major
>
> Hello,
> We are using Kafka Streams and we observe that sometimes, on some of the 
> hosts running the streams application, the Kafka Streams instance fails with an 
> unexpected exception. We are running with 40 stream threads per host and 20 
> hosts in total.
> Can someone please help with what the root cause might be here?
>  
> |org.apache.kafka.streams.errors.ProcessorStateException: Error opening store 
> state-store at location .
>  at 
> org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:214)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:188)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:224)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:42)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$init$0(MeteredKeyValueStore.java:101)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.maybeMeasureLatency(StreamsMetricsImpl.java:836)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.init(MeteredKeyValueStore.java:101)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.processor.internals.ProcessorStateManager.registerStateStores(ProcessorStateManager.java:199)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.processor.internals.StateManagerUtil.registerStateStores(StateManagerUtil.java:76)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.processor.internals.StandbyTask.initializeIfNeeded(StandbyTask.java:95)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.processor.internals.TaskManager.tryToCompleteRestoration(TaskManager.java:426)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:660)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:551)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:510)
>  ~[kafka-streams-2.6.0.jar:?]
>  Caused by: org.rocksdb.RocksDBException: lock : 
> ./0_468/rocksdb/state-store/LOCK: No locks available
>  at org.rocksdb.RocksDB.open(Native Method) ~[rocksdbjni-5.18.3.jar:?]
>  at org.rocksdb.RocksDB.open(RocksDB.java:286) ~[rocksdbjni-5.18.3.jar:?]
>  at 
> org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:211)
>  ~[kafka-streams-2.6.0.jar:?]
>  ... 15 more
>   
>  Sometimes I also see this exception
>   |
> |org.apache.kafka.streams.errors.ProcessorStateException: Error opening store 
> state-store at location ./0_433/rocksdb/state-store
>  at 
> org.apache.kafka.streams.state.internals.RocksDBStore.openRocksDB(RocksDBStore.java:214)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.state.internals.RocksDBStore.openDB(RocksDBStore.java:188)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.state.internals.RocksDBStore.init(RocksDBStore.java:224)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.init(ChangeLoggingKeyValueBytesStore.java:42)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.state.internals.WrappedStateStore.init(WrappedStateStore.java:48)
>  ~[kafka-streams-2.6.0.jar:?]
>  at 
> org.apache.kafka.streams.state.internals.MeteredKe

[jira] [Resolved] (KAFKA-7214) Mystic FATAL error

2021-10-06 Thread Seweryn Habdank-Wojewodzki (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Seweryn Habdank-Wojewodzki resolved KAFKA-7214.
---
Resolution: Workaround

The solution is to avoid low values of {{max.block.ms}}
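
As a hedged illustration of that workaround (the value below is the usual default, not taken from this ticket), a Streams application can pin the embedded producer's max.block.ms explicitly:
{code:java}
final Properties props = new Properties();
// Keep max.block.ms at (or near) its 60s default so metadata fetches and buffer
// allocation at startup have time to complete instead of failing almost immediately.
props.put(StreamsConfig.producerPrefix(ProducerConfig.MAX_BLOCK_MS_CONFIG), 60000);
{code}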

> Mystic FATAL error
> --
>
> Key: KAFKA-7214
> URL: https://issues.apache.org/jira/browse/KAFKA-7214
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.3, 1.1.1, 2.3.0, 2.2.1
>Reporter: Seweryn Habdank-Wojewodzki
>Priority: Critical
> Attachments: qns-1.1.zip, qns-1.zip
>
>
> Dears,
> Very often at startup of the streaming application I get an exception:
> {code}
> Exception caught in process. taskId=0_1, processor=KSTREAM-SOURCE-00, 
> topic=my_instance_medium_topic, partition=1, offset=198900203; 
> [org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:212),
>  
> org.apache.kafka.streams.processor.internals.AssignedTasks$2.apply(AssignedTasks.java:347),
>  
> org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:420),
>  
> org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:339),
>  
> org.apache.kafka.streams.processor.internals.StreamThread.processAndPunctuate(StreamThread.java:648),
>  
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:513),
>  
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:482),
>  
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:459)]
>  in thread 
> my_application-my_instance-my_instance_medium-72ee1819-edeb-4d85-9d65-f67f7c321618-StreamThread-62
> {code}
> and then (without shutdown request from my side):
> {code}
> 2018-07-30 07:45:02 [ar313] [INFO ] StreamThread:912 - stream-thread 
> [my_application-my_instance-my_instance-72ee1819-edeb-4d85-9d65-f67f7c321618-StreamThread-62]
>  State transition from PENDING_SHUTDOWN to DEAD.
> {code}
> What is this?
> How to correctly handle it?
> Thanks in advance for help.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (KAFKA-7214) Mystic FATAL error

2021-10-06 Thread Seweryn Habdank-Wojewodzki (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-7214?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Seweryn Habdank-Wojewodzki closed KAFKA-7214.
-

> Mystic FATAL error
> --
>
> Key: KAFKA-7214
> URL: https://issues.apache.org/jira/browse/KAFKA-7214
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 0.11.0.3, 1.1.1, 2.3.0, 2.2.1
>Reporter: Seweryn Habdank-Wojewodzki
>Priority: Critical
> Attachments: qns-1.1.zip, qns-1.zip
>
>
> Dears,
> Very often at startup of the streaming application I get an exception:
> {code}
> Exception caught in process. taskId=0_1, processor=KSTREAM-SOURCE-00, 
> topic=my_instance_medium_topic, partition=1, offset=198900203; 
> [org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:212),
>  
> org.apache.kafka.streams.processor.internals.AssignedTasks$2.apply(AssignedTasks.java:347),
>  
> org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:420),
>  
> org.apache.kafka.streams.processor.internals.AssignedTasks.process(AssignedTasks.java:339),
>  
> org.apache.kafka.streams.processor.internals.StreamThread.processAndPunctuate(StreamThread.java:648),
>  
> org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:513),
>  
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:482),
>  
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:459)]
>  in thread 
> my_application-my_instance-my_instance_medium-72ee1819-edeb-4d85-9d65-f67f7c321618-StreamThread-62
> {code}
> and then (without shutdown request from my side):
> {code}
> 2018-07-30 07:45:02 [ar313] [INFO ] StreamThread:912 - stream-thread 
> [my_application-my_instance-my_instance-72ee1819-edeb-4d85-9d65-f67f7c321618-StreamThread-62]
>  State transition from PENDING_SHUTDOWN to DEAD.
> {code}
> What is this?
> How to correctly handle it?
> Thanks in advance for help.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (KAFKA-6777) Wrong reaction on Out Of Memory situation

2021-10-06 Thread Seweryn Habdank-Wojewodzki (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6777?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Seweryn Habdank-Wojewodzki closed KAFKA-6777.
-

> Wrong reaction on Out Of Memory situation
> -
>
> Key: KAFKA-6777
> URL: https://issues.apache.org/jira/browse/KAFKA-6777
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 1.0.0
>Reporter: Seweryn Habdank-Wojewodzki
>Priority: Critical
> Attachments: screenshot-1.png
>
>
> Dears,
> We have already encountered problems related to Out Of Memory situations in the 
> Kafka Broker and streaming clients many times.
> The scenario is the following.
> When the Kafka Broker (or Streaming Client) is under load and has too little 
> memory, there are no errors in the server logs. One can see some cryptic entries 
> in the GC logs, but they are definitely not self-explanatory.
> The Kafka Broker (and Streaming Clients) keeps running. Later we see in JMX 
> monitoring that the JVM spends more and more time in GC. In our case it grows from 
> e.g. 1% to 80%-90% of CPU time being used by GC.
> Next, the software collapses into zombie mode: the process does not end. In such a 
> case I would expect the process to crash (e.g. get a SIGSEGV). Even worse, 
> Kafka treats such a zombie process as normal and still sends some messages, which 
> are in fact getting lost, and the cluster does not exclude the broken node. The 
> question is how to configure Kafka to really terminate the JVM and not remain 
> in zombie mode, to give the other nodes a chance to know that something is 
> dead.
> I would expect that in an Out Of Memory situation the JVM is terminated, if not 
> gracefully then at least by crashing the process.
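
As an editor's note (not part of this ticket), one commonly used mitigation is to ask the JVM itself to terminate on OOM; with HotSpot (JDK 8u92 or later) the flags can be passed through the KAFKA_OPTS hook read by the start scripts:
{code}
export KAFKA_OPTS="-XX:+ExitOnOutOfMemoryError -XX:+HeapDumpOnOutOfMemoryError"
{code}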



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (KAFKA-6882) Wrong producer settings may lead to DoS on Kafka Server

2021-10-06 Thread Seweryn Habdank-Wojewodzki (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-6882?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Seweryn Habdank-Wojewodzki closed KAFKA-6882.
-

> Wrong producer settings may lead to DoS on Kafka Server
> ---
>
> Key: KAFKA-6882
> URL: https://issues.apache.org/jira/browse/KAFKA-6882
> Project: Kafka
>  Issue Type: Bug
>  Components: core, producer 
>Affects Versions: 1.0.1, 1.1.0
>Reporter: Seweryn Habdank-Wojewodzki
>Priority: Major
>
> The documentation of the parameters “linger.ms” and “batch.size” is 
> a bit confusing. In fact, those parameters, when set wrongly on the producer side, 
> can completely destroy BROKER throughput.
> I see that smart developers read the documentation of those parameters.
> Then they want to have super performance and super safety, so they set 
> something like this:
> {code}
> kafkaProps.put(ProducerConfig.LINGER_MS_CONFIG, 1);
> kafkaProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 0);
> {code}
> Then we have a situation where each and every message is sent separately. 
> TCP/IP is really busy in that case, and when they needed high 
> throughput they got much less, as every message goes separately and the 
> network and TCP/IP overhead becomes significant.
> Those settings are good only if someone sends critical messages once in a 
> while (e.g. one message per minute) and not when throughput matters and 
> thousands of messages are sent per second.
> The situation is even worse when smart developers read that, for safety, 
> they need acknowledgements from all cluster nodes. So they add:
> {code}
> kafkaProps.put(ProducerConfig.ACKS_CONFIG, "all");
> {code}
> And this is the end of Kafka performance! 
> Even worse, it is not a problem for the Kafka producer. The problem remains at 
> the server (cluster, broker) side. The server is so busy acknowledging 
> *each and every* message from all nodes that other work is NOT performed, so 
> the end-to-end performance is almost none.
> I would like to ask you to improve the documentation of these parameters,
> and to cover the corner cases by providing detailed information on how 
> extreme values of the parameters - namely the lowest and highest - may influence 
> the work of the cluster.
> This was the documentation issue. 
> On the other hand, it is a security/safety matter.
> Technically the problem is that the __commit_offsets topic is loaded with an 
> enormous amount of messages. This leads to a situation where the Kafka Broker is 
> exposed to *DoS* due to the producer settings. Three lines of code, a bit of load, 
> and the Kafka cluster is dead.
> I suppose there are ways to prevent such a situation on the cluster side, but 
> it requires some logic to be implemented to detect such a simple but efficient 
> DoS.
> BTW, do the Kafka admin tools provide any kind of "kill connection" option when 
> one producer or another causes problems?
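
As an editor's aside (not from the ticket), a more balanced throughput-oriented starting point might look like the snippet below, reusing the kafkaProps object from the quoted examples; the exact values are illustrative only:
{code:java}
kafkaProps.put(ProducerConfig.LINGER_MS_CONFIG, 5);        // allow a few milliseconds for batching
kafkaProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);   // the default batch size in bytes
kafkaProps.put(ProducerConfig.ACKS_CONFIG, "all");         // keep durability; batching amortises the per-request cost
{code}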



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (KAFKA-8548) Inconsistency in Kafka Documentation

2021-10-06 Thread Seweryn Habdank-Wojewodzki (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8548?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Seweryn Habdank-Wojewodzki closed KAFKA-8548.
-

> Inconsistency in Kafka Documentation
> 
>
> Key: KAFKA-8548
> URL: https://issues.apache.org/jira/browse/KAFKA-8548
> Project: Kafka
>  Issue Type: Task
>  Components: documentation
>Affects Versions: 2.2.1
>Reporter: Seweryn Habdank-Wojewodzki
>Priority: Minor
>
> Dears,
> Two parts (referenced below) of [documentation 
> |http://kafka.apache.org/documentation/] are not quite consistent.
> In one text we can read that max.poll.interval.ms has a default value of 
> Integer.MAX_VALUE, in the other it is 300 000.
> Part 1.
> {quote}
> The default values for two configurations of the StreamsConfig class were 
> changed to improve the resiliency of Kafka Streams applications. The internal 
> Kafka Streams producer retries default value was changed from 0 to 10. The 
> internal Kafka Streams consumer max.poll.interval.ms default value was 
> changed from 300000 to {color:#FF}Integer.MAX_VALUE{color}.
> {quote}
>  
> Part 2. - Table
> |max.poll.interval.ms|The maximum delay between invocations of poll() when 
> using consumer group management. This places an upper bound on the amount of 
> time that the consumer can be idle before fetching more records. If poll() is 
> not called before expiration of this timeout, then the consumer is considered 
> failed and the group will rebalance in order to reassign the partitions to 
> another member.|int|{color:#FF}300000{color}|[1,...]|medium|
> Which value is then default :-)



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Closed] (KAFKA-9221) Kafka REST Proxy wrongly converts quotes in message when sending json

2021-10-06 Thread Seweryn Habdank-Wojewodzki (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9221?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Seweryn Habdank-Wojewodzki closed KAFKA-9221.
-

> Kafka REST Proxy wrongly converts quotes in message when sending json
> -
>
> Key: KAFKA-9221
> URL: https://issues.apache.org/jira/browse/KAFKA-9221
> Project: Kafka
>  Issue Type: Bug
>  Components: tools
>Affects Versions: 2.3.0
> Environment: Linux redhat
>Reporter: Seweryn Habdank-Wojewodzki
>Priority: Major
>
> Kafka REST Proxy has a problem when sending/converting json files (e.g. 
> json.new) into Kafka protocol. For example JSON file:
> {code:java}
> {"records":[{"value":"rest.kafka.testmetric,host=server.com,partition=8,topic=my_topic,url=http:--localhost:7071-metrics
>  1337 1572276922"}]}
> {code}
> is sent using a call to the Kafka REST Proxy on localhost:8073:
> {code:java}
> curl -X POST -H "Content-Type: application/vnd.kafka.json.v2+json" -H 
> "Accept: application/vnd.kafka.v2+json" --data @json.new  
> http://localhost:8073/topics/somple_topic -i 
> {code}
> in Kafka, in some_topic, we got:
> {code:java}
> "rest.kafka.testmetric,host=server.com,partition=8,topic=my_topic,url=http:--localhost:7071-metrics
>  1337 1572276922"
> {code}
> but the expectation is that the message has no quotes:
> {code:java}
> rest.kafka.testmetric,host=server.com,partition=8,topic=my_topic,url=http:--localhost:7071-metrics
>  1337 1572276922
> {code}



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-13351) Add possibility to write kafka headers in Kafka Console Producer

2021-10-06 Thread Seweryn Habdank-Wojewodzki (Jira)
Seweryn Habdank-Wojewodzki created KAFKA-13351:
--

 Summary: Add possibility to write kafka headers in Kafka Console 
Producer
 Key: KAFKA-13351
 URL: https://issues.apache.org/jira/browse/KAFKA-13351
 Project: Kafka
  Issue Type: Wish
Affects Versions: 2.8.1
Reporter: Seweryn Habdank-Wojewodzki


Dears,

Currently there is an asymmetry between the Kafka Console Consumer and Producer.
The Kafka Console Consumer can display headers (KAFKA-6733), but the Kafka Console 
Producer cannot produce them.

It would be good to unify this and add the possibility for the Kafka Console Producer to 
produce them.

A similar ticket exists: KAFKA-6574, but it is very old and does not represent the 
current state of the software.

Please consider this.
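
In the meantime, a minimal programmatic sketch of producing a record with headers (topic, key, value and header contents below are purely illustrative):
{code:java}
import java.nio.charset.StandardCharsets;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.header.internals.RecordHeaders;

public class HeaderProducerExample {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            // ProducerRecord accepts an Iterable<Header> directly.
            final ProducerRecord<String, String> record = new ProducerRecord<>(
                    "my-topic", null, "my-key", "my-value",
                    new RecordHeaders().add("trace-id", "abc-123".getBytes(StandardCharsets.UTF_8)));
            producer.send(record);
        }
    }
}
{code}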



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[GitHub] [kafka] satishd commented on a change in pull request #11058: KAFKA-12802 Added a file based cache for consumed remote log metadata for each partition to avoid consuming again incase of broke

2021-10-06 Thread GitBox


satishd commented on a change in pull request #11058:
URL: https://github.com/apache/kafka/pull/11058#discussion_r722958956



##
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSnapshotFile.java
##
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.utils.Utils;
+import 
org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.FileChannel;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * This class represents the remote log data snapshot stored in a file for a 
specific topic partition. This is used by
+ * {@link TopicBasedRemoteLogMetadataManager} to store the remote log metadata 
received for a specific partition from
+ * remote log metadata topic. This will avoid reading the remote log metadata 
messages from the topic again when a
+ * broker restarts.
+ */
+public class RemoteLogMetadataSnapshotFile {
+private static final Logger log = 
LoggerFactory.getLogger(RemoteLogMetadataSnapshotFile.class);
+
+public static final String COMMITTED_LOG_METADATA_SNAPSHOT_FILE_NAME = 
"remote_log_snapshot";
+
+// File format:
+// [...]
+// header: <version:short><metadata-partition:int><metadata-partition-offset:long>
+// entry: <length:int><serialized-entry-bytes>
+
+// header size: 2 (version) + 4 (partition num) + 8 (offset) = 14
+private static final int HEADER_SIZE = 14;
+
+private final File metadataStoreFile;
+private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
+
+/**
+ * Creates a CommittedLogMetadataSnapshotFile instance backed by a file 
with the name `remote_log_snapshot` in
+ * the given {@code metadataStoreDir}. It creates the file if it does not 
exist.
+ *
+ * @param metadataStoreDir directory in which the snapshot file to be 
created.
+ */
+RemoteLogMetadataSnapshotFile(Path metadataStoreDir) {
+this.metadataStoreFile = new File(metadataStoreDir.toFile(), 
COMMITTED_LOG_METADATA_SNAPSHOT_FILE_NAME);
+
+// Create an empty file if it does not exist.
+try {
+boolean newFileCreated = metadataStoreFile.createNewFile();
+log.info("Remote log metadata snapshot file: [{}], newFileCreated: 
[{}]", metadataStoreFile, newFileCreated);
+} catch (IOException e) {
+throw new KafkaException(e);
+}
+}
+
+/**
+ * Writes the given snapshot replacing the earlier snapshot data.
+ *
+ * @param snapshot Snapshot to be stored.
+ * @throws IOException if there4 is any error in writing the given 
snapshot to the file.
+ */
+public synchronized void write(Snapshot snapshot) throws IOException {
+Path newMetadataSnapshotFilePath = new 
File(metadataStoreFile.getAbsolutePath() + ".tmp").toPath();
+try (FileChannel fileChannel = 
FileChannel.open(newMetadataSnapshotFilePath,
+
StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
+
+// header: <version:short><metadata-partition:int><metadata-partition-offset:long>
+ByteBuffer headerBuffer = ByteBuffer.allocate(HEADER_SIZE);
+
+// Write version
+headerBuffer.putShort(snapshot.version());
+
+// Write metadata partition and metadata partition offset
+headerBuffer.putInt(snapshot.metadataPartition());
+
+// Write metadata partition offset
+headerBuffer.putLong(snapshot.metadataPartitionOffset());
+headerBuffer.flip();
+
+// Write header
+fileChannel.write(headerBuffer);
+
+// Write each entry
+ByteBuffer lenBuffer = ByteBuffer.allocate(4);
+f

[GitHub] [kafka] satishd commented on a change in pull request #11058: KAFKA-12802 Added a file based cache for consumed remote log metadata for each partition to avoid consuming again incase of broke

2021-10-06 Thread GitBox


satishd commented on a change in pull request #11058:
URL: https://github.com/apache/kafka/pull/11058#discussion_r722958575



##
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/RemoteLogMetadataSnapshotFile.java
##
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.log.remote.metadata.storage;
+
+import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.utils.Utils;
+import 
org.apache.kafka.server.log.remote.metadata.storage.serialization.RemoteLogMetadataSerde;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.Channels;
+import java.nio.channels.FileChannel;
+import java.nio.channels.ReadableByteChannel;
+import java.nio.file.Path;
+import java.nio.file.StandardOpenOption;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+
+/**
+ * This class represents the remote log data snapshot stored in a file for a 
specific topic partition. This is used by
+ * {@link TopicBasedRemoteLogMetadataManager} to store the remote log metadata 
received for a specific partition from
+ * remote log metadata topic. This will avoid reading the remote log metadata 
messages from the topic again when a
+ * broker restarts.
+ */
+public class RemoteLogMetadataSnapshotFile {
+private static final Logger log = 
LoggerFactory.getLogger(RemoteLogMetadataSnapshotFile.class);
+
+public static final String COMMITTED_LOG_METADATA_SNAPSHOT_FILE_NAME = 
"remote_log_snapshot";
+
+// File format:
+// [...]
+// header: <version:short><metadata-partition:int><metadata-partition-offset:long>
+// entry: <length:int><serialized-entry-bytes>
+
+// header size: 2 (version) + 4 (partition num) + 8 (offset) = 14
+private static final int HEADER_SIZE = 14;
+
+private final File metadataStoreFile;
+private final RemoteLogMetadataSerde serde = new RemoteLogMetadataSerde();
+
+/**
+ * Creates a CommittedLogMetadataSnapshotFile instance backed by a file 
with the name `remote_log_snapshot` in
+ * the given {@code metadataStoreDir}. It creates the file if it does not 
exist.
+ *
+ * @param metadataStoreDir directory in which the snapshot file to be 
created.
+ */
+RemoteLogMetadataSnapshotFile(Path metadataStoreDir) {
+this.metadataStoreFile = new File(metadataStoreDir.toFile(), 
COMMITTED_LOG_METADATA_SNAPSHOT_FILE_NAME);
+
+// Create an empty file if it does not exist.
+try {
+boolean newFileCreated = metadataStoreFile.createNewFile();
+log.info("Remote log metadata snapshot file: [{}], newFileCreated: 
[{}]", metadataStoreFile, newFileCreated);
+} catch (IOException e) {
+throw new KafkaException(e);
+}
+}
+
+/**
+ * Writes the given snapshot replacing the earlier snapshot data.
+ *
+ * @param snapshot Snapshot to be stored.
+ * @throws IOException if there4 is any error in writing the given 
snapshot to the file.
+ */
+public synchronized void write(Snapshot snapshot) throws IOException {
+Path newMetadataSnapshotFilePath = new 
File(metadataStoreFile.getAbsolutePath() + ".tmp").toPath();
+try (FileChannel fileChannel = 
FileChannel.open(newMetadataSnapshotFilePath,
+
StandardOpenOption.CREATE, StandardOpenOption.READ, StandardOpenOption.WRITE)) {
+
+// header: <version:short><metadata-partition:int><metadata-partition-offset:long>
+ByteBuffer headerBuffer = ByteBuffer.allocate(HEADER_SIZE);
+
+// Write version
+headerBuffer.putShort(snapshot.version());
+
+// Write metadata partition and metadata partition offset
+headerBuffer.putInt(snapshot.metadataPartition());
+
+// Write metadata partition offset
+headerBuffer.putLong(snapshot.metadataPartitionOffset());
+headerBuffer.flip();
+
+// Write header
+fileChannel.write(headerBuffer);
+
+// Write each entry
+ByteBuffer lenBuffer = ByteBuffer.allocate(4);
+f

[GitHub] [kafka] showuon commented on a change in pull request #11380: KAFKA-13345: Use "delete" cleanup policy for windowed stores if duplicates are enabled

2021-10-06 Thread GitBox


showuon commented on a change in pull request #11380:
URL: https://github.com/apache/kafka/pull/11380#discussion_r722946931



##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/JoinStoreIntegrationTest.java
##
@@ -131,4 +143,50 @@ public void 
providingAJoinStoreNameShouldNotMakeTheJoinResultQueriable() throws
 );
 }
 }
+
+@Test
+public void 
streamJoinChangelogTopicShouldBeConfiguredWithDeleteOnlyCleanupPolicy() throws 
Exception {
+STREAMS_CONFIG.put(StreamsConfig.APPLICATION_ID_CONFIG, APP_ID + 
"-changelog-cleanup-policy");
+final StreamsBuilder builder = new StreamsBuilder();
+
+final KStream<String, Integer> left = builder.stream(INPUT_TOPIC_LEFT, 
Consumed.with(Serdes.String(), Serdes.Integer()));
+final KStream<String, Integer> right = 
builder.stream(INPUT_TOPIC_RIGHT, Consumed.with(Serdes.String(), 
Serdes.Integer()));
+final CountDownLatch latch = new CountDownLatch(1);
+
+left.join(
+right,
+Integer::sum,
+JoinWindows.of(ofMillis(100)),
+StreamJoined.with(Serdes.String(), Serdes.Integer(), 
Serdes.Integer()).withStoreName("join-store"));
+
+System.out.println(builder.build().describe());

Review comment:
   debug line should be removed.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/AbstractStoreBuilder.java
##
@@ -62,7 +62,7 @@ public AbstractStoreBuilder(final String name,
 public StoreBuilder<T> withLoggingEnabled(final Map<String, String> 
config) {
 Objects.requireNonNull(config, "config can't be null");
 enableLogging = true;
-logConfig = config;
+logConfig.putAll(config);

Review comment:
   Nice catch




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] satishd commented on a change in pull request #11058: KAFKA-12802 Added a file based cache for consumed remote log metadata for each partition to avoid consuming again incase of broke

2021-10-06 Thread GitBox


satishd commented on a change in pull request #11058:
URL: https://github.com/apache/kafka/pull/11058#discussion_r722946986



##
File path: 
storage/src/main/java/org/apache/kafka/server/log/remote/metadata/storage/ConsumerTask.java
##
@@ -82,44 +87,148 @@
 // User topic partitions that this broker is a leader/follower for.
 private Set assignedTopicPartitions = 
Collections.emptySet();
 
-// Map of remote log metadata topic partition to consumed offsets.
+// Map of remote log metadata topic partition to consumed offsets. 
Received consumer records
+// may or may not have been processed based on the assigned topic 
partitions.
 private final Map partitionToConsumedOffsets = new 
ConcurrentHashMap<>();
 
+// Map of remote log metadata topic partition to processed offsets. 
Received consumer record is
+// processed as the remote log metadata record's topic partition exists in 
assigned topic partitions.
+private final Map partitionToProcessedOffsets = new 
ConcurrentHashMap<>();
+
+// Map of remote log metadata topic partition to processed offsets that 
were synced in committedOffsetsFile.
+private Map lastSyncedPartitionToConsumedOffsets = 
Collections.emptyMap();
+
+private final long committedOffsetSyncIntervalMs;
+private CommittedOffsetsFile committedOffsetsFile;
+private long lastSyncedTimeMs;
+
 public ConsumerTask(KafkaConsumer consumer,
 RemotePartitionMetadataEventHandler 
remotePartitionMetadataEventHandler,
-RemoteLogMetadataTopicPartitioner topicPartitioner) {
-Objects.requireNonNull(consumer);
-Objects.requireNonNull(remotePartitionMetadataEventHandler);
-Objects.requireNonNull(topicPartitioner);
-
-this.consumer = consumer;
-this.remotePartitionMetadataEventHandler = 
remotePartitionMetadataEventHandler;
-this.topicPartitioner = topicPartitioner;
+RemoteLogMetadataTopicPartitioner topicPartitioner,
+Path committedOffsetsPath,
+Time time,
+long committedOffsetSyncIntervalMs) {
+this.consumer = Objects.requireNonNull(consumer);
+this.remotePartitionMetadataEventHandler = 
Objects.requireNonNull(remotePartitionMetadataEventHandler);
+this.topicPartitioner = Objects.requireNonNull(topicPartitioner);
+this.time = Objects.requireNonNull(time);
+this.committedOffsetSyncIntervalMs = committedOffsetSyncIntervalMs;
+
+initializeConsumerAssignment(committedOffsetsPath);
+}
+
+private void initializeConsumerAssignment(Path committedOffsetsPath) {
+try {
+committedOffsetsFile = new 
CommittedOffsetsFile(committedOffsetsPath.toFile());
+} catch (IOException e) {
+throw new KafkaException(e);
+}
+
+Map committedOffsets = Collections.emptyMap();
+try {
+// Load committed offset and assign them in the consumer.
+committedOffsets = committedOffsetsFile.readEntries();
+} catch (IOException e) {
+// Ignore the error and consumer consumes from the earliest offset.
+log.error("Encountered error while building committed offsets from 
the file", e);
+}
+
+if (!committedOffsets.isEmpty()) {
+// Assign topic partitions from the earlier committed offsets file.
+Set earlierAssignedPartitions = committedOffsets.keySet();
+assignedMetaPartitions = 
Collections.unmodifiableSet(earlierAssignedPartitions);
+Set metadataTopicPartitions = 
earlierAssignedPartitions.stream()
+   
.map(x -> new TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, x))
+   
.collect(Collectors.toSet());
+consumer.assign(metadataTopicPartitions);
+
+// Seek to the committed offsets
+for (Map.Entry entry : committedOffsets.entrySet()) 
{
+partitionToConsumedOffsets.put(entry.getKey(), 
entry.getValue());
+partitionToProcessedOffsets.put(entry.getKey(), 
entry.getValue());
+consumer.seek(new 
TopicPartition(REMOTE_LOG_METADATA_TOPIC_NAME, entry.getKey()), 
entry.getValue());
+}
+
+lastSyncedPartitionToConsumedOffsets = 
Collections.unmodifiableMap(committedOffsets);
+}
 }
 
 @Override
 public void run() {
 log.info("Started Consumer task thread.");
+lastSyncedTimeMs = time.milliseconds();
 try {
 while (!closing) {
 maybeWaitForPartitionsAssignment();
 
 log.info("Polling consumer to receive remote log metadata 
topic records");
-ConsumerRecords consumerRecords
-= con

[GitHub] [kafka] showuon commented on a change in pull request #11381: KAFKA-12648: allow users to set a StreamsUncaughtExceptionHandler on individual named topologies

2021-10-06 Thread GitBox


showuon commented on a change in pull request #11381:
URL: https://github.com/apache/kafka/pull/11381#discussion_r722928724



##
File path: 
streams/src/main/java/org/apache/kafka/streams/errors/NamedTopologyException.java
##
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.streams.errors;
+
+import org.apache.kafka.common.KafkaException;
+
+public class NamedTopologyException extends KafkaException {

Review comment:
We should add some Javadoc for this class to explain the meaning of this 
exception and give some description, like the other custom exception 
classes.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/processor/internals/namedtopology/KafkaStreamsNamedTopologyWrapper.java
##
@@ -138,6 +140,29 @@ public void addNamedTopology(final NamedTopology 
newTopology) {
 
topologyMetadata.registerAndBuildNewTopology(newTopology.internalTopologyBuilder());
 }
 
+/**
+ * Add a new NamedTopology to a running Kafka Streams app. If multiple 
instances of the application are running,
+ * you should inform all of them by calling {@link 
#addNamedTopology(NamedTopology)} on each client in order for
+ * it to begin processing the new topology.

Review comment:
   Should we add some description for `topologyExceptionHandler`? ex: `This 
method will also set the {@code topologyExceptionHandler} for the 
{@newTopology}`




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org