[jira] [Commented] (KAFKA-9324) Drop support for Scala 2.11 (KIP-531)

2019-12-21 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9324?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17001827#comment-17001827
 ] 

ASF GitHub Bot commented on KAFKA-9324:
---

ijuma commented on pull request #7859: KAFKA-9324: Drop support for Scala 2.11 
(KIP-531)
URL: https://github.com/apache/kafka/pull/7859
 
 
   ### Committer Checklist (excluded from commit message)
   - [ ] Verify design and implementation 
   - [ ] Verify test coverage and CI build status
   - [ ] Verify documentation (including upgrade notes)
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Drop support for Scala 2.11 (KIP-531)
> -
>
> Key: KAFKA-9324
> URL: https://issues.apache.org/jira/browse/KAFKA-9324
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ismael Juma
>Assignee: Ismael Juma
>Priority: Major
>  Labels: kip
> Fix For: 2.5.0
>
>
> See 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-531%3A+Drop+support+for+Scala+2.11+in+Kafka+2.5
>  for details.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-7267) KafkaStreams Scala DSL process method should accept a ProcessorSupplier

2019-12-21 Thread Ismael Juma (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-7267?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17001825#comment-17001825
 ] 

Ismael Juma commented on KAFKA-7267:


For what it's worth, we're planning to drop support for Scala 2.11 (KAFKA-9324).

> KafkaStreams Scala DSL process method should accept a ProcessorSupplier
> ---
>
> Key: KAFKA-7267
> URL: https://issues.apache.org/jira/browse/KAFKA-7267
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.0.0
>Reporter: John Roesler
>Priority: Minor
>  Labels: scala
>
> scala.KafkaStreams#process currently expects a ()=>Processor, which is 
> semantically equivalent to a ProcessorSupplier, but it's the only such method 
> to do so.
> All the similar methods in the Scala DSL take a Supplier like their Java 
> counterparts.
> Note that on Scala 2.12+, SAM conversion allows callers to pass either a 
> function or a supplier when the parameter is a ProcessorSupplier. (But if the 
> parameter is a function, you must pass a function)
> But on scala 2.11-, callers will have to pass a function if the parameter is 
> a function and a supplier if the parameter is a supplier. This means that 
> currently, 2.11 users are confronted with an api that demands they construct 
> suppliers for all the methods *except* process, which demands a function.
> Mitigating factor: we have some implicits available to convert a Function0 to 
> a supplier, and we could add an implicit from ProcessorSupplier to Function0 
> to smooth over the API.
>  
> What to do about it?
> We could just change the existing method to take a ProcessorSupplier instead.
>  * 2.12+ users would not notice a difference during compilation, as SAM 
> conversion would kick in. However, if they just swap in the new jar without 
> recompiling, I think they'd get a MethodDefNotFound error.
>  * 2.11- users would not be able to compile their existing code. They'd have 
> to swap their function out for a ProcessorSupplier or pull the implicit 
> conversion into scope.
>  * Note that we can delay this action until we drop 2.11 support, and we 
> would break no one.
> We could deprecate the existing method and add a new one taking a 
> ProcessorSupplier.
>  * All scala users would be able to compile their existing code and also swap 
> in the new version at runtime.
>  * Anyone explicitly passing a function would get a deprecation warning, 
> though, regardless of SAM conversion or implicit conversion, since neither 
> conversion won't kick in if there's actually a method overload expecting a 
> function. This would drive everyone to explicitly create a supplier 
> (unnecessarily)
> We could leave the existing method without deprecating it and add a new one 
> taking a ProcessorSupplier.
>  * All scala users would be able to compile their existing code and also swap 
> in the new version at runtime.
>  * There would be no unfortunate deprecation warnings.
>  * The interface would list two process methods, which is untidy.
>  * Once we drop 2.11 support, we would just drop the function variant.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Created] (KAFKA-9324) Drop support for Scala 2.11 (KIP-531)

2019-12-21 Thread Ismael Juma (Jira)
Ismael Juma created KAFKA-9324:
--

 Summary: Drop support for Scala 2.11 (KIP-531)
 Key: KAFKA-9324
 URL: https://issues.apache.org/jira/browse/KAFKA-9324
 Project: Kafka
  Issue Type: Improvement
Reporter: Ismael Juma
Assignee: Ismael Juma
 Fix For: 2.5.0


See 
https://cwiki.apache.org/confluence/display/KAFKA/KIP-531%3A+Drop+support+for+Scala+2.11+in+Kafka+2.5
 for details.



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Updated] (KAFKA-9011) Add KStream#flatTransform and KStream#flatTransformValues to Scala API

2019-12-21 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9011?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-9011:

Fix Version/s: 2.4.1

> Add KStream#flatTransform and KStream#flatTransformValues to Scala API
> --
>
> Key: KAFKA-9011
> URL: https://issues.apache.org/jira/browse/KAFKA-9011
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.3.0
>Reporter: Alex Kokachev
>Assignee: Alex Kokachev
>Priority: Major
>  Labels: scala, streams
> Fix For: 2.5.0, 2.4.1
>
>
> Part of KIP-313: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-313%3A+Add+KStream.flatTransform+and+KStream.flatTransformValues]
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-9011) Add KStream#flatTransform and KStream#flatTransformValues to Scala API

2019-12-21 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17001799#comment-17001799
 ] 

John Roesler commented on KAFKA-9011:
-

Hey [~akokachev], I cherry-picked them to 2.4, but on 2.3 we get some build 
failures in the scala test suite.

If you really want them on 2.3, can you send a PR against the 2.3 branch, 
porting the PRs and fixing the build?

Thanks,
-John

Failures:
{noformat}
> Task :streams:streams-scala:compileTestScala FAILED
Pruning sources from previous analysis, due to incompatible CompileSetup.
/home/confluent/kafka/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala:193:
 not found: value Instant
val now = Instant.now()
  ^
/home/confluent/kafka/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala:195:
 value createInput is not a member of 
org.apache.kafka.streams.TopologyTestDriver
val testInput = testDriver.createInput[String, String](sourceTopic)
   ^
/home/confluent/kafka/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala:196:
 value createOutput is not a member of 
org.apache.kafka.streams.TopologyTestDriver
val testOutput = testDriver.createOutput[String, String](sinkTopic)
^
/home/confluent/kafka/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala:228:
 not found: value Instant
val now = Instant.now()
  ^
/home/confluent/kafka/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala:230:
 value createInput is not a member of 
org.apache.kafka.streams.TopologyTestDriver
val testInput = testDriver.createInput[String, String](sourceTopic)
   ^
/home/confluent/kafka/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala:231:
 value createOutput is not a member of 
org.apache.kafka.streams.TopologyTestDriver
val testOutput = testDriver.createOutput[String, String](sinkTopic)
^
/home/confluent/kafka/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala:263:
 not found: value Instant
val now = Instant.now()
  ^
/home/confluent/kafka/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala:265:
 value createInput is not a member of 
org.apache.kafka.streams.TopologyTestDriver
val testInput = testDriver.createInput[String, String](sourceTopic)
   ^
/home/confluent/kafka/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala:266:
 value createOutput is not a member of 
org.apache.kafka.streams.TopologyTestDriver
val testOutput = testDriver.createOutput[String, String](sinkTopic)
^
/home/confluent/kafka/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala:296:
 not found: value Instant
val now = Instant.now()
  ^
/home/confluent/kafka/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala:298:
 value createInput is not a member of 
org.apache.kafka.streams.TopologyTestDriver
val testInput = testDriver.createInput[String, String](sourceTopic)
   ^
/home/confluent/kafka/streams/streams-scala/src/test/scala/org/apache/kafka/streams/scala/kstream/KStreamTest.scala:299:
 value createOutput is not a member of 
org.apache.kafka.streams.TopologyTestDriver
val testOutput = testDriver.createOutput[String, String](sinkTopic)
^
12 errors found
{noformat}

> Add KStream#flatTransform and KStream#flatTransformValues to Scala API
> --
>
> Key: KAFKA-9011
> URL: https://issues.apache.org/jira/browse/KAFKA-9011
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.3.0
>Reporter: Alex Kokachev
>Assignee: Alex Kokachev
>Priority: Major
>  Labels: scala, streams
> Fix For: 2.5.0
>
>
> Part of KIP-313: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-313%3A+Add+KStream.flatTransform+and+KStream.flatTransformValues]
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-8147) Add changelog topic configuration to KTable suppress

2019-12-21 Thread John Roesler (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17001795#comment-17001795
 ] 

John Roesler commented on KAFKA-8147:
-

Note for the future. [~mjduijn] is unable to finish up this work, so anyone is 
welcome to claim the ticket and take it across the finish line.

> Add changelog topic configuration to KTable suppress
> 
>
> Key: KAFKA-8147
> URL: https://issues.apache.org/jira/browse/KAFKA-8147
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.1.1
>Reporter: Maarten
>Priority: Minor
>  Labels: kip
>
> The streams DSL does not provide a way to configure the changelog topic 
> created by KTable.suppress.
> From the perspective of an external user this could be implemented similar to 
> the configuration of aggregate + materialized, i.e.,
> {code:java}
> changelogTopicConfigs = // Configs
> materialized = Materialized.as(..).withLoggingEnabled(changelogTopicConfigs)
> ..
> KGroupedStream.aggregate(..,materialized)
> {code}
> [KIP-446: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-446%3A+Add+changelog+topic+configuration+to+KTable+suppress|https://cwiki.apache.org/confluence/display/KAFKA/KIP-446%3A+Add+changelog+topic+configuration+to+KTable+suppress]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Assigned] (KAFKA-8147) Add changelog topic configuration to KTable suppress

2019-12-21 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-8147?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler reassigned KAFKA-8147:
---

Assignee: (was: Maarten)

> Add changelog topic configuration to KTable suppress
> 
>
> Key: KAFKA-8147
> URL: https://issues.apache.org/jira/browse/KAFKA-8147
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.1.1
>Reporter: Maarten
>Priority: Minor
>  Labels: kip
>
> The streams DSL does not provide a way to configure the changelog topic 
> created by KTable.suppress.
> From the perspective of an external user this could be implemented similar to 
> the configuration of aggregate + materialized, i.e.,
> {code:java}
> changelogTopicConfigs = // Configs
> materialized = Materialized.as(..).withLoggingEnabled(changelogTopicConfigs)
> ..
> KGroupedStream.aggregate(..,materialized)
> {code}
> [KIP-446: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-446%3A+Add+changelog+topic+configuration+to+KTable+suppress|https://cwiki.apache.org/confluence/display/KAFKA/KIP-446%3A+Add+changelog+topic+configuration+to+KTable+suppress]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Resolved] (KAFKA-9310) StreamThread may die from recoverable UnknownProducerId exception

2019-12-21 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9310?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler resolved KAFKA-9310.
-
Resolution: Fixed

> StreamThread may die from recoverable UnknownProducerId exception
> -
>
> Key: KAFKA-9310
> URL: https://issues.apache.org/jira/browse/KAFKA-9310
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.4.0
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
> Fix For: 2.4.1
>
>
> We attempted to capture and recover from UnknownProducerId exceptions in 
> KAFKA-9231 , but the exception can still be raised, wrapped in a 
> KafkaException, and kill the thread.
> For example, see the stack trace:
> {noformat}
> [2019-12-17 00:08:53,064] ERROR 
> [stream-soak-test-855effa4-e99a-4568-bb22-5469f4714a76-StreamThread-3] 
> stream-thread 
> [stream-soak-test-855effa4-e99a-4568-bb22-5469f4714a76-StreamThread-3] 
> Encountered the following unexpected Kafka exception during processing, this 
> usually indicate Streams internal errors: 
> (org.apache.kafka.streams.processor.internals.StreamThread)
>   org.apache.kafka.streams.errors.StreamsException: Exception caught in 
> process. taskId=1_1, processor=KSTREAM-SOURCE-31, 
> topic=windowed-node-counts, partition=1, offset=354933575, 
> stacktrace=org.apache.kafka.streams.errors.StreamsException: task [1_1] Abort 
> sending since an error caught with a previous record (timestamp 
> 1575857317197) to topic 
> stream-soak-test-windowed-node-counts-STATE-STORE-30-changelog due to 
> org.apache.kafka.common.KafkaException: Cannot perform send because at least 
> one previous transactional or idempotent request has failed with errors.
>   at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:247)
>   at 
> org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:69)
>   at 
> org.apache.kafka.streams.state.internals.ChangeLoggingTimestampedKeyValueBytesStore.log(ChangeLoggingTimestampedKeyValueBytesStore.java:35)
>   at 
> org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:69)
>   at 
> org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:31)
>   at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:87)
>   at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.lambda$initInternal$0(CachingKeyValueStore.java:72)
>   at 
> org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151)
>   at 
> org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:244)
>   at 
> org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:240)
>   at 
> org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:150)
>   at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.getInternal(CachingKeyValueStore.java:224)
>   at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:205)
>   at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:36)
>   at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$get$2(MeteredKeyValueStore.java:242)
>   at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.measureLatency(MeteredKeyValueStore.java:356)
>   at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.get(MeteredKeyValueStore.java:242)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl$KeyValueStoreReadWriteDecorator.get(ProcessorContextImpl.java:465)
>   at 
> org.apache.kafka.streams.kstream.internals.KTableSource$KTableSourceProcessor.process(KTableSource.java:106)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:118)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
>   at 
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:429)
>   at 
> org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:474)
>   at 
> org.apache.kafka.streams.pro

[jira] [Updated] (KAFKA-9310) StreamThread may die from recoverable UnknownProducerId exception

2019-12-21 Thread John Roesler (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9310?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

John Roesler updated KAFKA-9310:

Fix Version/s: 2.4.1

> StreamThread may die from recoverable UnknownProducerId exception
> -
>
> Key: KAFKA-9310
> URL: https://issues.apache.org/jira/browse/KAFKA-9310
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.4.0
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
> Fix For: 2.4.1
>
>
> We attempted to capture and recover from UnknownProducerId exceptions in 
> KAFKA-9231 , but the exception can still be raised, wrapped in a 
> KafkaException, and kill the thread.
> For example, see the stack trace:
> {noformat}
> [2019-12-17 00:08:53,064] ERROR 
> [stream-soak-test-855effa4-e99a-4568-bb22-5469f4714a76-StreamThread-3] 
> stream-thread 
> [stream-soak-test-855effa4-e99a-4568-bb22-5469f4714a76-StreamThread-3] 
> Encountered the following unexpected Kafka exception during processing, this 
> usually indicate Streams internal errors: 
> (org.apache.kafka.streams.processor.internals.StreamThread)
>   org.apache.kafka.streams.errors.StreamsException: Exception caught in 
> process. taskId=1_1, processor=KSTREAM-SOURCE-31, 
> topic=windowed-node-counts, partition=1, offset=354933575, 
> stacktrace=org.apache.kafka.streams.errors.StreamsException: task [1_1] Abort 
> sending since an error caught with a previous record (timestamp 
> 1575857317197) to topic 
> stream-soak-test-windowed-node-counts-STATE-STORE-30-changelog due to 
> org.apache.kafka.common.KafkaException: Cannot perform send because at least 
> one previous transactional or idempotent request has failed with errors.
>   at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:247)
>   at 
> org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:69)
>   at 
> org.apache.kafka.streams.state.internals.ChangeLoggingTimestampedKeyValueBytesStore.log(ChangeLoggingTimestampedKeyValueBytesStore.java:35)
>   at 
> org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:69)
>   at 
> org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:31)
>   at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:87)
>   at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.lambda$initInternal$0(CachingKeyValueStore.java:72)
>   at 
> org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151)
>   at 
> org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:244)
>   at 
> org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:240)
>   at 
> org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:150)
>   at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.getInternal(CachingKeyValueStore.java:224)
>   at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:205)
>   at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:36)
>   at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$get$2(MeteredKeyValueStore.java:242)
>   at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.measureLatency(MeteredKeyValueStore.java:356)
>   at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.get(MeteredKeyValueStore.java:242)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl$KeyValueStoreReadWriteDecorator.get(ProcessorContextImpl.java:465)
>   at 
> org.apache.kafka.streams.kstream.internals.KTableSource$KTableSourceProcessor.process(KTableSource.java:106)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:118)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)
>   at 
> org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:87)
>   at 
> org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:429)
>   at 
> org.apache.kafka.streams.processor.internals.AssignedStreamsTasks.process(AssignedStreamsTasks.java:474)
>   at 
> org.apache.kafka.streams.pr

[jira] [Commented] (KAFKA-9310) StreamThread may die from recoverable UnknownProducerId exception

2019-12-21 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9310?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17001767#comment-17001767
 ] 

ASF GitHub Bot commented on KAFKA-9310:
---

vvcephei commented on pull request #7845: KAFKA-9310: Handle UnknownProducerId 
from RecordCollector.send
URL: https://github.com/apache/kafka/pull/7845
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> StreamThread may die from recoverable UnknownProducerId exception
> -
>
> Key: KAFKA-9310
> URL: https://issues.apache.org/jira/browse/KAFKA-9310
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.4.0
>Reporter: John Roesler
>Assignee: John Roesler
>Priority: Major
>
> We attempted to capture and recover from UnknownProducerId exceptions in 
> KAFKA-9231 , but the exception can still be raised, wrapped in a 
> KafkaException, and kill the thread.
> For example, see the stack trace:
> {noformat}
> [2019-12-17 00:08:53,064] ERROR 
> [stream-soak-test-855effa4-e99a-4568-bb22-5469f4714a76-StreamThread-3] 
> stream-thread 
> [stream-soak-test-855effa4-e99a-4568-bb22-5469f4714a76-StreamThread-3] 
> Encountered the following unexpected Kafka exception during processing, this 
> usually indicate Streams internal errors: 
> (org.apache.kafka.streams.processor.internals.StreamThread)
>   org.apache.kafka.streams.errors.StreamsException: Exception caught in 
> process. taskId=1_1, processor=KSTREAM-SOURCE-31, 
> topic=windowed-node-counts, partition=1, offset=354933575, 
> stacktrace=org.apache.kafka.streams.errors.StreamsException: task [1_1] Abort 
> sending since an error caught with a previous record (timestamp 
> 1575857317197) to topic 
> stream-soak-test-windowed-node-counts-STATE-STORE-30-changelog due to 
> org.apache.kafka.common.KafkaException: Cannot perform send because at least 
> one previous transactional or idempotent request has failed with errors.
>   at 
> org.apache.kafka.streams.processor.internals.RecordCollectorImpl.send(RecordCollectorImpl.java:247)
>   at 
> org.apache.kafka.streams.state.internals.StoreChangeLogger.logChange(StoreChangeLogger.java:69)
>   at 
> org.apache.kafka.streams.state.internals.ChangeLoggingTimestampedKeyValueBytesStore.log(ChangeLoggingTimestampedKeyValueBytesStore.java:35)
>   at 
> org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:69)
>   at 
> org.apache.kafka.streams.state.internals.ChangeLoggingKeyValueBytesStore.put(ChangeLoggingKeyValueBytesStore.java:31)
>   at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.putAndMaybeForward(CachingKeyValueStore.java:87)
>   at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.lambda$initInternal$0(CachingKeyValueStore.java:72)
>   at 
> org.apache.kafka.streams.state.internals.NamedCache.flush(NamedCache.java:151)
>   at 
> org.apache.kafka.streams.state.internals.NamedCache.evict(NamedCache.java:244)
>   at 
> org.apache.kafka.streams.state.internals.ThreadCache.maybeEvict(ThreadCache.java:240)
>   at 
> org.apache.kafka.streams.state.internals.ThreadCache.put(ThreadCache.java:150)
>   at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.getInternal(CachingKeyValueStore.java:224)
>   at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:205)
>   at 
> org.apache.kafka.streams.state.internals.CachingKeyValueStore.get(CachingKeyValueStore.java:36)
>   at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.lambda$get$2(MeteredKeyValueStore.java:242)
>   at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.measureLatency(MeteredKeyValueStore.java:356)
>   at 
> org.apache.kafka.streams.state.internals.MeteredKeyValueStore.get(MeteredKeyValueStore.java:242)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl$KeyValueStoreReadWriteDecorator.get(ProcessorContextImpl.java:465)
>   at 
> org.apache.kafka.streams.kstream.internals.KTableSource$KTableSourceProcessor.process(KTableSource.java:106)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:118)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)
>   at 
> org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImp

[jira] [Commented] (KAFKA-8147) Add changelog topic configuration to KTable suppress

2019-12-21 Thread ASF GitHub Bot (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8147?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17001658#comment-17001658
 ] 

ASF GitHub Bot commented on KAFKA-8147:
---

mjduijn commented on pull request #6593: KAFKA-8147 Add changelog topic 
configuration to KTable suppress
URL: https://github.com/apache/kafka/pull/6593
 
 
   
 

This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> Add changelog topic configuration to KTable suppress
> 
>
> Key: KAFKA-8147
> URL: https://issues.apache.org/jira/browse/KAFKA-8147
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 2.1.1
>Reporter: Maarten
>Assignee: Maarten
>Priority: Minor
>  Labels: kip
>
> The streams DSL does not provide a way to configure the changelog topic 
> created by KTable.suppress.
> From the perspective of an external user this could be implemented similar to 
> the configuration of aggregate + materialized, i.e.,
> {code:java}
> changelogTopicConfigs = // Configs
> materialized = Materialized.as(..).withLoggingEnabled(changelogTopicConfigs)
> ..
> KGroupedStream.aggregate(..,materialized)
> {code}
> [KIP-446: 
> https://cwiki.apache.org/confluence/display/KAFKA/KIP-446%3A+Add+changelog+topic+configuration+to+KTable+suppress|https://cwiki.apache.org/confluence/display/KAFKA/KIP-446%3A+Add+changelog+topic+configuration+to+KTable+suppress]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)