Re: [ANNOUNCE] Apache Kafka 2.2.1
Thanks Vahid!

On Mon, Jun 3, 2019, 16:23 Vahid Hashemian wrote:

> The Apache Kafka community is pleased to announce the release for Apache
> Kafka 2.2.1
>
> This is a bugfix release for Kafka 2.2.0. All of the changes in this
> release can be found in the release notes:
> https://www.apache.org/dist/kafka/2.2.1/RELEASE_NOTES.html
>
> You can download the source and binary release from:
> https://kafka.apache.org/downloads#2.2.1
>
> ---
>
> Apache Kafka is a distributed streaming platform with four core APIs:
>
> ** The Producer API allows an application to publish a stream of records to
> one or more Kafka topics.
>
> ** The Consumer API allows an application to subscribe to one or more
> topics and process the stream of records produced to them.
>
> ** The Streams API allows an application to act as a stream processor,
> consuming an input stream from one or more topics and producing an output
> stream to one or more output topics, effectively transforming the input
> streams to output streams.
>
> ** The Connector API allows building and running reusable producers or
> consumers that connect Kafka topics to existing applications or data
> systems. For example, a connector to a relational database might capture
> every change to a table.
>
> With these APIs, Kafka can be used for two broad classes of application:
>
> ** Building real-time streaming data pipelines that reliably get data
> between systems or applications.
>
> ** Building real-time streaming applications that transform or react to the
> streams of data.
>
> Apache Kafka is in use at large and small companies worldwide, including
> Capital One, Goldman Sachs, ING, LinkedIn, Netflix, Pinterest, Rabobank,
> Target, The New York Times, Uber, Yelp, and Zalando, among others.
>
> A big thank you to the following 30 contributors to this release!
>
> Anna Povzner, Arabelle Hou, A. Sophie Blee-Goldman, Bill Bejeck, Bob
> Barrett, Chris Egerton, Colin Patrick McCabe, Cyrus Vafadari, Dhruvil Shah,
> Doroszlai, Attila, Guozhang Wang, huxi, Jason Gustafson, John Roesler,
> Konstantine Karantasis, Kristian Aurlien, Lifei Chen, Magesh Nandakumar,
> Manikumar Reddy, Massimo Siani, Matthias J. Sax, Nicholas Parker, pkleindl,
> Rajini Sivaram, Randall Hauch, Sebastián Ortega, Vahid Hashemian, Victoria
> Bialas, Yaroslav Klymko, Zhanxiang (Patrick) Huang
>
> We welcome your help and feedback. For more information on how to report
> problems, and to get involved, visit the project website at
> https://kafka.apache.org/
>
> Thank you!
>
> Regards,
> --Vahid Hashemian
Re: [DISCUSS] KIP-378: Enable Dependency Injection for Kafka Streams handlers
Hi Matthias,

Sorry, due to other commitments I haven't started the other implementation yet. In the meantime, the community has opted for the second, more complex solution. I already had some ideas in this regard, but the details still need to be discussed.

Best greetings,
Wladimir

On 21-Feb-19 09:33, Matthias J. Sax wrote:

Hi Wladimir,

what is the status of this KIP?

-Matthias

On 1/9/19 4:17 PM, Guozhang Wang wrote:

Hello Wladimir,

Just checking if you are still working on this KIP. We have the 2.2 KIP freeze deadline on the 24th of this month, and it would be great to complete this KIP by then so that the 2.2.0 release could have this feature.

Guozhang

On Mon, Dec 3, 2018 at 11:26 PM Guozhang Wang wrote:

Hello Wladimir,

I've thought about the two options and I think I'm sold on the second option. Actually, I think it is better to generalize it so that it can potentially be used for other clients (producer, consumer) as well, since they also have similar dependency injection requests for the metrics reporter, partitioner, partition assignor, etc. So I'd suggest we add the following to AbstractConfig directly (note I intentionally renamed the class to ConfiguredInstanceFactory so it can be used for other clients as well):

```
AbstractConfig(ConfigDef definition, Map originals, ConfiguredInstanceFactory, boolean doLog)
```

And then in StreamsConfig add:

```
StreamsConfig(Map props, ConfiguredInstanceFactory)
```

which would call the above AbstractConfig constructor (we can leave it to the core team to decide when they want to add it for producer and consumer). And in KafkaStreams / TopologyTestDriver we can add one overloaded constructor each that includes all the parameters, including the ConfiguredInstanceFactory. Those who only want `factory` but not `client-suppliers`, for example, can set the latter to `null`, and the Streams library will just use the default one.

Guozhang

On Sun, Dec 2, 2018 at 12:13 PM Wladimir Schmidt wrote:

Hello Guozhang,

sure, the first approach is very straightforward and allows minimal changes to the Kafka Streams API. On the other hand, the second approach with the interface implementation looks cleaner to me. I totally agree that this should be discussed first, before it is implemented.

Thanks,
Wladimir

On 17-Nov-18 23:37, Guozhang Wang wrote:

Hello folks,

I'd like to revive this thread for discussion. After reading the previous emails, I think I'm still leaning a bit towards re-enabling passing StreamsConfig into the KafkaStreams constructors, compared with a ConfiguredStreamsFactory as an additional parameter to overloaded KafkaStreams constructors. Although the former seems less clean, as it requires users to read through the usage of AbstractConfig to know how to use it in their frameworks, this to me is a problem solvable through documentation. Plus, AbstractConfig is a public interface already, and hence the additional ConfiguredStreamsFactory to me really overlaps a bit in functionality.

Guozhang

On Sun, Oct 21, 2018 at 1:41 PM Wladimir Schmidt wrote:

Hi Damian,

The first approach was added only because it had been initially proposed in my pull request, which started a discussion, and thus KIP-378 was born.

Yes, I would like to have something "injectable". In this regard, a `ConfiguredStreamsFactory` (the name is subject to discussion) is a good option to introduce into the `KafkaStreams` constructor.

Even though I consider the second approach to be cleaner, it involves a certain amount of refactoring of the Streams library. The first approach, on the contrary, only adds additional constructors (or removes the deprecated annotation, if the method has not been removed yet), with considerably less intervention in the Streams library (no changes that would break the API; please see the pull request: https://github.com/apache/kafka/pull/5344).

Thanks
Wladimir

On 10-Oct-18 15:51, Damian Guy wrote:

Hi Wladimir,

Of the two approaches in the KIP, I feel the second approach is cleaner. However, am I correct in assuming that you want to have the `ConfiguredStreamsFactory` as a ctor arg in `StreamsConfig` so that Spring can inject this for you? Otherwise you could just put the ApplicationContext as a property in the config and then use that via the configure method of the appropriate handler to get your actual handler.

Thanks,
Damian

On Tue, 9 Oct 2018 at 01:55, Guozhang Wang wrote:

John, thanks for the explanation, now it makes much more sense to me.

As for the concrete approach, to me it seems the first option requires fewer changes than the second (ConfiguredStreamsFactory-based) approach, whereas the second one requires an additional interface that overlaps with AbstractConfig.

I'm aware that in KafkaProducer / KafkaConsumer we do not have public constructors that take a ProducerConfig or ConsumerConfig directly. Could anyone using Spring share how you've worked around this so far? If it is very awkward I'm not against j
Re: [DISCUSS] KIP-378: Enable Dependency Injection for Kafka Streams handlers
Hello Guozhang,

sure, the first approach is very straightforward and allows minimal changes to the Kafka Streams API. On the other hand, the second approach with the interface implementation looks cleaner to me. I totally agree that this should be discussed first, before it is implemented.

Thanks,
Wladimir

On 17-Nov-18 23:37, Guozhang Wang wrote:

Hello folks,

I'd like to revive this thread for discussion. After reading the previous emails, I think I'm still leaning a bit towards re-enabling passing StreamsConfig into the KafkaStreams constructors, compared with a ConfiguredStreamsFactory as an additional parameter to overloaded KafkaStreams constructors. Although the former seems less clean, as it requires users to read through the usage of AbstractConfig to know how to use it in their frameworks, this to me is a problem solvable through documentation. Plus, AbstractConfig is a public interface already, and hence the additional ConfiguredStreamsFactory to me really overlaps a bit in functionality.

Guozhang

On Sun, Oct 21, 2018 at 1:41 PM Wladimir Schmidt wrote:

Hi Damian,

The first approach was added only because it had been initially proposed in my pull request, which started a discussion, and thus KIP-378 was born.

Yes, I would like to have something "injectable". In this regard, a `ConfiguredStreamsFactory` (the name is subject to discussion) is a good option to introduce into the `KafkaStreams` constructor.

Even though I consider the second approach to be cleaner, it involves a certain amount of refactoring of the Streams library. The first approach, on the contrary, only adds additional constructors (or removes the deprecated annotation, if the method has not been removed yet), with considerably less intervention in the Streams library (no changes that would break the API; please see the pull request: https://github.com/apache/kafka/pull/5344).

Thanks
Wladimir

On 10-Oct-18 15:51, Damian Guy wrote:

Hi Wladimir,

Of the two approaches in the KIP, I feel the second approach is cleaner. However, am I correct in assuming that you want to have the `ConfiguredStreamsFactory` as a ctor arg in `StreamsConfig` so that Spring can inject this for you? Otherwise you could just put the ApplicationContext as a property in the config and then use that via the configure method of the appropriate handler to get your actual handler.

Thanks,
Damian

On Tue, 9 Oct 2018 at 01:55, Guozhang Wang wrote:

John, thanks for the explanation, now it makes much more sense to me.

As for the concrete approach, to me it seems the first option requires fewer changes than the second (ConfiguredStreamsFactory-based) approach, whereas the second one requires an additional interface that overlaps with AbstractConfig.

I'm aware that in KafkaProducer / KafkaConsumer we do not have public constructors that take a ProducerConfig or ConsumerConfig directly. Could anyone using Spring share how you've worked around this so far? If it is very awkward I'm not against just adding the XXXConfigs to the constructors directly.

Guozhang

On Fri, Oct 5, 2018 at 1:48 PM, John Roesler wrote:

Hi Wladimir,

Thanks for the KIP!

As I mentioned in the PR discussion, I personally prefer not to recommend overriding StreamsConfig for this purpose. It seems like a person wishing to create a DI shim would have to acquire quite a deep understanding of the class and its usage to figure out what exactly to override to accomplish their goals without breaking everything. I'm honestly impressed with the method you came up with to create your Spring/Streams shim.

I think we can make the path for the next person smoother by going with something more akin to the ConfiguredStreamsFactory. This is a constrained interface that tells you exactly what you have to implement to create such a shim.

A few thoughts:

1. it seems like we can keep all the deprecated constructors still deprecated
2. we could add just one additional constructor to each of KafkaStreams and TopologyTestDriver to still take a Properties, but also your new ConfiguredStreamsFactory
3. I don't know if I'm sold on the name ConfiguredStreamsFactory, since it does not produce configured streams. Instead, it produces configured instances... How about ConfiguredInstanceFactory?
4. if I understand the usage correctly, it's actually a pretty small number of classes that we actually make via getConfiguredInstance. Offhand, I can think of the key/value Serdes, the deserialization exception handler, and the production exception handler. Perhaps, instead of maintaining a generic "class instantiator", we could explore a factory interface that just has methods for creating exactly the kinds of things we need to create. In fact, we already have something like this: org.apache.kafka.streams.KafkaClientSupplier. Do you think we could just add some more methods to that interface (and maybe rename it) instead?

T
Re: [DISCUSS] KIP-378: Enable Dependency Injection for Kafka Streams handlers
uctor (thus allowing the use of immutable classes).

With this KIP, it would be possible for a DI user to:

1. register a Streams-Spring or Streams-Guice (etc) "plugin" (via either of the mechanisms he proposed)
2. simply make the Serdes, exception handlers, etc, available on the class path with the DI annotations
3. start the app

There's no need to mess with passing dependencies (or the injector) through the properties.

Sorry for "injecting" myself into your discussion, but it took me a while in the PR discussion to get to the bottom of the issue, and I wanted to spare you the same.

I'll respond separately with my feedback on the KIP.

Thanks,
-John

On Sun, Sep 30, 2018 at 2:31 PM Guozhang Wang wrote:

Hello Wladimir,

Thanks for proposing the KIP. I think the injection can currently be done by passing the key/value pair directly into the properties, which can then be accessed from `ProcessorContext#appConfigs` or `#appConfigsWithPrefix`. For example, when constructing the properties you can:

```
props.put(myProp1, myValue1);
props.put(myProp2, myValue2);
props.put("my_app_context", appContext);
KafkaStreams myApp = new KafkaStreams(topology, props);

// and then, in the processor where you want to construct the injected handler:
Map<String, Object> appProps = processorContext.appConfigs();
ApplicationContext appContext = (ApplicationContext) appProps.get("my_app_context");
// look up the handler bean itself by type (getBeanNamesForType would only return bean names)
MyHandler myHandler = appContext.getBean(MyHandler.class);
```

Does that work for you?

Guozhang

On Sun, Sep 30, 2018 at 6:56 AM, Dongjin Lee wrote:

Hi Wladimir,

Thanks for your great KIP. Let me have a look. And let's discuss this KIP in depth after the release of 2.1.0. (The committers are very busy with it.)
Best,
Dongjin

On Sun, Sep 30, 2018 at 10:49 PM Wladimir Schmidt <wlsc@gmail.com> wrote:

Dear colleagues,

I am happy to inform you that I have just finished my first KIP (KIP-378: Enable Dependency Injection for Kafka Streams handlers <https://cwiki.apache.org/confluence/display/KAFKA/KIP-378%3A+Enable+Dependency+Injection+for+Kafka+Streams+handlers>).

Your feedback on this submission would be highly appreciated.

Best Regards,
Wladimir Schmidt

--
*Dongjin Lee*
*A hitchhiker in the mathematical world.*
*github: github.com/dongjinleekr <http://github.com/dongjinleekr>
linkedin: kr.linkedin.com/in/dongjinleekr <http://kr.linkedin.com/in/dongjinleekr>
slideshare: www.slideshare.net/dongjinleekr <http://www.slideshare.net/dongjinleekr>*

--
-- Guozhang
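
To make the factory idea from this thread more concrete, here is a minimal sketch in plain Java. Only the name ConfiguredInstanceFactory comes from the discussion; the method signature, `ReflectiveInstanceFactory`, `RegistryInstanceFactory`, `AuditingHandler` and `FactoryDemo` are assumptions made for illustration and are not part of Kafka or of the KIP. The registry stands in for a DI container such as Spring or Guice, and falls back to reflective no-arg construction when nothing is registered for a type.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Hypothetical interface: only the name is taken from the thread,
// the method signature is an assumption.
interface ConfiguredInstanceFactory {
    <T> T getConfiguredInstance(Class<T> type, Map<String, ?> configs);
}

// Assumed default behaviour: create the instance reflectively via a no-arg constructor.
class ReflectiveInstanceFactory implements ConfiguredInstanceFactory {
    @Override
    public <T> T getConfiguredInstance(Class<T> type, Map<String, ?> configs) {
        try {
            return type.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException e) {
            throw new IllegalStateException("Cannot instantiate " + type.getName(), e);
        }
    }
}

// Stand-in for a DI container: uses a registered supplier if present,
// otherwise falls back to reflection.
class RegistryInstanceFactory implements ConfiguredInstanceFactory {
    private final Map<Class<?>, Supplier<?>> suppliers = new HashMap<>();
    private final ConfiguredInstanceFactory fallback = new ReflectiveInstanceFactory();

    <T> RegistryInstanceFactory register(Class<T> type, Supplier<? extends T> supplier) {
        suppliers.put(type, supplier);
        return this;
    }

    @Override
    public <T> T getConfiguredInstance(Class<T> type, Map<String, ?> configs) {
        Supplier<?> supplier = suppliers.get(type);
        if (supplier != null) {
            return type.cast(supplier.get());
        }
        return fallback.getConfiguredInstance(type, configs);
    }
}

// Hypothetical immutable handler whose dependency arrives through the constructor.
class AuditingHandler {
    private final String auditTopic;

    AuditingHandler(String auditTopic) {
        this.auditTopic = auditTopic;
    }

    String auditTopic() {
        return auditTopic;
    }
}

class FactoryDemo {
    public static void main(String[] args) {
        ConfiguredInstanceFactory factory = new RegistryInstanceFactory()
            .register(AuditingHandler.class, () -> new AuditingHandler("audit-events"));
        AuditingHandler handler =
            factory.getConfiguredInstance(AuditingHandler.class, new HashMap<String, Object>());
        System.out.println(handler.auditTopic()); // prints "audit-events"
    }
}
```

The point of the sketch is the one John makes above: with such a factory, a handler can receive its dependencies through its constructor, and neither the dependencies nor the injector need to be passed through the properties.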
[jira] [Created] (KAFKA-7527) Enable Dependency Injection for Kafka Streams handlers (KIP-378)
Wladimir Schmidt created KAFKA-7527:
---

Summary: Enable Dependency Injection for Kafka Streams handlers (KIP-378)
Key: KAFKA-7527
URL: https://issues.apache.org/jira/browse/KAFKA-7527
Project: Kafka
Issue Type: Improvement
Components: streams
Affects Versions: 2.0.0, 2.1.0
Reporter: Wladimir Schmidt

Implement solution proposed in the KIP-378 (Enable Dependency Injection for Kafka Streams handlers).

--
This message was sent by Atlassian JIRA (v7.6.3#76005)
[DISCUSS] KIP-378: Enable Dependency Injection for Kafka Streams handlers
Dear colleagues, I am happy to inform you that I have just finished my first KIP (KIP-378: Enable Dependency Injection for Kafka Streams handlers <https://cwiki.apache.org/confluence/display/KAFKA/KIP-378%3A+Enable+Dependency+Injection+for+Kafka+Streams+handlers>). Your feedback on this submission would be highly appreciated. Best Regards, Wladimir Schmidt
Access for a KIP creation
Hi,

I would like to request access rights to create a KIP.

Reference: https://github.com/apache/kafka/pull/5344#issuecomment-419942283

My cwiki ID is "wlsc" (on https://cwiki.apache.org/confluence/).

Thanks,
Wladimir Schmidt
[jira] [Created] (KAFKA-6410) Scala ReassignPartitionsClusterTest (shouldExecuteThrottledReassignment)
Wladimir Schmidt created KAFKA-6410:
---

Summary: Scala ReassignPartitionsClusterTest (shouldExecuteThrottledReassignment)
Key: KAFKA-6410
URL: https://issues.apache.org/jira/browse/KAFKA-6410
Project: Kafka
Issue Type: Bug
Reporter: Wladimir Schmidt

The Scala test ReassignPartitionsClusterTest (shouldExecuteThrottledReassignment) is flaky and sometimes fails on Scala 2.12 and JDK 9 (according to Jenkins).

Stacktrace

java.lang.AssertionError: Expected replication to be < 1 but was 10177
    at org.junit.Assert.fail(Assert.java:88)
    at org.junit.Assert.assertTrue(Assert.java:41)
    at kafka.admin.ReassignPartitionsClusterTest.shouldExecuteThrottledReassignment(ReassignPartitionsClusterTest.scala:273)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:564)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
    at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
    at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.runTestClass(JUnitTestClassExecuter.java:114)
    at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.execute(JUnitTestClassExecuter.java:57)
    at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassProcessor.processTestClass(JUnitTestClassProcessor.java:66)
    at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
    at jdk.internal.reflect.GeneratedMethodAccessor9.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:564)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
    at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
    at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
    at com.sun.proxy.$Proxy1.processTestClass(Unknown Source)
    at org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:108)
    at jdk.internal.reflect.GeneratedMethodAccessor8.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:564)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
    at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:146)
    at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:128)
    at org.gradle.internal.remote.internal.hub.MessageHub$Handler.run(MessageHub.java:404)
    at org.gradle.internal.concurrent.ExecutorPol
[jira] [Created] (KAFKA-6409) LogRecoveryTest (testHWCheckpointWithFailuresSingleLogSegment) is flaky
Wladimir Schmidt created KAFKA-6409:
---

Summary: LogRecoveryTest (testHWCheckpointWithFailuresSingleLogSegment) is flaky
Key: KAFKA-6409
URL: https://issues.apache.org/jira/browse/KAFKA-6409
Project: Kafka
Issue Type: Bug
Components: log
Reporter: Wladimir Schmidt

In LogRecoveryTest, the test named testHWCheckpointWithFailuresSingleLogSegment is not stable. Sometimes it passes, sometimes it does not.

java.lang.AssertionError: Timing out after 3 ms since a new leader that is different from 1 was not elected for partition new-topic-0, leader is Some(1)
    at kafka.utils.TestUtils$.fail(TestUtils.scala:351)
    at kafka.utils.TestUtils$.$anonfun$waitUntilLeaderIsElectedOrChanged$8(TestUtils.scala:828)
    at scala.Option.getOrElse(Option.scala:121)
    at kafka.utils.TestUtils$.waitUntilLeaderIsElectedOrChanged(TestUtils.scala:818)
    at kafka.server.LogRecoveryTest.testHWCheckpointWithFailuresSingleLogSegment(LogRecoveryTest.scala:152)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:564)
    at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:50)
    at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
    at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:47)
    at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
    at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
    at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:325)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:78)
    at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:57)
    at org.junit.runners.ParentRunner$3.run(ParentRunner.java:290)
    at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:71)
    at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:288)
    at org.junit.runners.ParentRunner.access$000(ParentRunner.java:58)
    at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:268)
    at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
    at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
    at org.junit.runners.ParentRunner.run(ParentRunner.java:363)
    at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.runTestClass(JUnitTestClassExecuter.java:114)
    at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecuter.execute(JUnitTestClassExecuter.java:57)
    at org.gradle.api.internal.tasks.testing.junit.JUnitTestClassProcessor.processTestClass(JUnitTestClassProcessor.java:66)
    at org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
    at jdk.internal.reflect.GeneratedMethodAccessor9.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:564)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
    at org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:32)
    at org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:93)
    at com.sun.proxy.$Proxy1.processTestClass(Unknown Source)
    at org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:108)
    at jdk.internal.reflect.GeneratedMethodAccessor8.invoke(Unknown Source)
    at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.base/java.lang.reflect.Method.invoke(Method.java:564)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:35)
    at org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
    at org.gradle.internal.remote.internal.hub.MessageHubBackedObjectConnection$DispatchWrapper.dispatch(MessageHubBackedObjectConnection.java:146
[jira] [Commented] (KAFKA-4857) Use AdminClient in Kafka Streams
[ https://issues.apache.org/jira/browse/KAFKA-4857?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16046560#comment-16046560 ]

Wladimir Schmidt commented on KAFKA-4857:
---

Will it be addressed in the 0.11.0.0 release?

> Use AdminClient in Kafka Streams
> ---
>
> Key: KAFKA-4857
> URL: https://issues.apache.org/jira/browse/KAFKA-4857
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Reporter: Matthias J. Sax
> Assignee: Sharad
> Labels: needs-kip
>
> Streams uses {{KafkaClientSupplier}} to get
> consumer/restore-consumer/producer clients. Streams also uses one more client
> for admin purposes, namely {{StreamsKafkaClient}}, which is instantiated
> "manually".
> With the newly upcoming {{AdminClient}} from KIP-117, we can simplify (or
> even replace) {{StreamsKafkaClient}} with the new {{AdminClient}}. We
> furthermore want to unify how the client is generated and extend
> {{KafkaClientSupplier}} with a method that returns this client.
> As this is a public API change, a KIP is required.

--
This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Commented] (KAFKA-5167) streams task gets stuck after re-balance due to LockException
[ https://issues.apache.org/jira/browse/KAFKA-5167?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=16000384#comment-16000384 ]

Wladimir Schmidt commented on KAFKA-5167:
---

This also happens in version 0.10.2.1.

> streams task gets stuck after re-balance due to LockException
> ---
>
> Key: KAFKA-5167
> URL: https://issues.apache.org/jira/browse/KAFKA-5167
> Project: Kafka
> Issue Type: Bug
> Components: streams
> Affects Versions: 0.10.2.0
> Reporter: Narendra Kumar
> Attachments: logs.txt
>
> During a rebalance, the processor node's close() method gets called twice: once
> from StreamThread.suspendTasksAndState() and once from
> StreamThread.closeNonAssignedSuspendedTasks(). I have an instance field
> which I close in the processor's close method. This instance's close method
> throws an exception if close is called more than once. Because of this
> exception, Kafka Streams does not attempt to close the state manager, i.e.
> task.closeStateManager(true) is never called. When a task moves from one
> thread to another within the same machine, the task blocks trying to get the lock on
> the state directory, which is still held by the unclosed state manager, and keeps
> throwing the following exception:
> 2017-04-30 12:34:17 WARN StreamThread:1214 - Could not create task 0_1. Will
> retry.
> org.apache.kafka.streams.errors.LockException: task [0_1] Failed to lock the
> state directory for task 0_1
>     at org.apache.kafka.streams.processor.internals.ProcessorStateManager.<init>(ProcessorStateManager.java:100)
>     at org.apache.kafka.streams.processor.internals.AbstractTask.<init>(AbstractTask.java:73)
>     at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:108)
>     at org.apache.kafka.streams.processor.internals.StreamThread.createStreamTask(StreamThread.java:864)
>     at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:1237)
>     at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.retryWithBackoff(StreamThread.java:1210)
>     at org.apache.kafka.streams.processor.internals.StreamThread.addStreamTasks(StreamThread.java:967)
>     at org.apache.kafka.streams.processor.internals.StreamThread.access$600(StreamThread.java:69)
>     at org.apache.kafka.streams.processor.internals.StreamThread$1.onPartitionsAssigned(StreamThread.java:234)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:259)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:352)
>     at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:303)
>     at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:290)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1029)
>     at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:995)
>     at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:592)
>     at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:361)

--
This message was sent by Atlassian JIRA (v6.3.15#6346)
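
The double-close scenario described in the report can also be guarded against on the application side. The following is a minimal sketch of such a guard, not the fix applied in Kafka itself: `StrictResource`, `IdempotentCloser` and `IdempotentCloseDemo` are hypothetical names, and `StrictResource` merely stands in for an instance field whose close method throws when called a second time. The wrapper makes sure the underlying close logic runs only once, however often the processor's close() is invoked.

```java
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical resource that, like the field in the report,
// throws if close() is called more than once.
class StrictResource {
    private boolean closed = false;

    void close() {
        if (closed) {
            throw new IllegalStateException("already closed");
        }
        closed = true;
    }

    boolean isClosed() {
        return closed;
    }
}

// Wrapper that makes a close action safe to invoke any number of times.
class IdempotentCloser {
    private final Runnable closeAction;
    private final AtomicBoolean closed = new AtomicBoolean(false);

    IdempotentCloser(Runnable closeAction) {
        this.closeAction = closeAction;
    }

    void close() {
        // compareAndSet lets only the first caller run the close action.
        if (closed.compareAndSet(false, true)) {
            closeAction.run();
        }
    }
}

class IdempotentCloseDemo {
    public static void main(String[] args) {
        StrictResource resource = new StrictResource();
        IdempotentCloser closer = new IdempotentCloser(resource::close);

        closer.close(); // first call closes the resource
        closer.close(); // second call is a no-op instead of throwing

        System.out.println("closed=" + resource.isClosed()); // prints "closed=true"
    }
}
```

In a processor, the close() method would delegate to such a wrapper, so a second invocation during a rebalance no longer raises the exception that the report identifies as preventing the state manager from being closed.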