[GitHub] [kafka] ableegoldman opened a new pull request #11601: KAFKA-12648: Minor fixes for input topic management

2021-12-13 Thread GitBox


ableegoldman opened a new pull request #11601:
URL: https://github.com/apache/kafka/pull/11601


   While working on [#11600](https://github.com/apache/kafka/pull/11600) I 
noticed a few issues with how we manage topics in the TopologyMetadata, 
particularly surrounding the code to update and track input topics. This PR 
cleans that up and adds some further verification when processing possible 
updates from the subscription metadata or assignment






[jira] [Commented] (KAFKA-13077) Replication failing after unclean shutdown of ZK and all brokers

2021-12-13 Thread Kashish Bansal (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13077?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17458927#comment-17458927
 ] 

Kashish Bansal commented on KAFKA-13077:


[~junrao] We faced the same issue while restarting Kafka: topic partitions were not replicating across brokers, and there was a constant set of under-replicated partitions (URP) for both internal and external topics.

One more thing: sometimes during a ZooKeeper restart the data in ZK goes away and we are forced to restart Kafka, and after that the same condition occurs where partitions won't replicate across brokers.

Any help would be appreciated.
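For reference, a minimal Admin-client sketch of how the under-replicated partitions described above can be listed (the class name and bootstrap address below are placeholders, not from this thread):
{code:java}
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartitionInfo;

public class UnderReplicatedPartitionsCheck {
    public static void main(final String[] args) throws Exception {
        final Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        try (Admin admin = Admin.create(props)) {
            final Set<String> topics = admin.listTopics().names().get();
            for (final TopicDescription topic : admin.describeTopics(topics).all().get().values()) {
                for (final TopicPartitionInfo partition : topic.partitions()) {
                    // A partition is under-replicated when its ISR is smaller than its replica set
                    if (partition.isr().size() < partition.replicas().size()) {
                        System.out.println(topic.name() + "-" + partition.partition()
                            + " leader=" + partition.leader()
                            + " isr=" + partition.isr().size() + "/" + partition.replicas().size());
                    }
                }
            }
        }
    }
}
{code}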

> Replication failing after unclean shutdown of ZK and all brokers
> 
>
> Key: KAFKA-13077
> URL: https://issues.apache.org/jira/browse/KAFKA-13077
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.8.0
>Reporter: Christopher Auston
>Priority: Minor
>
> I am submitting this in the spirit of what can go wrong when an operator 
> violates the constraints Kafka depends on. I don't know if Kafka could or 
> should handle this more gracefully. I decided to file this issue because it 
> was easy to get the problem I'm reporting with Kubernetes StatefulSets (STS). 
> By "easy" I mean that I did not go out of my way to corrupt anything, I just 
> was not careful when restarting ZK and brokers.
> I violated the constraints of keeping Zookeeper stable and at least one 
> running in-sync replica. 
> I am running the bitnami/kafka helm chart on Amazon EKS.
> {quote}% kubectl get po kaf-kafka-0 -ojson |jq .spec.containers'[].image'
> "docker.io/bitnami/kafka:2.8.0-debian-10-r43"
> {quote}
> I started with 3 ZK instances and 3 brokers (both STS). I changed the 
> cpu/memory requests on both STS and kubernetes proceeded to restart ZK and 
> kafka instances at the same time. If I recall correctly there were some 
> crashes and several restarts but eventually all the instances were running 
> again. It's possible all ZK nodes and all brokers were unavailable at various 
> points.
> The problem I noticed was that two of the brokers were just continually 
> spitting out messages like:
> {quote}% kubectl logs kaf-kafka-0 --tail 10
> [2021-07-13 14:26:08,871] INFO [ProducerStateManager 
> partition=__transaction_state-0] Loading producer state from snapshot file 
> 'SnapshotFile(/bitnami/kafka/data/__transaction_state-0/0001.snapshot,1)'
>  (kafka.log.ProducerStateManager)
> [2021-07-13 14:26:08,871] WARN [Log partition=__transaction_state-0, 
> dir=/bitnami/kafka/data] *Non-monotonic update of high watermark from 
> (offset=2744 segment=[0:1048644]) to (offset=1 segment=[0:169])* 
> (kafka.log.Log)
> [2021-07-13 14:26:08,874] INFO [Log partition=__transaction_state-10, 
> dir=/bitnami/kafka/data] Truncating to offset 2 (kafka.log.Log)
> [2021-07-13 14:26:08,877] INFO [Log partition=__transaction_state-10, 
> dir=/bitnami/kafka/data] Loading producer state till offset 2 with message 
> format version 2 (kafka.log.Log)
> [2021-07-13 14:26:08,877] INFO [ProducerStateManager 
> partition=__transaction_state-10] Loading producer state from snapshot file 
> 'SnapshotFile(/bitnami/kafka/data/__transaction_state-10/0002.snapshot,2)'
>  (kafka.log.ProducerStateManager)
> [2021-07-13 14:26:08,877] WARN [Log partition=__transaction_state-10, 
> dir=/bitnami/kafka/data] Non-monotonic update of high watermark from 
> (offset=2930 segment=[0:1048717]) to (offset=2 segment=[0:338]) 
> (kafka.log.Log)
> [2021-07-13 14:26:08,880] INFO [Log partition=__transaction_state-20, 
> dir=/bitnami/kafka/data] Truncating to offset 1 (kafka.log.Log)
> [2021-07-13 14:26:08,882] INFO [Log partition=__transaction_state-20, 
> dir=/bitnami/kafka/data] Loading producer state till offset 1 with message 
> format version 2 (kafka.log.Log)
> [2021-07-13 14:26:08,882] INFO [ProducerStateManager 
> partition=__transaction_state-20] Loading producer state from snapshot file 
> 'SnapshotFile(/bitnami/kafka/data/__transaction_state-20/0001.snapshot,1)'
>  (kafka.log.ProducerStateManager)
> [2021-07-13 14:26:08,883] WARN [Log partition=__transaction_state-20, 
> dir=/bitnami/kafka/data] Non-monotonic update of high watermark from 
> (offset=2956 segment=[0:1048608]) to (offset=1 segment=[0:169]) 
> (kafka.log.Log)
> {quote}
> If I describe that topic I can see that several partitions have a leader of 2 
> and the ISR is just 2 (NOTE I added two more brokers and tried to reassign 
> the topic onto brokers 2,3,4 which you can see below). The new brokers also 
> spit out the messages about "non-monotonic update" just like the original 
> followers. This describe output is from the following day.
> {{% kafka-topics.sh ${=BS} --topic __transaction_state --describe}}
> {{Topic: __transaction_state TopicId: i7bBNCeuQMWl-ZMpzrnMAw 

[GitHub] [kafka] ableegoldman opened a new pull request #11600: KAFKA-12648: handle MissingSourceTopicException for named topologies

2021-12-13 Thread GitBox


ableegoldman opened a new pull request #11600:
URL: https://github.com/apache/kafka/pull/11600


   Another source of flakiness we found in the NamedTopologyIntegrationTest was an occasional MissingSourceTopicException that was causing the application to shut down. We created all source topics ahead of time in the tests, which led us to discover this [race condition](https://issues.apache.org/jira/browse/KAFKA-13543) in the consumer client: it can produce spurious MissingSourceTopicExceptions when the metadata hasn't finished updating after a change in the consumer's subscription.
   
   In addition to finding a workaround for this bug, throwing this 
MissingSourceTopicException and shutting down the entire app is itself a bug in 
the NamedTopology feature -- we should not stop all clients and prevent any 
further processing of the completely valid topologies just because one (or 
more) topologies were added that are missing their source topics. We can just 
remove those topologies from the assignment for the time being, and wait until 
the metadata has finished updating or the user has created the input topics to 
start assigning tasks from them.
   
   So, this PR does two things:
   a) Avoid throwing a MissingSourceTopicException inside the #assign method when named topologies are used, and instead just remove the topologies that are missing any of their input topics from the assignment.
   b) Trigger the uncaught exception handler with a MissingSourceTopicException for each of the topologies that are missing topics, but don't shut down the thread -- we just want to make sure this issue is made visible to the user.
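   For illustration only (not part of this PR's changes): a minimal user-side sketch, assuming the existing KafkaStreams#setUncaughtExceptionHandler API, of how an application could observe these MissingSourceTopicExceptions without tearing everything down. The return values shown are illustrative; register it with `kafkaStreams.setUncaughtExceptionHandler(new LogMissingSourceTopicsHandler())`.
   ```java
   import org.apache.kafka.streams.errors.MissingSourceTopicException;
   import org.apache.kafka.streams.errors.StreamsUncaughtExceptionHandler;

   // Hypothetical user-side handler: log the missing-topic error and keep the client running.
   public class LogMissingSourceTopicsHandler implements StreamsUncaughtExceptionHandler {
       @Override
       public StreamThreadExceptionResponse handle(final Throwable exception) {
           if (exception instanceof MissingSourceTopicException) {
               System.err.println("A named topology is missing source topics: " + exception.getMessage());
               return StreamThreadExceptionResponse.REPLACE_THREAD; // don't shut the app down
           }
           return StreamThreadExceptionResponse.SHUTDOWN_CLIENT;
       }
   }
   ```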






[jira] [Commented] (KAFKA-13543) Consumer may pass stale cluster metadata to the assignor following a subscription update

2021-12-13 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13543?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17458923#comment-17458923
 ] 

A. Sophie Blee-Goldman commented on KAFKA-13543:


One possible fix would be to block inside Consumer#subscribe until the metadata has been updated, as we currently just issue a non-blocking call to update the metadata and then return. Of course, there could be some unintended repercussions to making this previously async API suddenly a blocking call, so we should definitely discuss that further.

If waiting on the metadata update is off the table, another option is to mitigate the issue by at least giving the assignor a way to tell that the metadata has not been updated since the subscription was, and therefore that the cluster metadata passed in to #assign cannot necessarily be trusted.
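One way to at least detect the situation inside a custom assignor today: a hypothetical helper (not existing Kafka code) that collects the subscribed topics absent from the Cluster snapshot passed to #assign, so the assignor can treat them as "possibly not in metadata yet" rather than definitively missing.
{code:java}
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import org.apache.kafka.clients.consumer.ConsumerPartitionAssignor.Subscription;
import org.apache.kafka.common.Cluster;

public final class SubscriptionMetadataCheck {
    private SubscriptionMetadataCheck() { }

    // Returns every subscribed topic that does not appear in the Cluster snapshot.
    public static Set<String> topicsNotInMetadata(final Cluster metadata,
                                                  final Map<String, Subscription> subscriptions) {
        final Set<String> unknown = new HashSet<>();
        for (final Subscription subscription : subscriptions.values()) {
            for (final String topic : subscription.topics()) {
                if (!metadata.topics().contains(topic)) {
                    unknown.add(topic);
                }
            }
        }
        return unknown;
    }
}
{code}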

> Consumer may pass stale cluster metadata to the assignor following a 
> subscription update
> 
>
> Key: KAFKA-13543
> URL: https://issues.apache.org/jira/browse/KAFKA-13543
> Project: Kafka
>  Issue Type: Bug
>  Components: consumer
>Reporter: A. Sophie Blee-Goldman
>Priority: Major
>
> A consumer only ever tracks metadata corresponding to its subscribed topics, 
> which can cause a race condition during a rebalance immediately after a 
> change to the consumer's subscription. Particularly, when new topics are 
> added to the subscription but a rebalance is kicked off before the consumer's 
> metadata is updated with the new topics, it will pass a stale copy of the 
> cluster metadata in to the ConsumerPartitionAssignor#assign method, which may 
> not include the newly subscribed topics regardless of whether they do or do 
> not exist.
> Most apps are likely unaffected by this, including any consumer client apps 
> using OOTB assignors, since a new rebalance will be kicked off when the 
> metadata is updated and any partitions from the new topics will be assigned 
> at that time. But in Kafka Streams, we do a check during each rebalance to 
> ensure that any user input topics are created ahead of time. This race 
> condition can result in Streams incorrectly identifying user topics as 
> missing and throwing a MissingSourceTopicException when a new topology 
> subscribed to new topics is added to the application.
> We can work around this for now, but it's unfortunate that we can't 
> distinguish between true missing source topics and a transient lack of these 
> topics in the metadata. There might also be some plain consumer client apps 
> with custom assignors that run into this as well, for more advanced users.





[jira] [Created] (KAFKA-13543) Consumer may pass stale cluster metadata to the assignor following a subscription update

2021-12-13 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-13543:
--

 Summary: Consumer may pass stale cluster metadata to the assignor 
following a subscription update
 Key: KAFKA-13543
 URL: https://issues.apache.org/jira/browse/KAFKA-13543
 Project: Kafka
  Issue Type: Bug
  Components: consumer
Reporter: A. Sophie Blee-Goldman


A consumer only ever tracks metadata corresponding to its subscribed topics, 
which can cause a race condition during a rebalance immediately after a change 
to the consumer's subscription. Particularly, when new topics are added to the 
subscription but a rebalance is kicked off before the consumer's metadata is 
updated with the new topics, it will pass a stale copy of the cluster metadata 
in to the ConsumerPartitionAssignor#assign method, which may not include the 
newly subscribed topics regardless of whether they do or do not exist.

Most apps are likely unaffected by this, including any consumer client apps 
using OOTB assignors, since a new rebalance will be kicked off when the 
metadata is updated and any partitions from the new topics will be assigned at 
that time. But in Kafka Streams, we do a check during each rebalance to ensure 
that any user input topics are created ahead of time. This race condition can 
result in Streams incorrectly identifying user topics as missing and throwing a 
MissingSourceTopicException when a new topology subscribed to new topics is 
added to the application.

We can work around this for now, but it's unfortunate that we can't distinguish 
between true missing source topics and a transient lack of these topics in the 
metadata. There might also be some plain consumer client apps with custom 
assignors that run into this as well, for more advanced users.





[jira] [Commented] (KAFKA-13281) Support live upgrades with dynamic addition/removal of modular topologies

2021-12-13 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17458847#comment-17458847
 ] 

Luke Chen commented on KAFKA-13281:
---

Thanks, [~ableegoldman]! :)

> Support live upgrades with dynamic addition/removal of modular topologies
> -
>
> Key: KAFKA-13281
> URL: https://issues.apache.org/jira/browse/KAFKA-13281
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Priority: Major
>  Labels: needs-kip
>






[jira] [Created] (KAFKA-13542) Utilize the new Consumer#enforceRebalance(reason) API in Streams

2021-12-13 Thread A. Sophie Blee-Goldman (Jira)
A. Sophie Blee-Goldman created KAFKA-13542:
--

 Summary: Utilize the new Consumer#enforceRebalance(reason) API in 
Streams
 Key: KAFKA-13542
 URL: https://issues.apache.org/jira/browse/KAFKA-13542
 Project: Kafka
  Issue Type: Task
  Components: streams
Reporter: A. Sophie Blee-Goldman
 Fix For: 3.2.0


KIP-800 is adding a new "reason" parameter to the Consumer#enforceRebalance 
API, which will be passed in to a new field of the JoinGroup protocol. We 
invoke this API throughout Streams for various reasons, which are very useful 
for debugging the cause of rebalancing. Passing in the reason to this new API 
would make it possible to figure out why a Streams client triggered a rebalance 
from the broker logs, which are often the only logs available when the client 
logs cannot be retrieved for whatever reason.
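A minimal sketch of what this could look like from the plain consumer side, assuming the KIP-800 overload lands as Consumer#enforceRebalance(String reason); the bootstrap address, group, topic, and reason string are placeholders:
{code:java}
import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

public class EnforceRebalanceWithReason {
    public static void main(final String[] args) {
        final Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092"); // placeholder
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-group");           // placeholder
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

        try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
            consumer.subscribe(Collections.singletonList("example-topic"));   // placeholder
            // The reason is carried in the JoinGroup request, so the broker logs can show
            // why this client forced a rebalance.
            consumer.enforceRebalance("added named topology example-topology");
        }
    }
}
{code}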





[jira] [Updated] (KAFKA-13281) Support live upgrades with dynamic addition/removal of modular topologies

2021-12-13 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13281?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-13281:
---
Summary: Support live upgrades with dynamic addition/removal of modular 
topologies  (was: Support upgrades with dynamic addition/removal of disjoint 
"named" topologies)

> Support live upgrades with dynamic addition/removal of modular topologies
> -
>
> Key: KAFKA-13281
> URL: https://issues.apache.org/jira/browse/KAFKA-13281
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Priority: Major
>  Labels: needs-kip
>






[jira] [Updated] (KAFKA-13282) Draft final NamedTopology API and publish a KIP

2021-12-13 Thread A. Sophie Blee-Goldman (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13282?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

A. Sophie Blee-Goldman updated KAFKA-13282:
---
Description: 
The pre-KIP experimental phase has left quite a few open questions around the API of this new feature; we need to hash that out and then write it up into a KIP before introducing this in the public interface

[KIP-809 |https://cwiki.apache.org/confluence/x/7ovkCw]

  was:The pre-KIP experimental phase has left quite a few open questions around the API of this new feature; we need to hash that out and then write it up into a KIP before introducing this in the public interface


> Draft final NamedTopology API and publish a KIP
> ---
>
> Key: KAFKA-13282
> URL: https://issues.apache.org/jira/browse/KAFKA-13282
> Project: Kafka
>  Issue Type: Sub-task
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Priority: Major
>
> The pre-KIP experimental phase has left quite a few open questions around the API of this new feature; we need to hash that out and then write it up into a KIP before introducing this in the public interface
> [KIP-809 |https://cwiki.apache.org/confluence/x/7ovkCw]





[jira] [Commented] (KAFKA-13281) Support upgrades with dynamic addition/removal of disjoint "named" topologies

2021-12-13 Thread A. Sophie Blee-Goldman (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13281?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17458831#comment-17458831
 ] 

A. Sophie Blee-Goldman commented on KAFKA-13281:


Sorry, I'm super behind on my email inbox – thanks for responding, Walker. Yeah, we don't have a target date for the overall NamedTopology feature itself just yet (I only just reserved a KIP for it today), but the TopologyConfig definitely has a use outside of this work, so I'm happy to start pulling it into the public API.

> Support upgrades with dynamic addition/removal of disjoint "named" topologies
> -
>
> Key: KAFKA-13281
> URL: https://issues.apache.org/jira/browse/KAFKA-13281
> Project: Kafka
>  Issue Type: New Feature
>  Components: streams
>Reporter: A. Sophie Blee-Goldman
>Priority: Major
>  Labels: needs-kip
>






[GitHub] [kafka] vvcephei commented on pull request #11598: feat: Implement range and scan queries

2021-12-13 Thread GitBox


vvcephei commented on pull request #11598:
URL: https://github.com/apache/kafka/pull/11598#issuecomment-992981391


   Oh, one other thing I just noticed, @vpapavas: can you add JavaDoc explaining what the RangeQuery is for and what each one of its methods does? That's a public API, so it should be well documented.






[GitHub] [kafka] vvcephei commented on a change in pull request #11598: feat: Implement range and scan queries

2021-12-13 Thread GitBox


vvcephei commented on a change in pull request #11598:
URL: https://github.com/apache/kafka/pull/11598#discussion_r768122088



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/InMemoryKeyValueStore.java
##
@@ -175,6 +194,7 @@ public synchronized void putAll(final List<KeyValue<Bytes, byte[]>> entries) {
 if (from == null && to == null) {
 return getKeyValueIterator(map.keySet(), forward);
 } else if (from == null) {
+System.out.println("---> range upper bound");

Review comment:
   looks like this was left over.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##
@@ -186,6 +203,85 @@ public boolean setFlushListener(final CacheFlushListener<K, V> listener,
 return false;
 }
 
+@SuppressWarnings("unchecked")
+@Override
+public <R> QueryResult<R> query(final Query<R> query,
+                                final PositionBound positionBound,
+                                final boolean collectExecutionInfo) {
+
+    final long start = System.nanoTime();
+    final QueryResult<R> result;
+
+    final QueryHandler handler = queryHandlers.get(query.getClass());
+    if (handler == null) {
+        result = wrapped().query(query, positionBound, collectExecutionInfo);
+        if (collectExecutionInfo) {
+            result.addExecutionInfo(
+                "Handled in " + getClass() + " in " + (System.nanoTime() - start) + "ns");
+        }
+    } else {
+        result = (QueryResult<R>) handler.apply(
+            query,
+            positionBound,
+            collectExecutionInfo,
+            this
+        );
+        if (collectExecutionInfo) {
+            result.addExecutionInfo(
+                "Handled in " + getClass() + " with serdes "
+                    + serdes + " in " + (System.nanoTime() - start) + "ns");
+        }
+    }
+    return result;
+}
+
+@SuppressWarnings("unchecked")
+private  QueryResult runRangeQuery(
+final Query query, final PositionBound positionBound, final 
boolean collectExecutionInfo) {
+
+final QueryResult result;
+final RangeQuery typedQuery = (RangeQuery) query;
+final RangeQuery rawRangeQuery;
+if (typedQuery.getLowerBound().isPresent() && 
typedQuery.getUpperBound().isPresent()) {
+rawRangeQuery = 
RangeQuery.withRange(keyBytes(typedQuery.getLowerBound().get()),
+keyBytes(typedQuery.getUpperBound().get()));
+} else if (typedQuery.getLowerBound().isPresent()) {
+rawRangeQuery = 
RangeQuery.withLowerBound(keyBytes(typedQuery.getLowerBound().get()));
+} else if (typedQuery.getUpperBound().isPresent()) {
+rawRangeQuery = 
RangeQuery.withUpperBound(keyBytes(typedQuery.getUpperBound().get()));
+} else {
+rawRangeQuery = RangeQuery.withNoBounds();
+}

Review comment:
   At the risk of being too fancy, what do you think about this instead?
   ```suggestion
   rawRangeQuery = RangeQuery.withRange(typedQuery.getLowerBound().map(this::keyBytes),
       typedQuery.getUpperBound().map(this::keyBytes));
   ```

##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
##
@@ -279,34 +344,34 @@ public static void before()
 final RecordMetadata recordMetadata = future.get(1, 
TimeUnit.MINUTES);
 assertThat(recordMetadata.hasOffset(), is(true));
 INPUT_POSITION.withComponent(
-recordMetadata.topic(),
-recordMetadata.partition(),
-recordMetadata.offset()
+recordMetadata.topic(),
+recordMetadata.partition(),
+recordMetadata.offset()
 );
 }
 }
 
 assertThat(INPUT_POSITION, equalTo(
-Position
-.emptyPosition()
-.withComponent(INPUT_TOPIC_NAME, 0, 1L)
-.withComponent(INPUT_TOPIC_NAME, 1, 0L)
+Position
+.emptyPosition()
+.withComponent(INPUT_TOPIC_NAME, 0, 1L)
+.withComponent(INPUT_TOPIC_NAME, 1, 1L)
 ));
 }
 
 @Before
 public void beforeTest() {
 final StoreSupplier supplier = storeToTest.supplier();
 final Properties streamsConfig = streamsConfiguration(
-cache,
-log,
-storeToTest.name()
+cache,
+log,
+storeToTest.name()

Review comment:
   I'm sorry to sound picky, but do you mind backing out these formatting 
changes? I'm only concerned because there's a lot of them. Otherwise, we'll 

[jira] [Commented] (KAFKA-13476) Streams crashes when non Base64 Offset Metadata is found

2021-12-13 Thread Guozhang Wang (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17458730#comment-17458730
 ] 

Guozhang Wang commented on KAFKA-13476:
---

Thanks [~RBosch81], that clears up my questions, then. I saw [~mjsax] is already on your PR, so I'll leave it to him to review and merge it.

> Streams crashes when non Base64 Offset Metadata is found
> 
>
> Key: KAFKA-13476
> URL: https://issues.apache.org/jira/browse/KAFKA-13476
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.4.0
>Reporter: Richard Bosch
>Assignee: Richard Bosch
>Priority: Minor
>   Original Estimate: 0.5h
>  Remaining Estimate: 0.5h
>
> Kafka Streams applications use the metadata stored with the committed offsets 
> from previous running instances to extract timestamps.
> But when the metadata field contains other data the Base64 decoder will throw 
> an exception causing the Streams application to fail.
> A new Offset commit is then required to stop this failure.
> I've included the part of the log when we started a Kafka Streams app after 
> setting the offsets using a third party tool. This tool adds some tracing 
> metadata so developers and operators could debug who performed this custom 
> offset commit.
>  
> {noformat}
> 2021-11-16 12:56:36.020  INFO 25 --- [-StreamThread-2] 
> o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=example-app-3, 
> groupId=axual-demo-example-example-app] Unsubscribed all topics or patterns 
> and assigned partitions
>   at java.base/java.util.Base64$Decoder.decode(Unknown Source) ~[na:na]
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.decodeTimestamp(StreamTask.java:1039)
>  ~[kafka-streams-2.7.0.jar:na]
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
>  ~[kafka-streams-2.7.0.jar:na]
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.initializeTaskTime(StreamTask.java:837)
>  ~[kafka-streams-2.7.0.jar:na]
> java.lang.IllegalArgumentException: Illegal base64 character 7b
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:728)
>  ~[kafka-streams-2.7.0.jar:na]
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.initializeMetadata(StreamTask.java:818)
>  ~[kafka-streams-2.7.0.jar:na]
> 2021-11-16 12:56:36.127 ERROR 25 --- [-StreamThread-1] 
> org.apache.kafka.streams.KafkaStreams: stream-client 
> [streams-example-app-1] All stream threads have died. The instance will be in 
> error state and should be closed.
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
>  ~[kafka-streams-2.7.0.jar:na]
> java.lang.IllegalArgumentException: Illegal base64 character 7b
> {noformat}
> I recommend adding a try/catch block around the Base64 decode in the
> StreamTask.decodeTimestamp method and returning the Unknown value when this
> occurs. This is purely for resilience when bad data is encountered.
> After the Streams application performs a new offset commit the error should
> not occur again, limiting the chance of frequently recurring warnings in the
> logs.
> I've already made the changes and added a test for this issue, as I would 
> like to contribute to Kafka.
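A minimal sketch of the recommended fallback (not the actual StreamTask code; the -1 sentinel and the leading version byte are assumptions based on the description above):
{code:java}
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.util.Base64;

public final class TimestampMetadataDecoder {
    private static final long UNKNOWN = -1L; // assumed sentinel for "no timestamp known"

    public static long decodeTimestamp(final String encodedMetadata) {
        if (encodedMetadata == null || encodedMetadata.isEmpty()) {
            return UNKNOWN;
        }
        try {
            final ByteBuffer buffer = ByteBuffer.wrap(Base64.getDecoder().decode(encodedMetadata));
            buffer.get();            // assumed leading version byte
            return buffer.getLong(); // the committed timestamp
        } catch (final IllegalArgumentException | BufferUnderflowException e) {
            // Metadata written by a third-party tool rather than by Streams: treat as unknown.
            return UNKNOWN;
        }
    }
}
{code}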





[GitHub] [kafka] vvcephei commented on pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-13 Thread GitBox


vvcephei commented on pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#issuecomment-992922162


   Note to reviewers, upon reflection, and based on the discussions on the 
other ongoing KIPs to add IQv2 queries, I've decided to drop the "RawKeyQuery" 
that I had originally proposed. It really just saved us from one extra cast in 
an execution path that already has a ton of other casts. It was an attempt to 
be a little elegant, but I don't think it was successful.
   
   I'm hoping that once we have several queries in place, we'll be able to golf 
it a bit and come up with an actually more elegant approach to the internal 
code.






[jira] [Resolved] (KAFKA-13540) UniformStickyPartitioner leads to uneven Kafka partitions

2021-12-13 Thread Artem Livshits (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Artem Livshits resolved KAFKA-13540.

Resolution: Duplicate

See also 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-794%3A+Strictly+Uniform+Sticky+Partitioner

> UniformStickyPartitioner leads to uneven Kafka partitions
> -
>
> Key: KAFKA-13540
> URL: https://issues.apache.org/jira/browse/KAFKA-13540
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Affects Versions: 2.4.1
>Reporter: nk2242696
>Priority: Major
> Attachments: MicrosoftTeams-image (1).png
>
>
> Kafka topic with 20 partitions, 24-hour TTL, and a replication factor of 3.
> Using UniformStickyPartitioner we expected each partition to be roughly the
> same size, but the realised size of some of the partitions is almost double.
> !MicrosoftTeams-image (1).png!





[GitHub] [kafka] mimaison opened a new pull request #11599: KAFKA-13527: Add top-level error code field to DescribeLogDirsResponse

2021-12-13 Thread GitBox


mimaison opened a new pull request #11599:
URL: https://github.com/apache/kafka/pull/11599


   Implements KIP-784
   
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[jira] [Updated] (KAFKA-13511) Update TimestampConverter SMT to support unix epoch as millis, micros, and seconds

2021-12-13 Thread Julien Chanaud (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13511?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Julien Chanaud updated KAFKA-13511:
---
Labels: connect-transformation needs-kip  (was: )

> Update TimestampConverter SMT to support unix epoch as millis, micros, and 
> seconds
> --
>
> Key: KAFKA-13511
> URL: https://issues.apache.org/jira/browse/KAFKA-13511
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Julien Chanaud
>Assignee: Julien Chanaud
>Priority: Minor
>  Labels: connect-transformation, needs-kip
>
> Currently, the SMT TimestampConverter can convert Timestamp from either 
> source String, Long or Date into target String, Long or Date.
> The problem is that Long source or target is required to be epoch in 
> milliseconds.
> In many cases, epoch is represented with different precisions. This leads to 
> several Jira tickets:
>  * KAFKA-12364
>  * KAFKA-10561
> I propose to add a new config to TimestampConverter called "epoch.precision" 
> which defaults to "millis" so as to not impact existing code, and allows for 
> more precisions : seconds, millis, micros.
> {code:json}
> "transforms": "TimestampConverter",
> "transforms.TimestampConverter.type": 
> "org.apache.kafka.connect.transforms.TimestampConverter$Value",
> "transforms.TimestampConverter.field": "event_date",
> "transforms.TimestampConverter.epoch.precision": "micros",
> "transforms.TimestampConverter.target.type": "Timestamp"
> {code}
> Exactly like "format" field which is used as input when the source in String 
> and output when the target.type is string, this new field would be used as 
> input when the field is Long, and as output when the target.type is "unix"
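For illustration only (not the SMT implementation): the arithmetic an "epoch.precision" setting would have to apply when the source field is a Long, using hypothetical values.
{code:java}
import java.util.Date;
import java.util.concurrent.TimeUnit;

public class EpochPrecisionExample {
    public static void main(final String[] args) {
        final long epochMicros = 1_639_400_000_123_456L; // hypothetical event_date in microseconds
        final long epochSeconds = 1_639_400_000L;        // roughly the same instant in seconds

        // "epoch.precision": "micros" -> scale down to the milliseconds java.util.Date expects
        final Date fromMicros = new Date(TimeUnit.MICROSECONDS.toMillis(epochMicros));
        // "epoch.precision": "seconds" -> scale up to milliseconds
        final Date fromSeconds = new Date(TimeUnit.SECONDS.toMillis(epochSeconds));

        System.out.println(fromMicros + " vs " + fromSeconds);
    }
}
{code}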





[jira] [Assigned] (KAFKA-13511) Update TimestampConverter SMT to support unix epoch as millis, micros, and seconds

2021-12-13 Thread Julien Chanaud (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13511?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Julien Chanaud reassigned KAFKA-13511:
--

Assignee: Julien Chanaud

> Update TimestampConverter SMT to support unix epoch as millis, micros, and 
> seconds
> --
>
> Key: KAFKA-13511
> URL: https://issues.apache.org/jira/browse/KAFKA-13511
> Project: Kafka
>  Issue Type: Improvement
>  Components: KafkaConnect
>Reporter: Julien Chanaud
>Assignee: Julien Chanaud
>Priority: Minor
>
> Currently, the SMT TimestampConverter can convert Timestamp from either 
> source String, Long or Date into target String, Long or Date.
> The problem is that Long source or target is required to be epoch in 
> milliseconds.
> In many cases, epoch is represented with different precisions. This leads to 
> several Jira tickets:
>  * KAFKA-12364
>  * KAFKA-10561
> I propose to add a new config to TimestampConverter called "epoch.precision" 
> which defaults to "millis" so as to not impact existing code, and allows for 
> more precisions : seconds, millis, micros.
> {code:json}
> "transforms": "TimestampConverter",
> "transforms.TimestampConverter.type": 
> "org.apache.kafka.connect.transforms.TimestampConverter$Value",
> "transforms.TimestampConverter.field": "event_date",
> "transforms.TimestampConverter.epoch.precision": "micros",
> "transforms.TimestampConverter.target.type": "Timestamp"
> {code}
> Exactly like "format" field which is used as input when the source in String 
> and output when the target.type is string, this new field would be used as 
> input when the field is Long, and as output when the target.type is "unix"





[GitHub] [kafka] vpapavas opened a new pull request #11598: feat: Implement range and scan queries

2021-12-13 Thread GitBox


vpapavas opened a new pull request #11598:
URL: https://github.com/apache/kafka/pull/11598


   Implement the RangeQuery as proposed in KIP-805
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[GitHub] [kafka] jsancio commented on pull request #11593: KAFKA-13528: KRaft RegisterBroker should validate that the cluster ID matches

2021-12-13 Thread GitBox


jsancio commented on pull request #11593:
URL: https://github.com/apache/kafka/pull/11593#issuecomment-992715953


   Thanks for the reply @cmccabe .
   
   Should we add an integration test that shows the expected behavior on the broker side?
   
   Can we add a description to the PR so that it is included in the commit 
message?






[GitHub] [kafka] AndyGee commented on a change in pull request #11426: KAFKA-13391: don't fsync directory on Windows OS

2021-12-13 Thread GitBox


AndyGee commented on a change in pull request #11426:
URL: https://github.com/apache/kafka/pull/11426#discussion_r767968720



##
File path: clients/src/main/java/org/apache/kafka/common/utils/Utils.java
##
@@ -948,10 +948,12 @@ public static void atomicMoveWithFallback(Path source, 
Path target, boolean need
 /**
  * Flushes dirty directories to guarantee crash consistency.
  *
+ * Note: We don't fsync directories on Windows OS because otherwise it'll 
throw AccessDeniedException (KAFKA-13391)
+ *
  * @throws IOException if flushing the directory fails.
  */
 public static void flushDir(Path path) throws IOException {
-if (path != null) {
+if (path != null && !OperatingSystem.IS_WINDOWS) {
 try (FileChannel dir = FileChannel.open(path, 
StandardOpenOption.READ)) {
 dir.force(true);

Review comment:
   There is a file lock on this file that causes the issue, which might be 
hiding another issue even on other platforms.








[GitHub] [kafka] satishd commented on pull request #11390: [KAFKA-13369] Follower fetch protocol changes for tiered storage.

2021-12-13 Thread GitBox


satishd commented on pull request #11390:
URL: https://github.com/apache/kafka/pull/11390#issuecomment-992617350


   Thanks @junrao for the review. Please find inline replies; I addressed most of them with the latest commits.






[GitHub] [kafka] vvcephei commented on a change in pull request #11582: KAFKA-13525: Implement KeyQuery in Streams IQv2

2021-12-13 Thread GitBox


vvcephei commented on a change in pull request #11582:
URL: https://github.com/apache/kafka/pull/11582#discussion_r767856835



##
File path: 
streams/src/main/java/org/apache/kafka/streams/state/internals/MeteredKeyValueStore.java
##
@@ -79,6 +88,14 @@
 private StreamsMetricsImpl streamsMetrics;
 private TaskId taskId;
 
+private Map<Class, QueryHandler> queryHandlers =
+    mkMap(
+        mkEntry(
+            KeyQuery.class,
+            (query, positionBound, collectExecutionInfo, store) -> runKeyQuery(query, positionBound, collectExecutionInfo)
+        )
+    );
+

Review comment:
   Just trying to establish some pattern here that can let us dispatch 
these queries efficiently. This O(1) lookup should be faster than an O(n) 
if/else check or an O(log n) string switch statement, but we won't know for 
sure without benchmarking.

##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
##
@@ -216,9 +269,17 @@ public boolean global() {
 
 public abstract StoreSupplier supplier();
 
+public boolean timestamped() {
+return true; // most stores are timestamped
+};
+
 public boolean global() {
 return false;
 }
+
+public boolean keyValue() {
+return false;
+}

Review comment:
   These help us adjust our expectations in the validations below, so that 
we can cover all store types in the same test.

##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
##
@@ -513,6 +590,43 @@ public void shouldHandlePingQuery() {
 assertThat(result.getPosition(), is(INPUT_POSITION));
 }
 
+public <V> void shouldHandleKeyQuery(
+    final Integer key,
+    final Function<V, Integer> valueExtactor,
+    final Integer expectedValue) {
+
+    final KeyQuery<Integer, V> query = KeyQuery.withKey(key);
+    final StateQueryRequest<V> request =
+        inStore(STORE_NAME)
+            .withQuery(query)
+            .withPartitions(mkSet(0, 1))
+            .withPositionBound(PositionBound.at(INPUT_POSITION));
+
+    final StateQueryResult<V> result =
+        IntegrationTestUtils.iqv2WaitForResult(kafkaStreams, request);
+
+    final QueryResult<V> queryResult =
+        result.getGlobalResult() != null
+            ? result.getGlobalResult()
+            : result.getOnlyPartitionResult();
+    final boolean failure = queryResult.isFailure();
+    if (failure) {
+        throw new AssertionError(queryResult.toString());
+    }
+    assertThat(queryResult.isSuccess(), is(true));
+
+    assertThrows(IllegalArgumentException.class, queryResult::getFailureReason);
+    assertThrows(IllegalArgumentException.class, queryResult::getFailureMessage);
+
+    final V result1 = queryResult.getResult();
+    final Integer integer = valueExtactor.apply(result1);
+    assertThat(integer, is(expectedValue));

Review comment:
   Here's where we run that function to either get the value out of the 
ValueAndTimestamp or just give back the value with the identity function.

##
File path: 
streams/src/main/java/org/apache/kafka/streams/query/FailureReason.java
##
@@ -52,5 +52,12 @@
  * The requested store partition does not exist at all. For example, 
partition 4 was requested,
  * but the store in question only has 4 partitions (0 through 3).
  */
-DOES_NOT_EXIST;
+DOES_NOT_EXIST,
+
+/**
+ * The store that handled the query got an exception during query 
execution. The message
+ * will contain the exception details. Depending on the nature of the 
exception, the caller
+ * may be able to retry this instance or may need to try a different 
instance.
+ */
+STORE_EXCEPTION;

Review comment:
   I realized in the implementation for RocksDB that we will need to 
account for runtime exceptions from the stores. I'll update the KIP.

##
File path: 
streams/src/test/java/org/apache/kafka/streams/integration/IQv2StoreIntegrationTest.java
##
@@ -426,6 +487,22 @@ public void verifyStore() {
 shouldHandlePingQuery();
 shouldCollectExecutionInfo();
 shouldCollectExecutionInfoUnderFailure();
+
+if (storeToTest.keyValue()) {
+    if (storeToTest.timestamped()) {
+        shouldHandleKeyQuery(
+            2,
+            (Function<ValueAndTimestamp<Integer>, Integer>) ValueAndTimestamp::value,
+            2
+        );
+    } else {
+        shouldHandleKeyQuery(
+            2,
+            Function.identity(),
+            2
+        );
+    }
+}

Review comment:
   Here's where we use those properties. KeyQueries are only implemented 
for 

[GitHub] [kafka] prat0318 commented on pull request #11552: KAFKA-13488: Producer fails to recover if topic gets deleted midway

2021-12-13 Thread GitBox


prat0318 commented on pull request #11552:
URL: https://github.com/apache/kafka/pull/11552#issuecomment-992560873


   @hachikuji @jolshan Bump on the review.






[GitHub] [kafka] svudutala-vmware edited a comment on pull request #7898: KAFKA-9366: Change log4j dependency into log4j2

2021-12-13 Thread GitBox


svudutala-vmware edited a comment on pull request #7898:
URL: https://github.com/apache/kafka/pull/7898#issuecomment-992556542


   > > > Will this PR solve 
[CVE-2021-44228](https://github.com/advisories/GHSA-jfh8-c2jp-5v3q)?
   > > 
   > > 
   > > @soumiksamanta
   > > 
https://github.com/apache/kafka/blob/bd3038383265f7bb850c09fe0a74a48c5c2e6f99/gradle/dependencies.gradle#L78
   > > 
   > > should be upgraded to 2.15.0. log4j <= 2.14.0 all have this issue.
   > > Initially I thought log4j 1.x is not impacted but as per 
[apache/logging-log4j2#608 
(comment)](https://github.com/apache/logging-log4j2/pull/608#issuecomment-990494126)
 it is.
   > 
   > Thank you for sharing the comment. Isn't that comment for log4j v1 in 
general. kafka by default does not use JMS appender. Do you think it is 
impacted under the default configuration.
   > 
   > Also refer to this post: 
https://lists.apache.org/thread/lgbtvvmy68p0059yoyn9qxzosdmx4jdv
   
   Yeah @unverified-user, my understanding is the same. This should not have an impact unless JMS is in use.






[GitHub] [kafka] svudutala-vmware commented on pull request #7898: KAFKA-9366: Change log4j dependency into log4j2

2021-12-13 Thread GitBox


svudutala-vmware commented on pull request #7898:
URL: https://github.com/apache/kafka/pull/7898#issuecomment-992556542


   > > > Will this PR solve 
[CVE-2021-44228](https://github.com/advisories/GHSA-jfh8-c2jp-5v3q)?
   > > 
   > > 
   > > @soumiksamanta
   > > 
https://github.com/apache/kafka/blob/bd3038383265f7bb850c09fe0a74a48c5c2e6f99/gradle/dependencies.gradle#L78
   > > 
   > > should be upgraded to 2.15.0. log4j <= 2.14.0 all have this issue.
   > > Initially I thought log4j 1.x is not impacted but as per 
[apache/logging-log4j2#608 
(comment)](https://github.com/apache/logging-log4j2/pull/608#issuecomment-990494126)
 it is.
   > 
   > Thank you for sharing the comment. Isn't that comment for log4j v1 in 
general. kafka by default does not use JMS appender. Do you think it is 
impacted under the default configuration.
   > 
   > Also refer to this post: 
https://lists.apache.org/thread/lgbtvvmy68p0059yoyn9qxzosdmx4jdv
   
   Yeah @unverified-user, my understanding is the same. This should not have an impact unless JMS is in use.
   
   I am not an expert on whether any Kafka Connect connectors use JMS, though, so there may be a potential impact there.






[jira] [Commented] (KAFKA-13535) Workaround for mitigating CVE-2021-44228 Kafka

2021-12-13 Thread Akansh Shandilya (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17458384#comment-17458384
 ] 

Akansh Shandilya commented on KAFKA-13535:
--

[~showuon] Thanks, I will wait as per your suggestion.

Regarding the JMS Appender, I found a log4j manual, but it is only applicable to log4j 2.x.x:

[https://logging.apache.org/log4j/2.x/manual/appenders.html]

 

> Workaround for mitigating CVE-2021-44228 Kafka 
> ---
>
> Key: KAFKA-13535
> URL: https://issues.apache.org/jira/browse/KAFKA-13535
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.8.1
>Reporter: Akansh Shandilya
>Priority: Major
>
> Kafka v2.8.1 uses log4j v1.x. Please review the following information:
>  
> Is Kafka v2.8.1 impacted by CVE-2021-44228?
> If yes, is there any workaround/recommendation available for Kafka v2.8.1 to
> mitigate CVE-2021-44228?





[jira] [Updated] (KAFKA-13540) UniformStickyPartitioner leads to uneven Kafka partitions

2021-12-13 Thread nk2242696 (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

nk2242696 updated KAFKA-13540:
--
Reviewer:   (was: nk2242696)

> UniformStickyPartitioner leads to uneven Kafka partitions
> -
>
> Key: KAFKA-13540
> URL: https://issues.apache.org/jira/browse/KAFKA-13540
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Affects Versions: 2.4.1
>Reporter: nk2242696
>Priority: Major
> Attachments: MicrosoftTeams-image (1).png
>
>
> Kafka topic with 20 partitions, 24-hour TTL, and a replication factor of 3.
> Using UniformStickyPartitioner we expected each partition to be roughly the
> same size, but the realised size of some of the partitions is almost double.
> !MicrosoftTeams-image (1).png!





[jira] [Updated] (KAFKA-13540) UniformStickyPartitioner leads to uneven Kafka partitions

2021-12-13 Thread nk2242696 (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13540?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

nk2242696 updated KAFKA-13540:
--
Reviewer: nk2242696

> UniformStickyPartitioner leads to uneven Kafka partitions
> -
>
> Key: KAFKA-13540
> URL: https://issues.apache.org/jira/browse/KAFKA-13540
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Affects Versions: 2.4.1
>Reporter: nk2242696
>Priority: Major
> Attachments: MicrosoftTeams-image (1).png
>
>
> Kafka topic with 20 partitions, 24-hour TTL, and a replication factor of 3.
> Using UniformStickyPartitioner we expected each partition to be roughly the
> same size, but the realised size of some of the partitions is almost double.
> !MicrosoftTeams-image (1).png!





[jira] [Created] (KAFKA-13541) Make IQv2 query/store interface type safe

2021-12-13 Thread Patrick Stuedi (Jira)
Patrick Stuedi created KAFKA-13541:
--

 Summary: Make IQv2 query/store interface type safe
 Key: KAFKA-13541
 URL: https://issues.apache.org/jira/browse/KAFKA-13541
 Project: Kafka
  Issue Type: Sub-task
Reporter: Patrick Stuedi
Assignee: Patrick Stuedi


Currently the new IQv2 interface allows applications to query state stores using subclasses of the Query type. Unfortunately, there is currently no way to check that the type parameter of the query matches the type of the relevant store the query is executed on. As a consequence, stores have to do a set of unsafe casts.

This ticket is to explore ways to make the query interface type safe, so that type mismatches are detected at compile time.
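A toy illustration of the problem (these are not the real IQv2 classes): the query's result type is unrelated to the store's key/value types at compile time, so the store can only cast and hope.
{code:java}
import java.util.HashMap;
import java.util.Map;

public class UnsafeCastIllustration {
    interface Query<R> { }

    static final class KeyQuery<K, V> implements Query<V> {
        final K key;
        KeyQuery(final K key) { this.key = key; }
    }

    static final class ToyStore<K, V> {
        private final Map<K, V> data = new HashMap<>();

        @SuppressWarnings("unchecked")
        <R> R execute(final Query<R> query) {
            // Nothing guarantees the query was built for this store's key/value types,
            // so both casts below are unchecked and can only fail at runtime.
            final KeyQuery<K, V> typed = (KeyQuery<K, V>) query;
            return (R) data.get(typed.key);
        }
    }
}
{code}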





[jira] [Created] (KAFKA-13540) UniformStickyPartitioner leads to uneven Kafka partitions

2021-12-13 Thread nk2242696 (Jira)
nk2242696 created KAFKA-13540:
-

 Summary: UniformStickyPartitioner leads to uneven Kafka partitions
 Key: KAFKA-13540
 URL: https://issues.apache.org/jira/browse/KAFKA-13540
 Project: Kafka
  Issue Type: Bug
  Components: clients, producer 
Affects Versions: 2.4.1
Reporter: nk2242696
 Attachments: MicrosoftTeams-image (1).png

Kafka topic with 20 partitions, 24-hour TTL, and a replication factor of 3.
Using UniformStickyPartitioner we expected each partition to be roughly the same size, but the realised size of some of the partitions is almost double.

!MicrosoftTeams-image (1).png!





[GitHub] [kafka] satishd commented on a change in pull request #11390: [KAFKA-13369] Follower fetch protocol changes for tiered storage.

2021-12-13 Thread GitBox


satishd commented on a change in pull request #11390:
URL: https://github.com/apache/kafka/pull/11390#discussion_r767698400



##
File path: core/src/main/scala/kafka/server/AbstractFetcherThread.scala
##
@@ -715,6 +725,58 @@ abstract class AbstractFetcherThread(name: String,
 }
   }
 
+  /**
+   * Handle a partition whose offset is out of range and return a new fetch offset.
+   */
+  protected def fetchOffsetAndTruncate(topicPartition: TopicPartition, topicId: Option[Uuid], currentLeaderEpoch: Int): PartitionFetchState = {
+    fetchOffsetAndApplyFun(topicPartition, topicId, currentLeaderEpoch,
+      leaderLogStartOffset => truncateFullyAndStartAt(topicPartition, leaderLogStartOffset))
+  }
+
+  /**
+   * Handle a partition whose offset is moved to tiered storage and return a new fetch offset.
+   */
+  protected def fetchOffsetAndBuildRemoteLogAuxState(topicPartition: TopicPartition, topicId: Option[Uuid],
+                                                     currentLeaderEpoch: Int,
+                                                     leaderLogStartOffset: Long): PartitionFetchState = {
+    fetchOffsetAndApplyFun(topicPartition, topicId, currentLeaderEpoch,
+      leaderLocalLogStartOffset =>
+        buildRemoteLogAuxState(topicPartition, currentLeaderEpoch, leaderLocalLogStartOffset, leaderLogStartOffset))
+  }
+
+  /**
+   * Handle the offset moved to tiered storage error. Return false if
+   * 1) the request succeeded or

Review comment:
   Updated the javadoc.








[GitHub] [kafka] rajinisivaram opened a new pull request #11597: KAFKA-13539: Improve propagation and processing of SSL handshake failures

2021-12-13 Thread GitBox


rajinisivaram opened a new pull request #11597:
URL: https://github.com/apache/kafka/pull/11597


   When the server fails an SSL handshake and closes its connection, we attempt to report this to clients on a best-effort basis. When an IOException is detected in the client, we may proceed to close the connection before processing all the data from the server if we have data pending to be sent to the server. The server attempts to send any data that has already been wrapped, but it may not wrap again after a handshake failure, so the error may not be propagated to clients. However, our tests assume that clients always detect handshake failures. This PR attempts to wrap and send all data on the server side after a handshake failure, and attempts to process all data on the client side.
   
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   






[jira] [Created] (KAFKA-13539) Improve propagation and processing of SSL handshake failures

2021-12-13 Thread Rajini Sivaram (Jira)
Rajini Sivaram created KAFKA-13539:
--

 Summary: Improve propagation and processing of SSL handshake 
failures
 Key: KAFKA-13539
 URL: https://issues.apache.org/jira/browse/KAFKA-13539
 Project: Kafka
  Issue Type: Bug
  Components: security
Affects Versions: 3.1.0
Reporter: Rajini Sivaram
Assignee: Rajini Sivaram
 Fix For: 3.2.0


When the server fails an SSL handshake and closes its connection, we attempt to report this to clients on a best-effort basis. However, our tests assume that the peer always detects the failure. This may not be the case when there are delays. It would be good to improve the reliability of handshake failure reporting.





[GitHub] [kafka] richard-axual commented on pull request #11535: KAFKA-13476: Increase resilience timestamp decoding Kafka Streams

2021-12-13 Thread GitBox


richard-axual commented on pull request #11535:
URL: https://github.com/apache/kafka/pull/11535#issuecomment-992330136


   @mjsax Thanks for the heads-up. I was on vacation for a while, but I've replied to your question in the Jira issue.






[jira] [Commented] (KAFKA-10888) Sticky partition leads to uneven product msg, resulting in abnormal delays in some partitions

2021-12-13 Thread nk2242696 (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10888?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17458258#comment-17458258
 ] 

nk2242696 commented on KAFKA-10888:
---

[~showuon] [~hachikuji] 

I propose a solution based on keeping track of the number of offsets/messages written to each partition:
 # At the partitioner level, store the number of offsets/messages written to each partition (sizePerPartitionMap) and the total (total offsets for the topic).
 # Before choosing the next batch to write (onNextBatch()), use sizePerPartitionMap to blacklist the partitions that are causing skewness (using a configurable threshold %), and choose the next partition from the list of remaining whitelisted partitions.
 # To populate sizePerPartitionMap, use the callback of producer.send() to update sizePerPartitionMap and the total.

This way, we can skip the slower (blacklisted) partitions for a few rounds and ensure all partitions are roughly of equal size.
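A rough sketch of step 3 (hypothetical helper, not existing Kafka code): a producer Callback that maintains the proposed sizePerPartitionMap and total, to be passed as producer.send(record, tracker).
{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.kafka.clients.producer.Callback;
import org.apache.kafka.clients.producer.RecordMetadata;

public class PartitionSizeTracker implements Callback {
    private final Map<Integer, AtomicLong> sizePerPartition = new ConcurrentHashMap<>();
    private final AtomicLong total = new AtomicLong();

    @Override
    public void onCompletion(final RecordMetadata metadata, final Exception exception) {
        if (exception != null) {
            return; // failed sends don't land on a partition
        }
        // serialized sizes are -1 for null keys/values, so clamp at zero
        final long bytes = Math.max(0, metadata.serializedKeySize())
            + Math.max(0, metadata.serializedValueSize());
        sizePerPartition.computeIfAbsent(metadata.partition(), p -> new AtomicLong()).addAndGet(bytes);
        total.addAndGet(bytes);
    }

    public long bytesForPartition(final int partition) {
        final AtomicLong size = sizePerPartition.get(partition);
        return size == null ? 0L : size.get();
    }

    public long totalBytes() {
        return total.get();
    }
}
{code}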

>  Sticky partition leads to uneven product msg, resulting in abnormal delays 
> in some partitions
> --
>
> Key: KAFKA-10888
> URL: https://issues.apache.org/jira/browse/KAFKA-10888
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, producer 
>Affects Versions: 2.4.1
>Reporter: jr
>Assignee: Luke Chen
>Priority: Major
> Attachments: image-2020-12-24-21-05-02-800.png, 
> image-2020-12-24-21-09-47-692.png, image-2020-12-24-21-10-24-407.png
>
>
>   110 producers, 550 partitions, 550 consumers, 5-node Kafka cluster.
>   The producers use the null-key + sticky partitioner; the total production rate
> is about 100w tps.
> The observed partition delay is abnormal and the message distribution is uneven,
> which makes the maximum production and consumption delay of the partitions
> with more messages abnormal.
>   I cannot find a reason why the sticky partitioner would make the message
> distribution uneven at this production rate.
>   I can't switch to the round-robin partitioner, which would increase the
> delay and CPU cost. Is it the sticky partitioner design that causes the uneven
> message distribution, or is this abnormal? How can it be solved?
>   !image-2020-12-24-21-09-47-692.png!
> As shown in the picture, the uneven distribution is concentrated on some 
> partitions and some brokers, there seems to be some rules.
> This problem does not only occur in one cluster, but in many high tps 
> clusters,
> The problem is more obvious on the test cluster we built.
> !image-2020-12-24-21-10-24-407.png!





[jira] [Commented] (KAFKA-13476) Streams crashes when non Base64 Offset Metadata is found

2021-12-13 Thread Richard Bosch (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13476?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17458255#comment-17458255
 ] 

Richard Bosch commented on KAFKA-13476:
---

Hello [~mjsax] [~guozhang], this is triggered by a tool that can be used to 
set offsets for a consumer group so that applications can start somewhere 
other than the beginning or end of a topic.

Tracing data was added to the metadata because previous releases of the Kafka 
Client and Streams did not use the metadata part of the OffsetAndMetadata 
structure. A sketch of how such a tool might commit offsets is shown below.
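
For illustration only, here is a minimal sketch (topic name, group id, offset, 
and JSON payload are all made up) of how an external tool could commit an 
offset whose metadata is plain JSON rather than the Base64-encoded timestamp 
Kafka Streams writes; when Streams later reads it back, Base64 decoding fails 
on the leading '{' (0x7b):

{noformat}
import java.util.Collections;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.ByteArrayDeserializer;

public class OffsetToolSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "example-app"); // assumed group id
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
                  ByteArrayDeserializer.class.getName());
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
                  ByteArrayDeserializer.class.getName());

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            TopicPartition tp = new TopicPartition("example-topic", 0); // assumed topic
            consumer.assign(Collections.singleton(tp));
            // Tracing metadata as JSON; Streams expects a Base64-encoded timestamp here.
            String traceMetadata = "{\"setBy\":\"offset-tool\",\"reason\":\"replay\"}";
            // Works while the Streams app is stopped (i.e. the group is empty).
            consumer.commitSync(
                Collections.singletonMap(tp, new OffsetAndMetadata(42L, traceMetadata)));
        }
    }
}
{noformat}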

> Streams crashes when non Base64 Offset Metadata is found
> 
>
> Key: KAFKA-13476
> URL: https://issues.apache.org/jira/browse/KAFKA-13476
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.4.0
>Reporter: Richard Bosch
>Assignee: Richard Bosch
>Priority: Minor
>   Original Estimate: 0.5h
>  Remaining Estimate: 0.5h
>
> Kafka Streams applications use the metadata stored with the committed offsets 
> from previous running instances to extract timestamps.
> But when the metadata field contains other data the Base64 decoder will throw 
> an exception causing the Streams application to fail.
> A new Offset commit is then required to stop this failure.
> I've included the part of the log when we started a Kafka Streams app after 
> setting the offsets using a third party tool. This tool adds some tracing 
> metadata so developers and operators could debug who performed this custom 
> offset commit.
>  
> {noformat}
> 2021-11-16 12:56:36.020  INFO 25 --- [-StreamThread-2] 
> o.a.k.clients.consumer.KafkaConsumer : [Consumer clientId=example-app-3, 
> groupId=axual-demo-example-example-app] Unsubscribed all topics or patterns 
> and assigned partitions
>   at java.base/java.util.Base64$Decoder.decode(Unknown Source) ~[na:na]
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.decodeTimestamp(StreamTask.java:1039)
>  ~[kafka-streams-2.7.0.jar:na]
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
>  ~[kafka-streams-2.7.0.jar:na]
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.initializeTaskTime(StreamTask.java:837)
>  ~[kafka-streams-2.7.0.jar:na]
> java.lang.IllegalArgumentException: Illegal base64 character 7b
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:728)
>  ~[kafka-streams-2.7.0.jar:na]
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.initializeMetadata(StreamTask.java:818)
>  ~[kafka-streams-2.7.0.jar:na]
> 2021-11-16 12:56:36.127 ERROR 25 --- [-StreamThread-1] 
> org.apache.kafka.streams.KafkaStreams: stream-client 
> [streams-example-app-1] All stream threads have died. The instance will be in 
> error state and should be closed.
>   at 
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
>  ~[kafka-streams-2.7.0.jar:na]
> java.lang.IllegalArgumentException: Illegal base64 character 7b
> {noformat}
> I recommend adding a try/catch block around the Base64 decode in the 
> StreamTask.decodeTimestamp method and returning the unknown value when this 
> occurs.
> This is purely for resilience when bad data is encountered.
> After the Streams application performs a new offset commit the error should 
> not occur again, limiting the chance of frequently occurring warnings in the 
> logs.
> I've already made the changes and added a test for this issue, as I would 
> like to contribute to Kafka.
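
For reference, a standalone sketch of the proposed guard; names such as 
UNKNOWN_TIMESTAMP and the decode layout are illustrative, not the actual 
Kafka Streams internals:

{noformat}
import java.nio.BufferUnderflowException;
import java.nio.ByteBuffer;
import java.util.Base64;

public final class TimestampMetadataDecoder {
    static final long UNKNOWN_TIMESTAMP = -1L; // illustrative "unknown" sentinel

    static long decodeTimestamp(final String metadata) {
        if (metadata == null || metadata.isEmpty()) {
            return UNKNOWN_TIMESTAMP;
        }
        try {
            final ByteBuffer buffer =
                ByteBuffer.wrap(Base64.getDecoder().decode(metadata));
            buffer.get();            // version byte, ignored in this sketch
            return buffer.getLong(); // the encoded partition time
        } catch (final IllegalArgumentException | BufferUnderflowException e) {
            // Metadata written by a third-party tool (e.g. JSON trace data):
            // fall back to "unknown" instead of crashing the stream thread.
            return UNKNOWN_TIMESTAMP;
        }
    }
}
{noformat}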



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] mimaison commented on pull request #11560: KAFKA-7589: Allow configuring network threads per listener

2021-12-13 Thread GitBox


mimaison commented on pull request #11560:
URL: https://github.com/apache/kafka/pull/11560#issuecomment-992306969


   @rajinisivaram @tombentley @dajac This PR is now ready for reviews. Thanks


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-9366) Upgrade log4j to log4j2

2021-12-13 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9366?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17458219#comment-17458219
 ] 

Luke Chen commented on KAFKA-9366:
--

Please upgrade to Log4j 2.15.0 or newer for CVE-2021-44228. Thanks.

> Upgrade log4j to log4j2
> ---
>
> Key: KAFKA-9366
> URL: https://issues.apache.org/jira/browse/KAFKA-9366
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.2.0, 2.1.1, 2.3.0, 2.4.0
>Reporter: leibo
>Assignee: Dongjin Lee
>Priority: Critical
>  Labels: needs-kip
> Fix For: 3.2.0
>
>
> h2. CVE-2019-17571 Detail
> Included in Log4j 1.2 is a SocketServer class that is vulnerable to 
> deserialization of untrusted data which can be exploited to remotely execute 
> arbitrary code when combined with a deserialization gadget when listening to 
> untrusted network traffic for log data. This affects Log4j versions up to 1.2 
> up to 1.2.17.
>  
> [https://cve.mitre.org/cgi-bin/cvename.cgi?name=CVE-2019-17571]
>  



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[GitHub] [kafka] dengziming commented on pull request #11261: KAFKA-13228: ApiVersionRequest is not properly handled in KRaft

2021-12-13 Thread GitBox


dengziming commented on pull request #11261:
URL: https://github.com/apache/kafka/pull/11261#issuecomment-992240783


   Hello @mumrah, are you interested in reviewing this? Currently, 
ApiVersionRequest is not properly handled in the KRaft server, which will have 
a bad effect on KIP-778.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[GitHub] [kafka] dengziming commented on pull request #11261: KAFKA-13228: ApiVersionRequest is not properly handled in KRaft

2021-12-13 Thread GitBox


dengziming commented on pull request #11261:
URL: https://github.com/apache/kafka/pull/11261#issuecomment-992240131


   Hello @mumrah, are you interested in reviewing this? Currently, 
ApiVersionRequest is not properly handled in the KRaft server, which will have 
a bad effect on KIP-778.


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org




[jira] [Commented] (KAFKA-13535) Workaround for mitigating CVE-2021-44228 Kafka

2021-12-13 Thread Luke Chen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13535?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17458213#comment-17458213
 ] 

Luke Chen commented on KAFKA-13535:
---

[~akansh], I think that as long as users don't add a JMS appender config to 
the log4j configuration file with *TopicBindingName* or 
*TopicConnectionFactoryBindingName* set to bad (attacker-controllable JNDI) 
names, it's fine. I would suggest the Kafka team make an official announcement 
about this vulnerability. Thanks. The shape of such an appender config is 
sketched below.
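
For context, an illustrative shape of such a log4j 1.x configuration; these 
entries are not part of Kafka's default log4j.properties, so an operator would 
have to add them explicitly, and the placeholder values below are made up:

{noformat}
# Illustrative only -- Kafka's default log4j.properties has no JMS appender.
log4j.appender.jms=org.apache.log4j.net.JMSAppender
log4j.appender.jms.TopicConnectionFactoryBindingName=<JNDI name>
log4j.appender.jms.TopicBindingName=<JNDI name>
# The appender is only a concern if these JNDI names (or the naming context
# they resolve against) can be influenced by an attacker.
log4j.rootLogger=INFO, jms
{noformat}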

> Workaround for mitigating CVE-2021-44228 Kafka 
> ---
>
> Key: KAFKA-13535
> URL: https://issues.apache.org/jira/browse/KAFKA-13535
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.8.1
>Reporter: Akansh Shandilya
>Priority: Major
>
> Kafka v2.8.1 uses log4j v1.x. Please review the following information:
>  
> Is Kafka v2.8.1 impacted by CVE-2021-44228?
> If yes, is there any workaround/recommendation available for Kafka v2.8.1 to 
> mitigate CVE-2021-44228?



--
This message was sent by Atlassian Jira
(v8.20.1#820001)