[ https://issues.apache.org/jira/browse/KAFKA-7108?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16529726#comment-16529726 ]

Anna O commented on KAFKA-7108:
-------------------------------

[~mjsax] - here is the StreamsConfig:



StreamsConfig values:
 application.id = ...
 application.server =
 bootstrap.servers = [...]
 buffered.records.per.partition = 1000
 cache.max.bytes.buffering = 10485760
 client.id =
 commit.interval.ms = 100
 connections.max.idle.ms = 540000
 default.deserialization.exception.handler = class org.apache.kafka.streams.errors.LogAndContinueExceptionHandler
 default.key.serde = class org.apache.kafka.common.serialization.Serdes$ByteArraySerde
 default.production.exception.handler = class com...LogAndContinueProductionExceptionHandler
 default.timestamp.extractor = class com...UmsEventTimestampExtractor
 default.value.serde = class com...JsonSerde
 key.serde = null
 metadata.max.age.ms = 300000
 metric.reporters = []
 metrics.num.samples = 2
 metrics.recording.level = INFO
 metrics.sample.window.ms = 30000
 num.standby.replicas = 0
 num.stream.threads = 8
 partition.grouper = class org.apache.kafka.streams.processor.DefaultPartitionGrouper
 poll.ms = 100
 processing.guarantee = exactly_once
 receive.buffer.bytes = 32768
 reconnect.backoff.max.ms = 1000
 reconnect.backoff.ms = 50
 replication.factor = 3
 request.timeout.ms = 40000
 retries = 240
 retry.backoff.ms = 500
 rocksdb.config.setter = null
 security.protocol = PLAINTEXT
 send.buffer.bytes = 131072
 state.cleanup.delay.ms = 600000
 state.dir = /tmp/kafka-streams
 timestamp.extractor = null
 value.serde = null
 windowstore.changelog.additional.retention.ms = 86400000
 zookeeper.connect =

 

This is how we override the retries in the KafkaStreams config in the code:

config.put(StreamsConfig.RETRIES_CONFIG, 240);
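For reference, this is roughly how that override fits into a full Streams configuration. The sketch below uses the plain string config keys so it compiles without the Kafka libraries (StreamsConfig.RETRIES_CONFIG is just the constant for the "retries" key); the application id and bootstrap servers are placeholder values, not the ones from the dump above.

```java
import java.util.Properties;

public class RetriesOverrideSketch {

    public static Properties buildConfig() {
        Properties config = new Properties();
        // Placeholder values; the real application reads these from deployment config.
        config.put("application.id", "my-streams-app");
        config.put("bootstrap.servers", "localhost:9092");
        // Settings reported in the StreamsConfig dump above.
        config.put("processing.guarantee", "exactly_once");
        config.put("retry.backoff.ms", 500);
        // The override under discussion: StreamsConfig.RETRIES_CONFIG == "retries".
        config.put("retries", 240);
        return config;
    }

    public static void main(String[] args) {
        Properties config = buildConfig();
        System.out.println("retries = " + config.get("retries"));
    }
}
```

Since the dumped config shows retries = 240 rather than the default, the override does reach the StreamsConfig in this case.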

 

To your remark "If yes, you should see a WARN log that the overwrite is 
ignored..." - there is no such log...

 

> "Exactly-once" stream breaks production exception handler contract
> ------------------------------------------------------------------
>
>                 Key: KAFKA-7108
>                 URL: https://issues.apache.org/jira/browse/KAFKA-7108
>             Project: Kafka
>          Issue Type: Bug
>          Components: streams
>    Affects Versions: 1.1.0
>            Reporter: Anna O
>            Priority: Major
>              Labels: exactly-once
>
> I have a stream configured with "default.production.exception.handler" that 
> is supposed to log the error and continue. When I set "processing.guarantee" 
> to "exactly_once", it turned out that a retryable NotEnoughReplicasException 
> that had passed through the production exception handler was rethrown by the 
> TransactionManager wrapped in a KafkaException, terminating the stream 
> thread:
> org.apache.kafka.common.KafkaException: Cannot execute transactional method because we are in an error state
>  at org.apache.kafka.clients.producer.internals.TransactionManager.maybeFailWithError(TransactionManager.java:784) ~[kafka-clients-1.1.0.jar:?]
>  at org.apache.kafka.clients.producer.internals.TransactionManager.sendOffsetsToTransaction(TransactionManager.java:250) ~[kafka-clients-1.1.0.jar:?]
>  at org.apache.kafka.clients.producer.KafkaProducer.sendOffsetsToTransaction(KafkaProducer.java:617) ~[kafka-clients-1.1.0.jar:?]
>  at org.apache.kafka.streams.processor.internals.StreamTask.commitOffsets(StreamTask.java:357) ~[kafka-streams-1.1.0.jar:?]
>  at org.apache.kafka.streams.processor.internals.StreamTask.access$000(StreamTask.java:53) ~[kafka-streams-1.1.0.jar:?]
>  at org.apache.kafka.streams.processor.internals.StreamTask$1.run(StreamTask.java:316) ~[kafka-streams-1.1.0.jar:?]
>  at org.apache.kafka.streams.processor.internals.StreamsMetricsImpl.measureLatencyNs(StreamsMetricsImpl.java:208) ~[kafka-streams-1.1.0.jar:?]
>  at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:307) ~[kafka-streams-1.1.0.jar:?]
>  at org.apache.kafka.streams.processor.internals.StreamTask.commit(StreamTask.java:297) ~[kafka-streams-1.1.0.jar:?]
>  at org.apache.kafka.streams.processor.internals.AssignedTasks$1.apply(AssignedTasks.java:67) ~[kafka-streams-1.1.0.jar:?]
>  at org.apache.kafka.streams.processor.internals.AssignedTasks.applyToRunningTasks(AssignedTasks.java:357) [kafka-streams-1.1.0.jar:?]
>  at org.apache.kafka.streams.processor.internals.AssignedTasks.commit(AssignedTasks.java:347) [kafka-streams-1.1.0.jar:?]
>  at org.apache.kafka.streams.processor.internals.TaskManager.commitAll(TaskManager.java:403) [kafka-streams-1.1.0.jar:?]
>  at org.apache.kafka.streams.processor.internals.StreamThread.maybeCommit(StreamThread.java:994) [kafka-streams-1.1.0.jar:?]
>  at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:811) [kafka-streams-1.1.0.jar:?]
>  at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:750) [kafka-streams-1.1.0.jar:?]
>  at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:720) [kafka-streams-1.1.0.jar:?]
> Caused by: org.apache.kafka.common.errors.NotEnoughReplicasException: Messages are rejected since there are fewer in-sync replicas than required.
>  
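For readers unfamiliar with the contract the description refers to: a production exception handler returns CONTINUE or FAIL to tell Streams whether to keep processing after a failed send. The sketch below is a stdlib-only illustration of that contract, not the real org.apache.kafka.streams.errors.ProductionExceptionHandler interface (which also receives the failed ProducerRecord and is wired in via the config class name); all names here are illustrative.

```java
public class LogAndContinueSketch {

    // Mirrors the shape of Kafka Streams' ProductionExceptionHandlerResponse.
    enum ProductionExceptionHandlerResponse { CONTINUE, FAIL }

    // The contract the reporter relies on: log the production error and tell
    // Streams to keep processing instead of killing the stream thread. The bug
    // is that under exactly_once the TransactionManager rethrows the error
    // regardless of this response.
    static ProductionExceptionHandlerResponse handle(Exception exception) {
        System.err.println("Failed to produce record: " + exception.getMessage());
        return ProductionExceptionHandlerResponse.CONTINUE;
    }

    public static void main(String[] args) {
        ProductionExceptionHandlerResponse response =
            handle(new RuntimeException("not enough in-sync replicas"));
        System.out.println("handler response: " + response);
    }
}
```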



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
