[jira] [Resolved] (KAFKA-13419) sync group failed with rebalanceInProgress error might cause out-of-date ownedPartition in Cooperative protocol

2022-02-22 Thread Luke Chen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13419?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Luke Chen resolved KAFKA-13419.
---
Resolution: Fixed

> sync group failed with rebalanceInProgress error might cause out-of-date 
> ownedPartition in Cooperative protocol
> ---
>
> Key: KAFKA-13419
> URL: https://issues.apache.org/jira/browse/KAFKA-13419
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.0.0
>Reporter: Luke Chen
>Assignee: Luke Chen
>Priority: Major
> Fix For: 3.1.0
>
>
> In KAFKA-13406, we found a user got stuck in rebalancing with the 
> cooperative sticky assignor. The reason is that the "ownedPartition" was 
> out-of-date, and it failed the cooperative assignment validation.
> Investigating deeper, I found the root cause is that we don't reset the 
> generation and state after a sync group failure. In KAFKA-12983, we fixed the 
> issue that onJoinPrepare was not called in the resetStateAndRejoin method, 
> which caused the ownedPartition not to be cleared. But there's another case in 
> which the ownedPartition can become out-of-date. Here's the example:
>  # Consumer A joined and synced the group successfully with generation 1.
>  # A new rebalance started with generation 2; consumer A joined successfully, 
> but for some reason consumer A did not send out its sync group request 
> immediately.
>  # The other consumers completed the sync group successfully in generation 2, 
> except consumer A.
>  # After consumer A sent out its sync group request, a new rebalance started 
> with generation 3, so consumer A got a REBALANCE_IN_PROGRESS error in the sync 
> group response.
>  # On receiving REBALANCE_IN_PROGRESS, we re-join the group with generation 3, 
> but with the assignment (ownedPartition) from generation 1.
>  # So now an out-of-date ownedPartition is sent, and unexpected results follow.
>  
> We might want to do *resetStateAndRejoin* when a *RebalanceInProgressException* 
> error happens in *sync group*. Because when we get a sync group error, it means 
> the join group passed, and the other consumers (and the leader) might already 
> have completed this round of rebalance. The assignment distribution this 
> consumer has is already out-of-date.
>  
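> A minimal sketch of the idea inside the sync group response handling 
> (illustrative only, reusing the existing resetStateAndRejoin/onJoinPrepare 
> hooks mentioned above; not the actual patch):
> {code:java}
> // Hypothetical error-path handling of the SyncGroup response:
> if (error == Errors.REBALANCE_IN_PROGRESS) {
>     // The join already passed, so the assignment this member is holding
>     // (ownedPartitions from an older generation) is stale by now.
>     // Reset generation/state and rejoin; this also runs onJoinPrepare,
>     // which clears the out-of-date ownedPartitions.
>     resetStateAndRejoin();
>     future.raise(error);
> }
> {code}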



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Jenkins build is back to normal : Kafka » Kafka Branch Builder » 3.0 #176

2022-02-22 Thread Apache Jenkins Server
See 




Jenkins build is still unstable: Kafka » Kafka Branch Builder » 3.1 #79

2022-02-22 Thread Apache Jenkins Server
See 




[jira] [Created] (KAFKA-13688) Incorrect metrics in KafkaController for replicasToDeleteCount and ineligibleReplicasToDeleteCount

2022-02-22 Thread Crispin Bernier (Jira)
Crispin Bernier created KAFKA-13688:
---

 Summary: Incorrect metrics in KafkaController for 
replicasToDeleteCount and ineligibleReplicasToDeleteCount
 Key: KAFKA-13688
 URL: https://issues.apache.org/jira/browse/KAFKA-13688
 Project: Kafka
  Issue Type: Bug
  Components: controller
Reporter: Crispin Bernier


{code:java}
replicasToDeleteCount = if (!isActive) 0 else controllerContext.topicsToBeDeleted.map { topic =>
  // For each enqueued topic, count the number of replicas that are not yet deleted
  controllerContext.replicasForTopic(topic).count { replica =>
    controllerContext.replicaState(replica) != ReplicaDeletionSuccessful
  }
}.sum

ineligibleReplicasToDeleteCount = if (!isActive) 0 else controllerContext.topicsToBeDeleted.map { topic =>
  // For each enqueued topic, count the number of replicas that are ineligible
  controllerContext.replicasForTopic(topic).count { replica =>
    controllerContext.replicaState(replica) == ReplicaDeletionIneligible
  }
}.sum {code}
Because topicsToBeDeleted is a Set, the .map above yields a Set of per-topic 
counts, so topics that happen to have the same replica count are collapsed into 
a single element and the total sum under-counts.
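A minimal Java analogue of the problem (illustrative only, not the controller 
code): collecting the per-topic counts into a Set before summing silently drops 
topics whose counts happen to be equal.
{code:java}
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class DuplicateCountDemo {
    public static void main(String[] args) {
        // Hypothetical pending-deletion replica counts per enqueued topic.
        Map<String, Integer> pendingReplicas = Map.of("topic-a", 3, "topic-b", 3, "topic-c", 2);
        Set<String> topicsToBeDeleted = pendingReplicas.keySet();

        // Summing over a Set of counts collapses equal values: 3 + 2 = 5.
        int viaSet = topicsToBeDeleted.stream()
                .map(pendingReplicas::get)
                .collect(Collectors.toSet())
                .stream().mapToInt(Integer::intValue).sum();

        // Summing one term per topic keeps the duplicates: 3 + 3 + 2 = 8.
        int perTopic = topicsToBeDeleted.stream()
                .mapToInt(pendingReplicas::get)
                .sum();

        System.out.println(viaSet + " vs " + perTopic); // prints "5 vs 8"
    }
}
{code}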



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (KAFKA-13687) Limit number of batches when using kafka-dump-log.sh

2022-02-22 Thread Sergio Troiano (Jira)
Sergio Troiano created KAFKA-13687:
--

 Summary: Limit number of batches when using kafka-dump-log.sh
 Key: KAFKA-13687
 URL: https://issues.apache.org/jira/browse/KAFKA-13687
 Project: Kafka
  Issue Type: Improvement
Reporter: Sergio Troiano


Currently kafka-dump-log.sh reads the whole file(s) and dumps the full contents 
of the segment file(s).

As we know, the savings from combining compression and batching while producing 
(if the payloads are good candidates) are huge. 

 

We would like to have a way to "monitor" how the producers build their batches, 
as we do not always have access to producer metrics.

We have multitenant producers, so it is hard to "detect" when the usage is not 
optimal.

 

The problem with the current way the dump-log tool works is that it reads the 
whole file. In a scenario with thousands of topics and different segment sizes 
(the default is 1 GB), we could end up affecting the cluster's performance, as 
we evict useful pages from the page cache and replace them with what we read 
from the files.

 

As we only need to take a few samples from the segments to see the usage 
pattern while producing, we would like to add a new parameter called 
maxBatches.

 

Based on the current script the change is quite small, as it only needs a 
parameter and a counter; see the sketch below.
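A rough sketch of the sampling idea in Java (illustrative only; it assumes the 
shape of the internal FileRecords API rather than the actual DumpLogSegments 
code, and maxBatches is the proposed new parameter):
{code:java}
import java.io.File;
import java.io.IOException;
import org.apache.kafka.common.record.FileRecords;
import org.apache.kafka.common.record.RecordBatch;

public class SampleBatches {
    public static void main(String[] args) throws IOException {
        File segment = new File(args[0]);
        int maxBatches = Integer.parseInt(args[1]);   // the proposed limit

        int seen = 0;
        try (FileRecords records = FileRecords.open(segment)) {
            for (RecordBatch batch : records.batches()) {
                if (seen++ >= maxBatches)
                    break;                            // stop after a few samples
                System.out.printf("baseOffset=%d count=%s compresscodec=%s sizeInBytes=%d%n",
                        batch.baseOffset(), batch.countOrNull(),
                        batch.compressionType(), batch.sizeInBytes());
            }
        }
    }
}
{code}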

 

After adding this change we could, for example, take smaller samples and analyze 
the batch headers (searching for compresscodec and the batch count).

 

Doing this, we could automate a tool that reads all the topics. Going further, 
when we see a client is neither using compression nor batching, we could take 
the payloads of those samples and simulate compressing them (with batching and 
compression enabled); with those numbers we can then approach the client with a 
proposal for saving money. 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (KAFKA-13686) NullPointerException in Selector.pollSelectionKeys

2022-02-22 Thread John Calcote (Jira)
John Calcote created KAFKA-13686:


 Summary: NullPointerException in Selector.pollSelectionKeys
 Key: KAFKA-13686
 URL: https://issues.apache.org/jira/browse/KAFKA-13686
 Project: Kafka
  Issue Type: Bug
  Components: clients
Affects Versions: 0.11.0.1
 Environment: Linux/Centos 7.6, Java JVM - 1.8.0_181
Reporter: John Calcote


Seeing repeated NPEs in Selector.pollSelectionKeys:

{noformat}
java.lang.NullPointerException: null
at 
org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:353) 
~[kafka-clients-0.11.0.1.jar:?]
at org.apache.kafka.common.network.Selector.poll(Selector.java:326) 
~[kafka-clients-0.11.0.1.jar:?]
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:433) 
~[kafka-clients-0.11.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232)
 ~[kafka-clients-0.11.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208)
 ~[kafka-clients-0.11.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:184)
 ~[kafka-clients-0.11.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:217)
 ~[kafka-clients-0.11.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:203)
 ~[kafka-clients-0.11.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286)
 ~[kafka-clients-0.11.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078)
 ~[kafka-clients-0.11.0.1.jar:?]
at 
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043) 
~[kafka-clients-0.11.0.1.jar:?]
{noformat}

{noformat}
void pollSelectionKeys(Iterable<SelectionKey> selectionKeys,
                       boolean isImmediatelyConnected,
                       long currentTimeNanos) {
    Iterator<SelectionKey> iterator = selectionKeys.iterator();
    while (iterator.hasNext()) {
        SelectionKey key = iterator.next();
        iterator.remove();
        KafkaChannel channel = channel(key);
        long channelStartTimeNanos = recordTimePerConnection ? time.nanoseconds() : 0;

        // register all per-connection metrics at once
        sensors.maybeRegisterConnectionMetrics(channel.id());  // <=== NPE HERE
        if (idleExpiryManager != null)
            idleExpiryManager.update(channel.id(), currentTimeNanos);

        try {
            ...
{noformat}

The only two possibilities are:
# sensors is null
# channel is null

sensors being null is unlikely in my mind; it's more likely that the attachment 
on the key is null, but I don't have enough background in the Kafka client code 
to determine why that might be. 
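For context, as far as I recall the 0.11.x client's Selector.channel(key) simply 
casts the key's attachment, so a key whose attachment has been cleared (or never 
set) would leave channel null here and channel.id() would then throw the NPE 
(a from-memory sketch, not verified against 0.11.0.1):

{noformat}
// Sketch of Selector.channel in the 0.11.x client (from memory):
// the KafkaChannel is stored as the SelectionKey attachment, so a cleared
// attachment yields null and the later channel.id() call NPEs.
private KafkaChannel channel(SelectionKey key) {
    return (KafkaChannel) key.attachment();
}
{noformat}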



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


Build failed in Jenkins: Kafka » Kafka Branch Builder » trunk #711

2022-02-22 Thread Apache Jenkins Server
See 


Changes:


--
[...truncated 442668 lines...]
[2022-02-22T18:28:17.498Z] 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testInnerInner[caching enabled = false] STARTED
[2022-02-22T18:28:40.110Z] 
[2022-02-22T18:28:40.110Z] 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testInnerInner[caching enabled = false] PASSED
[2022-02-22T18:28:40.110Z] 
[2022-02-22T18:28:40.110Z] 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testInnerOuter[caching enabled = false] STARTED
[2022-02-22T18:28:54.279Z] 
[2022-02-22T18:28:54.279Z] 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testInnerOuter[caching enabled = false] PASSED
[2022-02-22T18:28:54.279Z] 
[2022-02-22T18:28:54.279Z] 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testInnerLeft[caching enabled = false] STARTED
[2022-02-22T18:29:14.048Z] 
[2022-02-22T18:29:14.048Z] 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testInnerLeft[caching enabled = false] PASSED
[2022-02-22T18:29:14.048Z] 
[2022-02-22T18:29:14.048Z] 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testOuterInner[caching enabled = false] STARTED
[2022-02-22T18:29:28.265Z] 
[2022-02-22T18:29:28.265Z] 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testOuterInner[caching enabled = false] PASSED
[2022-02-22T18:29:28.265Z] 
[2022-02-22T18:29:28.265Z] 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testOuterOuter[caching enabled = false] STARTED
[2022-02-22T18:29:53.312Z] 
[2022-02-22T18:29:53.312Z] 
org.apache.kafka.streams.integration.TableTableJoinIntegrationTest > 
testOuterOuter[caching enabled = false] PASSED
[2022-02-22T18:29:53.312Z] 
[2022-02-22T18:29:53.312Z] 
org.apache.kafka.streams.integration.TaskMetadataIntegrationTest > 
shouldReportCorrectEndOffsetInformation STARTED
[2022-02-22T18:29:53.312Z] 
[2022-02-22T18:29:53.312Z] 
org.apache.kafka.streams.integration.TaskMetadataIntegrationTest > 
shouldReportCorrectEndOffsetInformation PASSED
[2022-02-22T18:29:53.312Z] 
[2022-02-22T18:29:53.312Z] 
org.apache.kafka.streams.integration.TaskMetadataIntegrationTest > 
shouldReportCorrectCommittedOffsetInformation STARTED
[2022-02-22T18:29:57.838Z] 
[2022-02-22T18:29:57.838Z] 
org.apache.kafka.streams.integration.TaskMetadataIntegrationTest > 
shouldReportCorrectCommittedOffsetInformation PASSED
[2022-02-22T18:29:59.889Z] 
[2022-02-22T18:29:59.889Z] 
org.apache.kafka.streams.processor.internals.HandlingSourceTopicDeletionIntegrationTest
 > shouldThrowErrorAfterSourceTopicDeleted STARTED
[2022-02-22T18:30:08.520Z] 
[2022-02-22T18:30:08.520Z] 
org.apache.kafka.streams.processor.internals.HandlingSourceTopicDeletionIntegrationTest
 > shouldThrowErrorAfterSourceTopicDeleted PASSED
[2022-02-22T18:30:13.061Z] streams-5: SMOKE-TEST-CLIENT-CLOSED
[2022-02-22T18:30:13.061Z] streams-2: SMOKE-TEST-CLIENT-CLOSED
[2022-02-22T18:30:13.061Z] streams-3: SMOKE-TEST-CLIENT-CLOSED
[2022-02-22T18:30:13.061Z] streams-4: SMOKE-TEST-CLIENT-CLOSED
[2022-02-22T18:30:13.061Z] streams-1: SMOKE-TEST-CLIENT-CLOSED
[2022-02-22T18:30:13.061Z] streams-0: SMOKE-TEST-CLIENT-CLOSED
[2022-02-22T18:30:19.512Z] 
[2022-02-22T18:30:19.512Z] Deprecated Gradle features were used in this build, 
making it incompatible with Gradle 8.0.
[2022-02-22T18:30:19.512Z] 
[2022-02-22T18:30:19.512Z] You can use '--warning-mode all' to show the 
individual deprecation warnings and determine if they come from your own 
scripts or plugins.
[2022-02-22T18:30:19.512Z] 
[2022-02-22T18:30:19.512Z] See 
https://docs.gradle.org/7.3.3/userguide/command_line_interface.html#sec:command_line_warnings
[2022-02-22T18:30:19.512Z] 
[2022-02-22T18:30:19.512Z] BUILD SUCCESSFUL in 2h 1m 43s
[2022-02-22T18:30:19.512Z] 208 actionable tasks: 113 executed, 95 up-to-date
[2022-02-22T18:30:19.512Z] 
[2022-02-22T18:30:19.512Z] See the profiling report at: 
file:///home/jenkins/jenkins-agent/workspace/Kafka_kafka_trunk/build/reports/profile/profile-2022-02-22-16-28-42.html
[2022-02-22T18:30:19.512Z] A fine-grained performance profile is available: use 
the --scan option.
[Pipeline] junit
[2022-02-22T18:30:20.558Z] Recording test results
[2022-02-22T18:30:34.775Z] [Checks API] No suitable checks publisher found.
[Pipeline] echo
[2022-02-22T18:30:34.777Z] Verify that Kafka Streams archetype compiles
[Pipeline] sh
[2022-02-22T18:30:37.776Z] + ./gradlew streams:publishToMavenLocal 
clients:publishToMavenLocal connect:json:publishToMavenLocal 
connect:api:publishToMavenLocal
[2022-02-22T18:30:38.728Z] To honour the JVM settings for this build a 
single-use Daemon process will be forked. See 
https://docs.gradle.org/7.3.3/userguide/gradle_daemon.html#sec:disabling_the_daemon.
[2022-02-22T18:30:40.770Z] Daemon will be stopped at the end of the build 

Jenkins build is still unstable: Kafka » Kafka Branch Builder » trunk #710

2022-02-22 Thread Apache Jenkins Server
See 




KAFKA- UNDER_REPLICATED_PARTITIONS - AUTOMATE

2022-02-22 Thread Kafka Life
Dear Kafka Experts,

Does anyone have a dynamically generated JSON file based on the under-replicated
partitions in the Kafka cluster?
Every time the URP count rises above 500, it is a tedious job to
manually create a JSON file.

I request you to share any such dynamically generated script / JSON file.

Thanks in advance.



[jira] [Created] (KAFKA-13685) Seems to get empty event time in some messages.

2022-02-22 Thread Alex (Jira)
Alex created KAFKA-13685:


 Summary: Seems to get empty event time in some messages.
 Key: KAFKA-13685
 URL: https://issues.apache.org/jira/browse/KAFKA-13685
 Project: Kafka
  Issue Type: Bug
  Components: consumer, producer 
Reporter: Alex


I get these errors in different services occasionally:

Failed to commit stream task 0_7 due to the following error:: 
org.apache.kafka.streams.errors.StreamsException: task [0_7] Abort sending 
since an error caught with a previous record (key 
\x00\x00\x00\x00\x00\x00\x031\x00\x00\x01\x7F\x0FG8\x00 value [B@6b718659 
timestamp null) to topic 
snapshot-server-KTABLE-SUPPRESS-STATE-STORE-03-changelog due to 
org.apache.kafka.common.errors.TimeoutException: Expiring 1 record(s) for 
snapshot-server-KTABLE-SUPPRESS-STATE-STORE-03-changelog-7:120001 ms 
has passed since batch creation

 

That's the only info I get. 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (KAFKA-13684) KStream rebalance can lead to JVM process crash when network issues occur

2022-02-22 Thread Peter Cipov (Jira)
Peter Cipov created KAFKA-13684:
---

 Summary: KStream rebalance can lead to JVM process crash when 
network issues occur
 Key: KAFKA-13684
 URL: https://issues.apache.org/jira/browse/KAFKA-13684
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.8.1
Reporter: Peter Cipov
 Attachments: crash-dump.log, crash-logs.csv

Hello,

Sporadically, a KStream rebalance leads to a segmentation fault:
{code:java}
siginfo: si_signo: 11 (SIGSEGV), si_code: 128 (SI_KERNEL), si_addr: 
0x {code}
I have spotted it occurring when:

1) there are some intermittent connection issues. I have found 
org.apache.kafka.common.errors.DisconnectException in the logs during the rebalance

2) a lot of partitions are shifted due to the KS cluster rebalance

 

crash stack:
{code:java}
Current thread (0x7f5bf407a000):  JavaThread "app-blue-v6-StreamThread-2" [_thread_in_native, id=231, stack(0x7f5bdc2ed000,0x7f5bdc3ee000)]

Stack: [0x7f5bdc2ed000,0x7f5bdc3ee000],  sp=0x7f5bdc3ebe30,  free space=1019k
Native frames: (J=compiled Java code, A=aot compiled Java code, j=interpreted, Vv=VM code, C=native code)
C  [libc.so.6+0x37ab7]  abort+0x297

Java frames: (J=compiled Java code, j=interpreted, Vv=VM code)
J 8080  org.rocksdb.WriteBatch.put(J[BI[BIJ)V (0 bytes) @ 0x7f5c857ca520 [0x7f5c857ca4a0+0x0080]
J 8835 c2 org.apache.kafka.streams.state.internals.RocksDBStore$SingleColumnFamilyAccessor.prepareBatchForRestore(Ljava/util/Collection;Lorg/rocksdb/WriteBatch;)V (52 bytes) @ 0x7f5c858dccb4 [0x7f5c858dcb60+0x0154]
J 9779 c1 org.apache.kafka.streams.state.internals.RocksDBStore$RocksDBBatchingRestoreCallback.restoreAll(Ljava/util/Collection;)V (147 bytes) @ 0x7f5c7ef7b7e4 [0x7f5c7ef7b360+0x0484]
J 8857 c2 org.apache.kafka.streams.processor.internals.StateRestoreCallbackAdapter.lambda$adapt$0(Lorg/apache/kafka/streams/processor/StateRestoreCallback;Ljava/util/Collection;)V (73 bytes) @ 0x7f5c858f86dc [0x7f5c858f8500+0x01dc]
J 9686 c1 org.apache.kafka.streams.processor.internals.StateRestoreCallbackAdapter$$Lambda$937.restoreBatch(Ljava/util/Collection;)V (9 bytes) @ 0x7f5c7dff7bb4 [0x7f5c7dff7b40+0x0074]
J 9683 c1 org.apache.kafka.streams.processor.internals.ProcessorStateManager.restore(Lorg/apache/kafka/streams/processor/internals/ProcessorStateManager$StateStoreMetadata;Ljava/util/List;)V (176 bytes) @ 0x7f5c7e71af4c [0x7f5c7e719740+0x180c]
J 8882 c2 org.apache.kafka.streams.processor.internals.StoreChangelogReader.restoreChangelog(Lorg/apache/kafka/streams/processor/internals/StoreChangelogReader$ChangelogMetadata;)Z (334 bytes) @ 0x7f5c859052ec [0x7f5c85905140+0x01ac]
J 12689 c2 org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(Ljava/util/Map;)V (412 bytes) @ 0x7f5c85ce98d4 [0x7f5c85ce8420+0x14b4]
J 12688 c2 org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase()V (214 bytes) @ 0x7f5c85ce580c [0x7f5c85ce5540+0x02cc]
J 17654 c2 org.apache.kafka.streams.processor.internals.StreamThread.runOnce()V (725 bytes) @ 0x7f5c859960e8 [0x7f5c85995fa0+0x0148]
j  org.apache.kafka.streams.processor.internals.StreamThread.runLoop()Z+61
j  org.apache.kafka.streams.processor.internals.StreamThread.run()V+36
v  ~StubRoutines::call_stub {code}
I attached the whole Java crash dump and a digest from our logs. 

It is executed on Azul JDK 11.

KS 2.8.1

 



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (KAFKA-13683) Streams - Transactional Producer - Transaction with key xyz went wrong with exception: Timeout expired after 60000 milliseconds while awaiting InitProducerId

2022-02-22 Thread Michael Hornung (Jira)
Michael Hornung created KAFKA-13683:
---

 Summary: Streams - Transactional Producer - Transaction with key 
xyz went wrong with exception: Timeout expired after 60000 milliseconds while 
awaiting InitProducerId
 Key: KAFKA-13683
 URL: https://issues.apache.org/jira/browse/KAFKA-13683
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 3.0.0, 2.7.0, 2.6.0
Reporter: Michael Hornung
 Fix For: 3.0.0, 2.7.0, 2.6.0
 Attachments: AkkaHttpRestServer.scala, timeoutException.png

We have an urgent issue with our customer using the Kafka transactional producer 
against a Kafka cluster with 3 or more nodes. We are using Confluent on Azure.

We see this exception regularly: "Transaction with key XYZ went wrong with 
exception: Timeout expired after 60000 milliseconds while awaiting 
InitProducerId" (see attachment).

We assume that the cause is a node which is down while the producer still sends 
messages to the “down node”.



We are using Kafka Streams 3.0.

*We expect that if a node is down, the Kafka producer is intelligent enough to 
not send messages to this node any more.*

*What’s the solution to this issue? Is there any config we have to set?*

*This request is urgent because our customer will soon have production issues.*

*Additional information*
 * send record --> see attachment “AkkaHttpRestServer.scala” – line 100
 * producer config --> see attachment “AkkaHttpRestServer.scala” – line 126



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (KAFKA-13682) Implement auto preferred leader election in KRaft Controller

2022-02-22 Thread Jose Armando Garcia Sancio (Jira)
Jose Armando Garcia Sancio created KAFKA-13682:
--

 Summary: Implement auto preferred leader election in KRaft 
Controller
 Key: KAFKA-13682
 URL: https://issues.apache.org/jira/browse/KAFKA-13682
 Project: Kafka
  Issue Type: Task
  Components: kraft
Reporter: Jose Armando Garcia Sancio
Assignee: Jose Armando Garcia Sancio






--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (KAFKA-13681) Event duplicates for partition-stuck kafka-stream application

2022-02-22 Thread Mikhail Dubrovin (Jira)
Mikhail Dubrovin created KAFKA-13681:


 Summary: Event duplicates for partition-stuck kafka-stream 
application
 Key: KAFKA-13681
 URL: https://issues.apache.org/jira/browse/KAFKA-13681
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 2.8.1
Reporter: Mikhail Dubrovin
 Attachments: fail_topology.txt

Hello,

We found the following unpredictable behavior of Kafka Streams:
{code:java}
public void buildStreams(final BuilderHelper builder) {
    KTable table = builder.table();
    KTable workflowTable = workflowTable(builder);

    table
            .mapValues(value -> mappers.mainDTO(value))
            .leftJoin(workflowTable, mappers::joinWorkflows)
            .toStream()
            .map((key, value) -> KeyValue.pair(
                    AggregateId.newBuilder().setId(value.getId()).build(),
                    mappers.aggregateDTO(value)))
            .peek((k, v) -> logSinkRecord(v))
            .filter((id, dto) -> !isReprocessing)
            .to(...);
}

private static KTable workflowTable(BuilderHelper builderHelper) {
    return builderHelper.workflowTable()
            .groupBy((id, workflow) -> KeyValue.pair(
                    TableId.newBuilder().setId(workflow.getTableId()).build(),
                    mappers.mapWorkflow(workflow)),
                    Grouped.with(...))
            .aggregate(ArrayList::new, (key, value, agg) -> {
                agg.add(value);
                return agg;
            }, (key, value, agg) -> {
                agg.remove(value);
                return agg;
            }, Materialized.with(...));
} {code}
It is a small part of our topology, but it shows the error flow.
*Data structure:*

We have two many-partition topics: entity and workflow. Each topic is 
represented as a KTable.

*Data error that causes application shutdown:*

Our final event (a join of the entity and workflow KTables) expects a non-null 
field in the entity, but for some reason it comes as null for one event. The 
whole aggregator fails in _mappers.aggregateDTO(value)_ of the _buildStreams_ method.

We have a health check which restarts the aggregator if it fails.

When the incorrect data arrives on one partition, processing of that partition 
gets stuck, but the other partitions keep being processed.

As a result, at every restart the _workflowTable_ topology repeats the 
.aggregate() add/remove flow and puts a new List into the repartition topic, but 
the offsets of the already processed partitions are not advanced because of the 
aggregator's shutdown.

_This behavior generates/sinks a lot of duplicate final entities at every 
restart, because the flow succeeds for data from the non-corrupted partitions 
while their offsets are not advanced._ 

It also causes trouble if @EqualsAndHashCode is defined to compare all fields. 
At every restart, the topology tries to remove the old value (which no longer 
exists after the first run) and adds a new value at the end of the list, so the 
list grows after each restart (it contains the same new value multiple times); 
see the sketch below.
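A tiny illustration of why the list keeps growing (generic Java, not our 
topology): List.remove(Object) relies on equals(), so if @EqualsAndHashCode 
covers all fields and the re-derived "old" value differs in any field from what 
was stored, the subtractor's remove is a no-op and the adder appends another copy.
{code:java}
import java.util.ArrayList;
import java.util.List;

public class SubtractorNoOpDemo {
    public static void main(String[] args) {
        // State left over from the previous run of the aggregation.
        List<String> agg = new ArrayList<>(List.of("workflow#v1"));

        // After a restart the "old" value is re-derived and no longer equals
        // what is stored, so remove() does nothing...
        agg.remove("workflow#v2");

        // ...and the adder appends, growing the list on every restart.
        agg.add("workflow#v2");

        System.out.println(agg); // [workflow#v1, workflow#v2]
    }
}
{code}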

I also attached the topology description. To visualize: 
[https://zz85.github.io/kafka-streams-viz/]

*Current workaround:*

Redefine @EqualsAndHashCode to use the entities' ids only.

*Unsolved issue:*

Sink event duplication at every restart.

Thank you in advance!



--
This message was sent by Atlassian Jira
(v8.20.1#820001)


[jira] [Created] (KAFKA-13680) Kafka Streams application remains in RUNNING state although all stream threads shut down

2022-02-22 Thread Denis Washington (Jira)
Denis Washington created KAFKA-13680:


 Summary: Kafka Streams application remains in RUNNING state 
although all stream threads shut down
 Key: KAFKA-13680
 URL: https://issues.apache.org/jira/browse/KAFKA-13680
 Project: Kafka
  Issue Type: Bug
  Components: streams
Affects Versions: 3.0.0
Reporter: Denis Washington


We have a Kafka Streams application that is configured with 
{{LogAndFailExceptionHandler}} as deserialization error handler. In the Kafka 
Streams version we used previously (2.7), a deserialization error that causes 
all stream threads to shut down would ultimately move the application to the 
ERROR state.

However, after updating to Kafka Streams 3.0.0, we see a different behavior: 
the stream threads still shut down, but the Kafka Streams application stays in 
the RUNNING state. It thus gets into a "zombie" state that is not detected by 
our monitoring; see the sketch below.
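For illustration, a simplified version of the kind of health check we rely on 
(sketch only, not our actual monitoring code):
{code:java}
import org.apache.kafka.streams.KafkaStreams;

// With Kafka Streams 2.7 this returned false once all stream threads had died
// (the application state moved to ERROR); with 3.0.0 it keeps returning true
// because state() still reports RUNNING even though no stream threads are left.
static boolean isHealthy(KafkaStreams streams) {
    KafkaStreams.State state = streams.state();
    return state == KafkaStreams.State.RUNNING
            || state == KafkaStreams.State.REBALANCING;
}
{code}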

It may be worth noting that this application has global state stores, and that 
the global stream thread was not affected by the deserialization error where we 
noticed the problem.



--
This message was sent by Atlassian Jira
(v8.20.1#820001)