[jira] [Closed] (KAFKA-5488) KStream.branch should not return a Array of streams we have to access by known index

2021-02-05 Thread Ivan Ponomarev (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5488?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ivan Ponomarev closed KAFKA-5488.
-

PR is merged to trunk

to be included in 2.8.0 release
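For reference: KIP-418 was eventually merged as {{KStream#split()}}, which returns the branches in a Map keyed by branch name instead of a positional array. The routing idea behind it can be sketched in plain Java (no Kafka dependency; the class and method names below are illustrative, not the Kafka Streams API):

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Sketch of the idea behind KIP-418: route each record to the first matching
// *named* branch, collecting results in a Map keyed by branch name instead of
// an array indexed by position.
public class NamedBranchesSketch {

    public static Map<String, List<String>> branch(
            List<String> records,
            Map<String, Predicate<String>> branches) {
        Map<String, List<String>> result = new LinkedHashMap<>();
        branches.keySet().forEach(name -> result.put(name, new ArrayList<>()));
        for (String record : records) {
            for (Map.Entry<String, Predicate<String>> b : branches.entrySet()) {
                if (b.getValue().test(record)) {   // first matching predicate wins
                    result.get(b.getKey()).add(record);
                    break;
                }
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Predicate<String>> branches = new LinkedHashMap<>();
        branches.put("valid", s -> s.startsWith("ok"));
        branches.put("invalid", s -> true);        // catch-all default branch
        Map<String, List<String>> out =
                branch(List.of("ok:1", "bad:2", "ok:3"), branches);
        System.out.println(out.get("valid"));      // [ok:1, ok:3]
        System.out.println(out.get("invalid"));    // [bad:2]
    }
}
```

With the 2.8.0 API the branches are obtained the same way: by name from the Map returned at the end of the {{split().branch(...)}} chain, rather than by array index.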

> KStream.branch should not return a Array of streams we have to access by 
> known index
> 
>
> Key: KAFKA-5488
> URL: https://issues.apache.org/jira/browse/KAFKA-5488
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Marcel "childNo͡.de" Trautwein
>Assignee: Ivan Ponomarev
>Priority: Major
>  Labels: kip
> Fix For: 2.8.0
>
>
> KIP-418: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream]
> Long story short: it's a mess to get a {{KStream<>[]}} back from 
> {{KStream<>.branch(Predicate<>...)}}. It breaks the fluent API and produces 
> code that is hard to maintain, since you have to know the right 
> index for each unnamed branched stream.
> Example
> {code:java}
> import org.apache.kafka.streams.kstream.KStreamBuilder;
> import org.apache.kafka.streams.kstream.KStream;
> public class StreamAppWithBranches {
> public static void main(String... args) {
> KStream[] branchedStreams = new KStreamBuilder()
> .stream("eventTopic")
> .branch(
> (k, v) -> EventType.validData(v),
> (k, v) -> true
> );
> 
> branchedStreams[0]
> .to("topicValidData");
> 
> branchedStreams[1]
> .to("topicInvalidData");
> }
> }
> {code}
> Quick idea: something like {{void branch(final BranchDefinition<Predicate<K, V>, 
> Consumer<KStream<K, V>>>... branchPredicatesAndHandlers);}} so that each 
> branch's code can be written nested where it belongs.
> so it would be possible to write code like
> {code:java}
> new KStreamBuilder()
> .stream("eventTopic")
> .branch(
> Branch.create(
> (k, v) -> EventType.validData(v),
> stream -> stream.to("topicValidData")
> ),
> Branch.create(
> (k, v) -> true,
> stream -> stream.to("topicInvalidData")
> )
> );
> {code}
> I'll go forward to evaluate some ideas:
>  [https://gitlab.com/childno.de/apache_kafka/snippets/1665655]



--
This message was sent by Atlassian Jira
(v8.3.4#803005)


[jira] [Commented] (KAFKA-12190) Failure on Windows due to an UnsupportedOperationException when StateDirectory sets file permissions

2021-01-21 Thread Ivan Ponomarev (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17269923#comment-17269923
 ] 

Ivan Ponomarev commented on KAFKA-12190:


Hi [~mjsax], sure! I searched for the issue on Jira but didn't find it; that's 
why the duplicate Jira ticket and the PR. Looking forward to 
[~awilkinson]'s fix!

> Failure on Windows due to an UnsupportedOperationException when 
> StateDirectory sets file permissions
> 
>
> Key: KAFKA-12190
> URL: https://issues.apache.org/jira/browse/KAFKA-12190
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.1, 2.7.1
>Reporter: Andy Wilkinson
>Assignee: Andy Wilkinson
>Priority: Critical
>  Labels: bug
> Fix For: 2.8.0, 2.7.1, 2.6.2
>
>
> There appears to be a regression in Kafka 2.6.1 due to [the 
> changes|https://github.com/apache/kafka/pull/9583] made for KAFKA-10705 that 
> causes a failure on Windows. After upgrading to 2.6.1 from 2.6.0, we're 
> seeing failures in Spring Boot's CI on Windows such as the following:
> {noformat}
> Caused by: java.lang.UnsupportedOperationException: (No message provided)
> at java.nio.file.Files.setPosixFilePermissions(Files.java:2044)
> at 
> org.apache.kafka.streams.processor.internals.StateDirectory.<init>(StateDirectory.java:115)
> at 
> org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:745)
> at 
> org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:585)
> at 
> org.springframework.kafka.config.StreamsBuilderFactoryBean.start(StreamsBuilderFactoryBean.java:316)
> 
> at 
> org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:178)
> 
> at 
> org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:54)
> 
> at 
> org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:356)
> 
> at java.lang.Iterable.forEach(Iterable.java:75)
> at 
> org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:155)
> 
> at 
> org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:123)
> 
> at 
> org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:940)
> 
> at 
> org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:591)
> 
> at 
> org.springframework.boot.test.context.runner.AbstractApplicationContextRunner.configureContext(AbstractApplicationContextRunner.java:447)
> 
> at 
> org.springframework.boot.test.context.runner.AbstractApplicationContextRunner.createAndLoadContext(AbstractApplicationContextRunner.java:423)
> 
> at 
> org.springframework.boot.test.context.assertj.AssertProviderApplicationContextInvocationHandler.getContextOrStartupFailure(AssertProviderApplicationContextInvocationHandler.java:61)
> 
> at 
> org.springframework.boot.test.context.assertj.AssertProviderApplicationContextInvocationHandler.<init>(AssertProviderApplicationContextInvocationHandler.java:48)
> 
> at 
> org.springframework.boot.test.context.assertj.ApplicationContextAssertProvider.get(ApplicationContextAssertProvider.java:112)
> 
> at 
> org.springframework.boot.test.context.runner.AbstractApplicationContextRunner.createAssertableContext(AbstractApplicationContextRunner.java:412)
> 
> at 
> org.springframework.boot.test.context.runner.AbstractApplicationContextRunner.lambda$null$0(AbstractApplicationContextRunner.java:382)
> 
> at 
> org.springframework.boot.test.util.TestPropertyValues.applyToSystemProperties(TestPropertyValues.java:175)
> 
> at 
> org.springframework.boot.test.context.runner.AbstractApplicationContextRunner.lambda$run$1(AbstractApplicationContextRunner.java:381)
> 
> at 
> org.springframework.boot.test.context.runner.AbstractApplicationContextRunner.withContextClassLoader(AbstractApplicationContextRunner.java:392)
> 
> at 
> org.springframework.boot.test.context.runner.AbstractApplicationContextRunner.run(AbstractApplicationContextRunner.java:381)
> 
> at 
> org.springframework.boot.actuate.autoconfigure.metrics.KafkaMetricsAutoConfigurationTests.whenKafkaStreamsIsEnabledAndThereIsNoMeterRegistryThenListenerCustomizationBacksOff(KafkaMetricsAutoConfigurationTests.java:92)
> {noformat}
> The same code worked without changes using Kafka 2.6.0.




[jira] [Commented] (KAFKA-12190) Failure on Windows due to an UnsupportedOperationException when StateDirectory sets file permissions

2021-01-21 Thread Ivan Ponomarev (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12190?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17269505#comment-17269505
 ] 

Ivan Ponomarev commented on KAFKA-12190:


Hello, I came across the same issue. While I run Kafka Streams on Linux, I 
develop (and debug Kafka itself) on Windows, so I need to be able to run at 
least the TopologyTestDriver-based tests. Fortunately, the fix is trivial and 
safe (see [PR 9946|https://github.com/apache/kafka/pull/9946]).
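The shape of the fix can be sketched stand-alone: attempt the POSIX call and tolerate file systems (e.g. NTFS on Windows) that throw UnsupportedOperationException. This only illustrates the approach; the class and method names are mine, not the actual Kafka patch:

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.Set;

// Guard around Files.setPosixFilePermissions: on a non-POSIX file system the
// call throws UnsupportedOperationException, which we treat like an I/O
// failure and report via the return value instead of crashing the caller.
public class PosixPermissionsGuard {

    public static boolean trySetPosixPermissions(Path path, String perms) {
        try {
            Set<PosixFilePermission> p = PosixFilePermissions.fromString(perms);
            Files.setPosixFilePermissions(path, p);
            return true;
        } catch (UnsupportedOperationException | IOException e) {
            // Non-POSIX file system (e.g. NTFS) or I/O problem: continue without
            // the permission change rather than fail the whole startup.
            return false;
        }
    }

    public static void main(String[] args) throws IOException {
        Path dir = Files.createTempDirectory("state-dir-test");
        boolean applied = trySetPosixPermissions(dir, "rwx------");
        System.out.println("POSIX permissions applied: " + applied);
        Files.delete(dir);
    }
}
```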

> Failure on Windows due to an UnsupportedOperationException when 
> StateDirectory sets file permissions
> 
>
> Key: KAFKA-12190
> URL: https://issues.apache.org/jira/browse/KAFKA-12190
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.1, 2.7.1
>Reporter: Andy Wilkinson
>Priority: Critical
>  Labels: bug
> Fix For: 2.8.0, 2.7.1, 2.6.2
>
>
> There appears to be a regression in Kafka 2.6.1 due to [the 
> changes|https://github.com/apache/kafka/pull/9583] made for KAFKA-10705 that 
> causes a failure on Windows. After upgrading to 2.6.1 from 2.6.0, we're 
> seeing failures in Spring Boot's CI on Windows such as the following:
> {noformat}
> Caused by: java.lang.UnsupportedOperationException: (No message provided)
> at java.nio.file.Files.setPosixFilePermissions(Files.java:2044)
> at 
> org.apache.kafka.streams.processor.internals.StateDirectory.<init>(StateDirectory.java:115)
> at 
> org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:745)
> at 
> org.apache.kafka.streams.KafkaStreams.<init>(KafkaStreams.java:585)
> at 
> org.springframework.kafka.config.StreamsBuilderFactoryBean.start(StreamsBuilderFactoryBean.java:316)
> 
> at 
> org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:178)
> 
> at 
> org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:54)
> 
> at 
> org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:356)
> 
> at java.lang.Iterable.forEach(Iterable.java:75)
> at 
> org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:155)
> 
> at 
> org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:123)
> 
> at 
> org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:940)
> 
> at 
> org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:591)
> 
> at 
> org.springframework.boot.test.context.runner.AbstractApplicationContextRunner.configureContext(AbstractApplicationContextRunner.java:447)
> 
> at 
> org.springframework.boot.test.context.runner.AbstractApplicationContextRunner.createAndLoadContext(AbstractApplicationContextRunner.java:423)
> 
> at 
> org.springframework.boot.test.context.assertj.AssertProviderApplicationContextInvocationHandler.getContextOrStartupFailure(AssertProviderApplicationContextInvocationHandler.java:61)
> 
> at 
> org.springframework.boot.test.context.assertj.AssertProviderApplicationContextInvocationHandler.<init>(AssertProviderApplicationContextInvocationHandler.java:48)
> 
> at 
> org.springframework.boot.test.context.assertj.ApplicationContextAssertProvider.get(ApplicationContextAssertProvider.java:112)
> 
> at 
> org.springframework.boot.test.context.runner.AbstractApplicationContextRunner.createAssertableContext(AbstractApplicationContextRunner.java:412)
> 
> at 
> org.springframework.boot.test.context.runner.AbstractApplicationContextRunner.lambda$null$0(AbstractApplicationContextRunner.java:382)
> 
> at 
> org.springframework.boot.test.util.TestPropertyValues.applyToSystemProperties(TestPropertyValues.java:175)
> 
> at 
> org.springframework.boot.test.context.runner.AbstractApplicationContextRunner.lambda$run$1(AbstractApplicationContextRunner.java:381)
> 
> at 
> org.springframework.boot.test.context.runner.AbstractApplicationContextRunner.withContextClassLoader(AbstractApplicationContextRunner.java:392)
> 
> at 
> org.springframework.boot.test.context.runner.AbstractApplicationContextRunner.run(AbstractApplicationContextRunner.java:381)
> 
> at 
> 

[jira] [Commented] (KAFKA-12230) Some Kafka TopologyTestDriver-based unit tests can't be run on Windows file system

2021-01-21 Thread Ivan Ponomarev (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-12230?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17269498#comment-17269498
 ] 

Ivan Ponomarev commented on KAFKA-12230:


Hi [~cadonna], thanks for pointing this out. I did search JIRA, 
but failed to find KAFKA-12190.

> Some Kafka TopologyTestDriver-based unit tests can't be run on Windows file 
> system
> --
>
> Key: KAFKA-12230
> URL: https://issues.apache.org/jira/browse/KAFKA-12230
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ivan Ponomarev
>Assignee: Ivan Ponomarev
>Priority: Minor
>
> While developing Kafka on a Windows machine, I get false 
> TopologyTestDriver-based test failures because `Files.setPosixFilePermissions` 
> fails with an UnsupportedOperationException; see e.g. the stack trace below.
>  
> Simply catching UnsupportedOperationException together with IOException in 
> StateDirectory solves this issue.
>  
>  
> {noformat}
> java.lang.UnsupportedOperationException
>   at 
> java.base/java.nio.file.Files.setPosixFilePermissions(Files.java:2078)
>   at 
> org.apache.kafka.streams.processor.internals.StateDirectory.<init>(StateDirectory.java:118)
>   at 
> org.apache.kafka.streams.TopologyTestDriver.setupTopology(TopologyTestDriver.java:431)
>   at 
> org.apache.kafka.streams.TopologyTestDriver.<init>(TopologyTestDriver.java:335)
>   at 
> org.apache.kafka.streams.TopologyTestDriver.<init>(TopologyTestDriver.java:306)
>   at 
> org.apache.kafka.streams.TopologyTestDriver.<init>(TopologyTestDriver.java:265)
>   at 
> org.apache.kafka.streams.kstream.internals.KStreamImplTest.shouldSupportForeignKeyTableTableJoinWithKTableFromKStream(KStreamImplTest.java:2751)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>   at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>   at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
>   at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
>   at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>   at 
> 

[jira] [Resolved] (KAFKA-12230) Some Kafka TopologyTestDriver-based unit tests can't be run on Windows file system

2021-01-21 Thread Ivan Ponomarev (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-12230?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ivan Ponomarev resolved KAFKA-12230.

Resolution: Duplicate

Duplicate of KAFKA-12190

> Some Kafka TopologyTestDriver-based unit tests can't be run on Windows file 
> system
> --
>
> Key: KAFKA-12230
> URL: https://issues.apache.org/jira/browse/KAFKA-12230
> Project: Kafka
>  Issue Type: Bug
>Reporter: Ivan Ponomarev
>Assignee: Ivan Ponomarev
>Priority: Minor
>
> While developing Kafka on a Windows machine, I get false 
> TopologyTestDriver-based test failures because `Files.setPosixFilePermissions` 
> fails with an UnsupportedOperationException; see e.g. the stack trace below.
>  
> Simply catching UnsupportedOperationException together with IOException in 
> StateDirectory solves this issue.
>  
>  
> {noformat}
> java.lang.UnsupportedOperationException
>   at 
> java.base/java.nio.file.Files.setPosixFilePermissions(Files.java:2078)
>   at 
> org.apache.kafka.streams.processor.internals.StateDirectory.<init>(StateDirectory.java:118)
>   at 
> org.apache.kafka.streams.TopologyTestDriver.setupTopology(TopologyTestDriver.java:431)
>   at 
> org.apache.kafka.streams.TopologyTestDriver.<init>(TopologyTestDriver.java:335)
>   at 
> org.apache.kafka.streams.TopologyTestDriver.<init>(TopologyTestDriver.java:306)
>   at 
> org.apache.kafka.streams.TopologyTestDriver.<init>(TopologyTestDriver.java:265)
>   at 
> org.apache.kafka.streams.kstream.internals.KStreamImplTest.shouldSupportForeignKeyTableTableJoinWithKTableFromKStream(KStreamImplTest.java:2751)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>   at 
> org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
>   at 
> org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
>   at 
> org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
>   at 
> org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
>   at 
> org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
>   at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
>   at 
> org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
>   at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
>   at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
>   at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
>   at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
>   at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
>   at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
>   at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
>   at 
> org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
>   at 
> org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
>   at 
> org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at 
> java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at 
> java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.base/java.lang.reflect.Method.invoke(Method.java:566)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
>   at 
> org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
>   at 
> 

[jira] [Created] (KAFKA-12230) Some Kafka TopologyTestDriver-based unit tests can't be run on Windows file system

2021-01-21 Thread Ivan Ponomarev (Jira)
Ivan Ponomarev created KAFKA-12230:
--

 Summary: Some Kafka TopologyTestDriver-based unit tests can't be 
run on Windows file system
 Key: KAFKA-12230
 URL: https://issues.apache.org/jira/browse/KAFKA-12230
 Project: Kafka
  Issue Type: Bug
Reporter: Ivan Ponomarev
Assignee: Ivan Ponomarev


While developing Kafka on a Windows machine, I get false 
TopologyTestDriver-based test failures because `Files.setPosixFilePermissions` 
fails with an UnsupportedOperationException; see e.g. the stack trace below.

Simply catching UnsupportedOperationException together with IOException in 
StateDirectory solves this issue.

 

 
{noformat}
java.lang.UnsupportedOperationException
at 
java.base/java.nio.file.Files.setPosixFilePermissions(Files.java:2078)
at 
org.apache.kafka.streams.processor.internals.StateDirectory.<init>(StateDirectory.java:118)
at 
org.apache.kafka.streams.TopologyTestDriver.setupTopology(TopologyTestDriver.java:431)
at 
org.apache.kafka.streams.TopologyTestDriver.<init>(TopologyTestDriver.java:335)
at 
org.apache.kafka.streams.TopologyTestDriver.<init>(TopologyTestDriver.java:306)
at 
org.apache.kafka.streams.TopologyTestDriver.<init>(TopologyTestDriver.java:265)
at 
org.apache.kafka.streams.kstream.internals.KStreamImplTest.shouldSupportForeignKeyTableTableJoinWithKTableFromKStream(KStreamImplTest.java:2751)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at 
org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at 
org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at 
org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at 
org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at 
org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at 
org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
at 
org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.runTestClass(JUnitTestClassExecutor.java:110)
at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:58)
at 
org.gradle.api.internal.tasks.testing.junit.JUnitTestClassExecutor.execute(JUnitTestClassExecutor.java:38)
at 
org.gradle.api.internal.tasks.testing.junit.AbstractJUnitTestClassProcessor.processTestClass(AbstractJUnitTestClassProcessor.java:62)
at 
org.gradle.api.internal.tasks.testing.SuiteTestClassProcessor.processTestClass(SuiteTestClassProcessor.java:51)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at 
java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at 
java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:36)
at 
org.gradle.internal.dispatch.ReflectionDispatch.dispatch(ReflectionDispatch.java:24)
at 
org.gradle.internal.dispatch.ContextClassLoaderDispatch.dispatch(ContextClassLoaderDispatch.java:33)
at 
org.gradle.internal.dispatch.ProxyDispatchAdapter$DispatchingInvocationHandler.invoke(ProxyDispatchAdapter.java:94)
at com.sun.proxy.$Proxy2.processTestClass(Unknown Source)
at 
org.gradle.api.internal.tasks.testing.worker.TestWorker.processTestClass(TestWorker.java:119)
at 

[jira] [Updated] (KAFKA-10369) Introduce Distinct operation in KStream

2020-08-06 Thread Ivan Ponomarev (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10369?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ivan Ponomarev updated KAFKA-10369:
---
Description: 
Message deduplication is a common task.

One example: we might have multiple data sources, each reporting its state 
periodically with a relatively high frequency, whose current states should be 
stored in a database. If the actual state changes less frequently than it is 
reported, we might want to filter out duplicated messages using Kafka Streams, 
in order to reduce the number of writes to the database.

'Distinct' operation is common in data processing, e. g.
 * Java Stream has [distinct() 
|https://docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.html#distinct--]
 operation,
 * SQL has DISTINCT keyword.

 

Hence it is natural to expect similar functionality from Kafka Streams.

Kafka Streams Tutorials contains an 
[example|https://kafka-tutorials.confluent.io/finding-distinct-events/kstreams.html]
 of how distinct can be emulated, but this example is complicated: it involves 
low-level coding with a local state store and a custom transformer. It would be 
much more convenient to have distinct as a first-class DSL operation.

Due to the 'infinite' nature of KStream, the distinct operation should be windowed, 
similar to windowed joins and aggregations for KStreams.

See 
[KIP-655|https://cwiki.apache.org/confluence/display/KAFKA/KIP-655%3A+Windowed+Distinct+Operation+for+Kafka+Streams+API]

  was:
Message deduplication is a common task.

One example: we might have multiple data sources, each reporting its state 
periodically with a relatively high frequency, whose current states should be 
stored in a database. If the actual state changes less frequently than it is 
reported, we might want to filter out duplicated messages using Kafka Streams, 
in order to reduce the number of writes to the database.

'Distinct' operation is common in data processing, e. g.
 * Java Stream has [distinct() 
|https://docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.html#distinct--]
 operation,
 * SQL has DISTINCT keyword.

 

Hence it is natural to expect similar functionality from Kafka Streams.

Kafka Streams Tutorials contains an 
[example|https://kafka-tutorials.confluent.io/finding-distinct-events/kstreams.html]
 of how distinct can be emulated, but this example is complicated: it involves 
low-level coding with a local state store and a custom transformer. It would be 
much more convenient to have distinct as a first-class DSL operation.

Due to the 'infinite' nature of KStream, the distinct operation should be windowed, 
similar to windowed joins and aggregations for KStreams.


> Introduce Distinct operation in KStream
> ---
>
> Key: KAFKA-10369
> URL: https://issues.apache.org/jira/browse/KAFKA-10369
> Project: Kafka
>  Issue Type: Improvement
>Reporter: Ivan Ponomarev
>Assignee: Ivan Ponomarev
>Priority: Major
>
> Message deduplication is a common task.
> One example: we might have multiple data sources, each reporting its state 
> periodically with a relatively high frequency, whose current states should be 
> stored in a database. If the actual state changes less frequently than it is 
> reported, we might want to filter out duplicated messages using Kafka Streams, 
> in order to reduce the number of writes to the database.
> 'Distinct' operation is common in data processing, e. g.
>  * Java Stream has [distinct() 
> |https://docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.html#distinct--]
>  operation,
>  * SQL has DISTINCT keyword.
>  
> Hence it is natural to expect similar functionality from Kafka Streams.
> Kafka Streams Tutorials contains an 
> [example|https://kafka-tutorials.confluent.io/finding-distinct-events/kstreams.html]
>  of how distinct can be emulated, but this example is complicated: it 
> involves low-level coding with a local state store and a custom transformer. It 
> would be much more convenient to have distinct as a first-class DSL operation.
> Due to the 'infinite' nature of KStream, the distinct operation should be windowed, 
> similar to windowed joins and aggregations for KStreams.
> See 
> [KIP-655|https://cwiki.apache.org/confluence/display/KAFKA/KIP-655%3A+Windowed+Distinct+Operation+for+Kafka+Streams+API]





[jira] [Created] (KAFKA-10369) Introduce Distinct operation in KStream

2020-08-06 Thread Ivan Ponomarev (Jira)
Ivan Ponomarev created KAFKA-10369:
--

 Summary: Introduce Distinct operation in KStream
 Key: KAFKA-10369
 URL: https://issues.apache.org/jira/browse/KAFKA-10369
 Project: Kafka
  Issue Type: Improvement
Reporter: Ivan Ponomarev
Assignee: Ivan Ponomarev


Message deduplication is a common task.

One example: we might have multiple data sources, each reporting its state 
periodically with a relatively high frequency, whose current states should be 
stored in a database. If the actual state changes less frequently than it is 
reported, we might want to filter out duplicated messages using Kafka Streams, 
in order to reduce the number of writes to the database.

'Distinct' operation is common in data processing, e. g.
 * Java Stream has [distinct() 
|https://docs.oracle.com/javase/8/docs/api/java/util/stream/Stream.html#distinct--]
 operation,
 * SQL has DISTINCT keyword.

 

Hence it is natural to expect similar functionality from Kafka Streams.

Kafka Streams Tutorials contains an 
[example|https://kafka-tutorials.confluent.io/finding-distinct-events/kstreams.html]
 of how distinct can be emulated, but this example is complicated: it involves 
low-level coding with a local state store and a custom transformer. It would be 
much more convenient to have distinct as a first-class DSL operation.

Due to the 'infinite' nature of KStream, the distinct operation should be windowed, 
similar to windowed joins and aggregations for KStreams.
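The emulation referenced above boils down to a windowed "seen" store: a record is forwarded only if the same key has not been observed within the window. A dependency-free sketch of that idea (illustrative only, not the KIP-655 API):

```java
import java.util.HashMap;
import java.util.Iterator;
import java.util.Map;

// Windowed "distinct": forward a record only if its key was not seen within
// the last windowMs milliseconds. A real implementation would keep this state
// in a windowed state store; a HashMap with eviction stands in for it here.
public class WindowedDistinct {
    private final long windowMs;
    private final Map<String, Long> lastSeen = new HashMap<>();

    public WindowedDistinct(long windowMs) {
        this.windowMs = windowMs;
    }

    /** Returns true if the record should be forwarded (it is not a duplicate). */
    public boolean offer(String key, long timestampMs) {
        Long prev = lastSeen.get(key);
        if (prev != null && timestampMs - prev < windowMs) {
            return false;                      // duplicate within the window
        }
        lastSeen.put(key, timestampMs);
        // Evict entries that can no longer suppress anything (a stand-in for
        // the retention handling a real windowed store would provide).
        Iterator<Map.Entry<String, Long>> it = lastSeen.entrySet().iterator();
        while (it.hasNext()) {
            if (timestampMs - it.next().getValue() >= windowMs) {
                it.remove();
            }
        }
        return true;
    }

    public static void main(String[] args) {
        WindowedDistinct d = new WindowedDistinct(1000);
        System.out.println(d.offer("sensor-1", 0));    // true  (first occurrence)
        System.out.println(d.offer("sensor-1", 500));  // false (duplicate in window)
        System.out.println(d.offer("sensor-1", 1500)); // true  (window expired)
    }
}
```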





[jira] [Comment Edited] (KAFKA-5488) KStream.branch should not return a Array of streams we have to access by known index

2020-07-16 Thread Ivan Ponomarev (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-5488?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17159164#comment-17159164
 ] 

Ivan Ponomarev edited comment on KAFKA-5488 at 7/16/20, 12:38 PM:
--

Hi everyone,
{quote}I think there are some unnecessary interfaces

I don't think the return type needs to be a Map,
{quote}
[~high.lee], concerning your comment about the API: the current API is the 
result of an extensive discussion (you can find the link to it in the KIP 
itself). The first versions of this KIP had neither the Map return type nor a 
Function parameter, but there was a concern that all the branches would end up 
in separate variable scopes, which is inconvenient in many cases. It was a 
really hard discussion, with a number of ideas proposed and rejected; what we 
have now seems to be the best choice from many points of view.
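The advantage of the Map return type can be sketched with a toy model (this is 
not the Kafka Streams API; NamedBrancher and its methods are stand-ins): all 
branches live in one scope and are looked up by name rather than by positional 
index.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

// Toy model of map-returning branching: each record goes to the first
// branch whose predicate matches; branches are addressed by name
// instead of by array index.
public class NamedBrancher<V> {
    private final LinkedHashMap<String, Predicate<V>> branches = new LinkedHashMap<>();

    public NamedBrancher<V> branch(String name, Predicate<V> predicate) {
        branches.put(name, predicate);
        return this; // fluent, method-chaining style as in KIP-418
    }

    public Map<String, List<V>> apply(List<V> records) {
        Map<String, List<V>> result = new LinkedHashMap<>();
        branches.keySet().forEach(name -> result.put(name, new ArrayList<>()));
        for (V record : records) {
            for (Map.Entry<String, Predicate<V>> e : branches.entrySet()) {
                if (e.getValue().test(record)) {
                    result.get(e.getKey()).add(record);
                    break; // first matching branch wins
                }
            }
        }
        return result;
    }
}
```

Looking branches up by name keeps the call site readable even when branches are 
later added or reordered, which is exactly where index-based access breaks down.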
{quote}Are you willing to continue working?
{quote}
Sure, since I proposed this KIP, I'm going to implement it. I've been quite 
busy recently, but I really hope to post a PR in one or at most two weeks.

 

 


was (Author: iponomarev):
Hi everyone, 
{quote}I think there are some unnecessary interfaces

I don't think the return type needs to be a Map,
{quote}
[~high.lee] , concerning your comment about the API: the current API is a 
result of the extensive discussion (you can find the link to the discussion in 
the KIP itself).  The first versions of this KIP didn't have Map return type 
and Function as a parameter, but there was a concern that all the branches will 
be in separate variable scopes, which is inconvenient in many cases. There was 
a really hard discussion with a number of ideas proposed and rejected, what we 
have now seems to be the best choice from many points of view.
{quote}Are you willing to continue working?
{quote}
Sure, since I proposed this KIP, I'm going to implement it. I've been quite 
busy recently, but I really hope that I'll be able to post a PR from me in one 
or maximum two weeks.

 

 

> KStream.branch should not return a Array of streams we have to access by 
> known index
> 
>
> Key: KAFKA-5488
> URL: https://issues.apache.org/jira/browse/KAFKA-5488
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Marcel "childNo͡.de" Trautwein
>Assignee: Ivan Ponomarev
>Priority: Major
>  Labels: kip
>
> KIP-418: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream]
> long story short: it's a mess to get a {{KStream<>[]}} out from 
> {{KStream<>branch(Predicate<>...)}}. It breaks the fluent API and it produces 
> bad code which is not that good to maintain since you have to know the right 
> index for an unnamed branching stream.
> Example
> {code:java}
> import org.apache.kafka.streams.kstream.KStreamBuilder;
> import org.apache.kafka.streams.kstream.KStream;
> public class StreamAppWithBranches {
>     public static void main(String... args) {
>         KStream[] branchedStreams = new KStreamBuilder()
>             .stream("eventTopic")
>             .branch(
>                 (k, v) -> EventType.validData(v),
>                 (k, v) -> true
>             );
> 
>         branchedStreams[0]
>             .to("topicValidData");
> 
>         branchedStreams[1]
>             .to("topicInvalidData");
>     }
> }
> {code}
> Quick idea, s.th. like {{void branch(final BranchDefinition<Predicate<K, V>, 
> Consumer<KStream<K, V>>>... branchPredicatesAndHandlers);}} where you can write 
> branches/streams code nested where it belongs to
> so it would be possible to write code like
> {code:java}
> new KStreamBuilder()
>     .stream("eventTopic")
>     .branch(
>         Branch.create(
>             (k, v) -> EventType.validData(v),
>             stream -> stream.to("topicValidData")
>         ),
>         Branch.create(
>             (k, v) -> true,
>             stream -> stream.to("topicInvalidData")
>         )
>     );
> {code}
> I'll go forward to evaluate some ideas:
>  [https://gitlab.com/childno.de/apache_kafka/snippets/1665655]
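The Branch.create idea quoted above can be sketched as plain Java (hypothetical 
names throughout; the API eventually adopted via KIP-418 differs): each branch 
pairs a predicate with a handler, so the handling code sits next to its 
predicate instead of at a magic array index.

```java
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Predicate;

// Hypothetical sketch of the proposed Branch.create(...) style:
// a branch is a (predicate, handler) pair, and the first matching
// branch consumes each record.
public final class Branch<V> {
    final Predicate<V> predicate;
    final Consumer<V> handler;

    private Branch(Predicate<V> predicate, Consumer<V> handler) {
        this.predicate = predicate;
        this.handler = handler;
    }

    public static <V> Branch<V> create(Predicate<V> predicate, Consumer<V> handler) {
        return new Branch<>(predicate, handler);
    }

    @SafeVarargs
    public static <V> void dispatch(List<V> records, Branch<V>... branches) {
        for (V record : records) {
            for (Branch<V> b : branches) {
                if (b.predicate.test(record)) {
                    b.handler.accept(record);
                    break; // first matching branch wins
                }
            }
        }
    }
}
```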





[jira] [Assigned] (KAFKA-5488) KStream.branch should not return a Array of streams we have to access by known index

2020-07-16 Thread Ivan Ponomarev (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5488?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ivan Ponomarev reassigned KAFKA-5488:
-

Assignee: Ivan Ponomarev  (was: highluck)

> KStream.branch should not return a Array of streams we have to access by 
> known index
> 
>
> Key: KAFKA-5488
> URL: https://issues.apache.org/jira/browse/KAFKA-5488
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Marcel "childNo͡.de" Trautwein
>Assignee: Ivan Ponomarev
>Priority: Major
>  Labels: kip
>
> KIP-418: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream]
> long story short: it's a mess to get a {{KStream<>[]}} out from 
> {{KStream<>branch(Predicate<>...)}}. It breaks the fluent API and it produces 
> bad code which is not that good to maintain since you have to know the right 
> index for an unnamed branching stream.
> Example
> {code:java}
> import org.apache.kafka.streams.kstream.KStreamBuilder;
> import org.apache.kafka.streams.kstream.KStream;
> public class StreamAppWithBranches {
>     public static void main(String... args) {
>         KStream[] branchedStreams = new KStreamBuilder()
>             .stream("eventTopic")
>             .branch(
>                 (k, v) -> EventType.validData(v),
>                 (k, v) -> true
>             );
> 
>         branchedStreams[0]
>             .to("topicValidData");
> 
>         branchedStreams[1]
>             .to("topicInvalidData");
>     }
> }
> {code}
> Quick idea, s.th. like {{void branch(final BranchDefinition<Predicate<K, V>, 
> Consumer<KStream<K, V>>>... branchPredicatesAndHandlers);}} where you can write 
> branches/streams code nested where it belongs to
> so it would be possible to write code like
> {code:java}
> new KStreamBuilder()
>     .stream("eventTopic")
>     .branch(
>         Branch.create(
>             (k, v) -> EventType.validData(v),
>             stream -> stream.to("topicValidData")
>         ),
>         Branch.create(
>             (k, v) -> true,
>             stream -> stream.to("topicInvalidData")
>         )
>     );
> {code}
> I'll go forward to evaluate some ideas:
>  [https://gitlab.com/childno.de/apache_kafka/snippets/1665655]





[jira] [Commented] (KAFKA-5488) KStream.branch should not return a Array of streams we have to access by known index

2020-07-16 Thread Ivan Ponomarev (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-5488?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel=17159164#comment-17159164
 ] 

Ivan Ponomarev commented on KAFKA-5488:
---

Hi everyone, 
{quote}I think there are some unnecessary interfaces

I don't think the return type needs to be a Map,
{quote}
[~high.lee], concerning your comment about the API: the current API is the 
result of an extensive discussion (you can find the link to it in the KIP 
itself). The first versions of this KIP had neither the Map return type nor a 
Function parameter, but there was a concern that all the branches would end up 
in separate variable scopes, which is inconvenient in many cases. It was a 
really hard discussion, with a number of ideas proposed and rejected; what we 
have now seems to be the best choice from many points of view.
{quote}Are you willing to continue working?
{quote}
Sure, since I proposed this KIP, I'm going to implement it. I've been quite 
busy recently, but I really hope to post a PR in one or at most two weeks.

 

 

> KStream.branch should not return a Array of streams we have to access by 
> known index
> 
>
> Key: KAFKA-5488
> URL: https://issues.apache.org/jira/browse/KAFKA-5488
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Reporter: Marcel "childNo͡.de" Trautwein
>Assignee: highluck
>Priority: Major
>  Labels: kip
>
> KIP-418: 
> [https://cwiki.apache.org/confluence/display/KAFKA/KIP-418%3A+A+method-chaining+way+to+branch+KStream]
> long story short: it's a mess to get a {{KStream<>[]}} out from 
> {{KStream<>branch(Predicate<>...)}}. It breaks the fluent API and it produces 
> bad code which is not that good to maintain since you have to know the right 
> index for an unnamed branching stream.
> Example
> {code:java}
> import org.apache.kafka.streams.kstream.KStreamBuilder;
> import org.apache.kafka.streams.kstream.KStream;
> public class StreamAppWithBranches {
>     public static void main(String... args) {
>         KStream[] branchedStreams = new KStreamBuilder()
>             .stream("eventTopic")
>             .branch(
>                 (k, v) -> EventType.validData(v),
>                 (k, v) -> true
>             );
> 
>         branchedStreams[0]
>             .to("topicValidData");
> 
>         branchedStreams[1]
>             .to("topicInvalidData");
>     }
> }
> {code}
> Quick idea, s.th. like {{void branch(final BranchDefinition<Predicate<K, V>, 
> Consumer<KStream<K, V>>>... branchPredicatesAndHandlers);}} where you can write 
> branches/streams code nested where it belongs to
> so it would be possible to write code like
> {code:java}
> new KStreamBuilder()
>     .stream("eventTopic")
>     .branch(
>         Branch.create(
>             (k, v) -> EventType.validData(v),
>             stream -> stream.to("topicValidData")
>         ),
>         Branch.create(
>             (k, v) -> true,
>             stream -> stream.to("topicInvalidData")
>         )
>     );
> {code}
> I'll go forward to evaluate some ideas:
>  [https://gitlab.com/childno.de/apache_kafka/snippets/1665655]





[jira] [Updated] (KAFKA-5488) KStream.branch should not return a Array of streams we have to access by known index

2019-01-17 Thread Ivan Ponomarev (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-5488?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Ivan Ponomarev updated KAFKA-5488:
--
Labels: pull-request-available  (was: needs-kip)

> KStream.branch should not return a Array of streams we have to access by 
> known index
> 
>
> Key: KAFKA-5488
> URL: https://issues.apache.org/jira/browse/KAFKA-5488
> Project: Kafka
>  Issue Type: Improvement
>  Components: streams
>Affects Versions: 0.10.2.1, 0.11.0.0
>Reporter: Marcel "childNo͡.de" Trautwein
>Assignee: Ivan Ponomarev
>Priority: Major
>  Labels: pull-request-available
>
> long story short: it's a mess to get a {{KStream<>[]}} out from 
> {{KStream<>branch(Predicate<>...)}}. It breaks the fluent API and it produces 
> bad code which is not that good to maintain since you have to know the right 
> index for an unnamed branching stream.
> Example
> {code:lang=java}
> import org.apache.kafka.streams.kstream.KStreamBuilder;
> import org.apache.kafka.streams.kstream.KStream;
> public class StreamAppWithBranches {
>     public static void main(String... args) {
>         KStream[] branchedStreams = new KStreamBuilder()
>             .stream("eventTopic")
>             .branch(
>                 (k, v) -> EventType.validData(v),
>                 (k, v) -> true
>             );
> 
>         branchedStreams[0]
>             .to("topicValidData");
> 
>         branchedStreams[1]
>             .to("topicInvalidData");
>     }
> }
> {code}
> Quick idea, s.th. like {{void branch(final BranchDefinition<Predicate<K, V>, 
> Consumer<KStream<K, V>>>... branchPredicatesAndHandlers);}} where you can write 
> branches/streams code nested where it belongs to
> so it would be possible to write code like
> {code:lang=java}
> new KStreamBuilder()
>     .stream("eventTopic")
>     .branch(
>         Branch.create(
>             (k, v) -> EventType.validData(v),
>             stream -> stream.to("topicValidData")
>         ),
>         Branch.create(
>             (k, v) -> true,
>             stream -> stream.to("topicInvalidData")
>         )
>     );
> {code}
> I'll go forward to evaluate some ideas:
> https://gitlab.com/childno.de/apache_kafka/snippets/1665655



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)