[jira] [Created] (KAFKA-12540) Sub-key support to avoid unnecessary rekey operations where the new key is a compound key of the original key + sub-field
Antony Stubbs created KAFKA-12540: - Summary: Sub-key support to avoid unnecessary rekey operations where the new key is a compound key of the original key + sub-field Key: KAFKA-12540 URL: https://issues.apache.org/jira/browse/KAFKA-12540 Project: Kafka Issue Type: New Feature Components: streams Affects Versions: 2.8.0 Reporter: Antony Stubbs If I am, for example, wanting to aggregate by an account and by a metric, and the input topic is keyed by account (and let's say there's a massive amount of traffic), this will have to rekey on account+metric, which will cause a repartition topic, then group by and aggregate. However, because we know that all the metrics for an account will already exist on the same partition, we ideally don't want to have to repartition - that causes a large unneeded overhead. Ideally a new `#selectSubKey` sort of method could be introduced, which would force a compound key with the original. var subKeyStream = stream#selectSubKey((k, v) -> v.getField("metric")) <- under the hood this appends the returned key to the existing key Although this might break the key->partition strategy, the topology shouldn't be marked dirty at this stage, as we know we're still co-partitioned. What can happen next in the topology may need to be restricted, however. In this case we would then do: subKeyStream.groupByKey().aggregate() Functions other than aggregate may still need a repartition, or maybe not - not sure. Similarly described quite well in this forum thread: [http://apache-flink-user-mailing-list-archive.2336050.n4.nabble.com/Re-keying-sub-keying-a-stream-without-repartitioning-td12745.html] -- This message was sent by Atlassian Jira (v8.3.4#803005)
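The co-partitioning argument above can be sketched in plain Java. This is not the proposed Kafka Streams API - `CompoundKey` and `partitionFor` are hypothetical names - it only illustrates why a repartition is unnecessary: if partition placement is computed from the original key alone, every sub-key of one account already lands on the same partition.

```java
import java.util.Objects;

public class CompoundKeyDemo {
    // Hypothetical compound key: the original record key plus a sub-field
    // extracted from the value (e.g. the "metric" name).
    public record CompoundKey(String originalKey, String subKey) {}

    // Partition is derived from the original key only - mirroring why the
    // proposed #selectSubKey would not need a repartition topic.
    public static int partitionFor(CompoundKey key, int numPartitions) {
        return Math.floorMod(Objects.hashCode(key.originalKey()), numPartitions);
    }

    public static void main(String[] args) {
        CompoundKey cpu = new CompoundKey("account-42", "cpu");
        CompoundKey mem = new CompoundKey("account-42", "memory");
        // Both metrics for the same account land on the same partition.
        System.out.println(partitionFor(cpu, 12) == partitionFor(mem, 12)); // true
    }
}
```

A `groupByKey().aggregate()` downstream of such a key would then see all sub-keys of an account on one task, which is the property the ticket relies on.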
[jira] [Created] (KAFKA-10408) Calendar based windows
Antony Stubbs created KAFKA-10408: - Summary: Calendar based windows Key: KAFKA-10408 URL: https://issues.apache.org/jira/browse/KAFKA-10408 Project: Kafka Issue Type: New Feature Components: streams Affects Versions: 2.6.0 Reporter: Antony Stubbs Assignee: Bruno Cadonna A date-based window, for example aggregating all payments made up until the 15th of each month, or all payments made each year up until April 1st. Should handle time zones "properly", e.g. allow the user to specify which time zone to base the window on. Example implementation of a specific aggregator, with a window implementation implicitly embedded: https://github.com/astubbs/ks-tributary/blob/denormalisation-base-cp-libs/streams-module/src/main/java/io/confluent/ps/streams/processors/YearlyAggregator.java -- This message was sent by Atlassian Jira (v8.3.4#803005)
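The boundary arithmetic such a window needs can be sketched with java.time. This is a minimal sketch, not the proposed windowing API; `nextMonthlyBoundary` is a hypothetical helper showing the time-zone-aware "until the 15th of each month" computation the ticket asks for.

```java
import java.time.Instant;
import java.time.ZoneId;
import java.time.ZonedDateTime;

public class CalendarWindowDemo {
    // Next window boundary at midnight on the given day of month, in a
    // caller-supplied zone (the "handle time zones properly" requirement).
    public static ZonedDateTime nextMonthlyBoundary(Instant event, ZoneId zone, int dayOfMonth) {
        ZonedDateTime zdt = event.atZone(zone);
        ZonedDateTime boundary = zdt.toLocalDate()
                .withDayOfMonth(dayOfMonth)
                .atStartOfDay(zone);
        // If the event is on or after this month's boundary, roll to next month.
        if (!zdt.isBefore(boundary)) {
            boundary = boundary.plusMonths(1);
        }
        return boundary;
    }

    public static void main(String[] args) {
        ZoneId london = ZoneId.of("Europe/London");
        Instant event = ZonedDateTime.of(2020, 8, 20, 10, 0, 0, 0, london).toInstant();
        // An event on 20 Aug closes in the window ending 15 Sep.
        System.out.println(nextMonthlyBoundary(event, london, 15));
    }
}
```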
[jira] [Created] (KAFKA-10382) MockProducer is not ThreadSafe, ideally it should be as the implementation it mocks is
Antony Stubbs created KAFKA-10382: - Summary: MockProducer is not ThreadSafe, ideally it should be as the implementation it mocks is Key: KAFKA-10382 URL: https://issues.apache.org/jira/browse/KAFKA-10382 Project: Kafka Issue Type: Bug Components: clients Affects Versions: 2.6.0 Reporter: Antony Stubbs In testing my project, I discovered that the MockProducer is not thread safe, as I had assumed it was. It doesn't use thread safe collections for its underlying stores, and only _some_ of its methods are synchronised. As performance isn't an issue here, I would propose simply synchronising all public methods in the class, as some already are. In my project, send is synchronised and commitTransaction isn't. This was causing weird collection manipulation and messages going missing. My hacky solution was simply to synchronise on the MockProducer instance before calling commit. See my workaround: https://github.com/astubbs/async-consumer/pull/13/files#diff-8e93aa2a2003be7436f94956cf809b2eR558 PR available: https://github.com/apache/kafka/pull/9154 -- This message was sent by Atlassian Jira (v8.3.4#803005)
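The workaround pattern can be shown with plain Java. This is a generic sketch, not MockProducer itself: `NotThreadSafeMock` is a stand-in whose unsynchronised method models `commitTransaction`, and the caller-side `synchronized` block is the external lock the ticket describes.

```java
import java.util.ArrayList;
import java.util.List;

public class SyncWorkaroundDemo {
    // Stand-in for MockProducer: backed by a plain (not thread safe) ArrayList.
    static class NotThreadSafeMock {
        final List<String> sent = new ArrayList<>();
        void record(String msg) { sent.add(msg); }   // think: commitTransaction
    }

    // The workaround: every caller takes an external lock on the mock
    // instance around the unsynchronised call, so no updates are lost.
    public static int runConcurrently(int perThread) {
        NotThreadSafeMock mock = new NotThreadSafeMock();
        Runnable task = () -> {
            for (int i = 0; i < perThread; i++) {
                synchronized (mock) {
                    mock.record("msg");
                }
            }
        };
        Thread a = new Thread(task), b = new Thread(task);
        a.start(); b.start();
        try { a.join(); b.join(); } catch (InterruptedException e) {
            throw new RuntimeException(e);
        }
        return mock.sent.size();
    }

    public static void main(String[] args) {
        System.out.println(runConcurrently(1000)); // 2000: no lost updates
    }
}
```

Synchronising all public methods inside the class, as the ticket proposes, would make this caller-side locking unnecessary.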
[jira] [Created] (KAFKA-8884) Improve streams errors on class cast exception in ProcessorNodes
Antony Stubbs created KAFKA-8884: Summary: Improve streams errors on class cast exception in ProcessorNodes Key: KAFKA-8884 URL: https://issues.apache.org/jira/browse/KAFKA-8884 Project: Kafka Issue Type: Improvement Components: streams Affects Versions: 2.3.1 Reporter: Antony Stubbs If a processor causes a class cast exception, at the moment you get a bit of a cryptic error if you're not used to them, without a context-sensitive suggestion as to what could be wrong. Often these are caused by misconfigured (default) Serdes. As an example of the improvement over the bare class cast exception: {{org.apache.kafka.streams.errors.StreamsException: Exception caught in process. taskId=0_0, processor=KSTREAM-SOURCE-00, topic=streams-plaintext-input, partition=0, offset=0, stacktrace=org.apache.kafka.streams.errors.StreamsException: A deserializer (key: org.apache.kafka.common.serialization.ByteArrayDeserializer / value: org.apache.kafka.common.serialization.ByteArrayDeserializer) is not compatible to the actual key or value type (key type: [B / value type: [B). 
Change the default Serdes in StreamConfig or provide correct Serdes via method parameters.}} {{ at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:96)}} {{ at org.apache.kafka.streams.processor.internals.StreamTask.process(StreamTask.java:366)}} {{ at org.apache.kafka.streams.TopologyTestDriver.pipeInput(TopologyTestDriver.java:419)}} {{ at org.apache.kafka.streams.processor.internals.ProcessorNodeTest.testTopologyLevelClassCastException(ProcessorNodeTest.java:176)}} {{ at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)}} {{ at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)}} {{ at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)}} {{ at java.base/java.lang.reflect.Method.invoke(Method.java:566)}} {{ at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)}} {{ at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)}} {{ at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)}} {{ at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)}} {{ at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)}} {{ at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)}} {{ at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:365)}} {{ at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)}} {{ at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)}} {{ at org.junit.runners.ParentRunner$4.run(ParentRunner.java:330)}} {{ at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:78)}} {{ at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:328)}} {{ at org.junit.runners.ParentRunner.access$100(ParentRunner.java:65)}} {{ at 
org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:292)}} {{ at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:305)}} {{ at org.junit.runners.ParentRunner.run(ParentRunner.java:412)}} {{ at org.junit.runner.JUnitCore.run(JUnitCore.java:137)}} {{ at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)}} {{ at com.intellij.rt.execution.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:47)}} {{ at com.intellij.rt.execution.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:242)}} {{ at com.intellij.rt.execution.junit.JUnitStarter.main(JUnitStarter.java:70)}} {{Caused by: java.lang.ClassCastException: class [B cannot be cast to class java.lang.String ([B and java.lang.String are in module java.base of loader 'bootstrap')}} {{ at org.apache.kafka.streams.kstream.internals.AbstractStream.lambda$withKey$1(AbstractStream.java:103)}} {{ at org.apache.kafka.streams.kstream.internals.KStreamFlatMapValues$KStreamFlatMapValuesProcessor.process(KStreamFlatMapValues.java:40)}} {{ at org.apache.kafka.streams.processor.internals.ProcessorNode.process(ProcessorNode.java:118)}} {{ at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:201)}} {{ at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:180)}} {{ at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.forward(ProcessorContextImpl.java:133)}} {{ at org.apache.kafka.streams.processor.internals.SourceNode.process(SourceNode.java:89)}} {{ ... 28 more}} -- This message was sent by Atlassian Jira (v8.3.2#803003)
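The improvement boils down to catching the raw ClassCastException at the processor node and rethrowing it with a Serdes hint, as the example message above shows. A minimal sketch of that wrapping, with hypothetical names (`StreamsHintException`, `processWithHint`) standing in for the real StreamsException and ProcessorNode internals:

```java
import java.util.function.Supplier;

public class SerdeHintDemo {
    // Hypothetical stand-in for org.apache.kafka.streams.errors.StreamsException.
    static class StreamsHintException extends RuntimeException {
        StreamsHintException(String msg, Throwable cause) { super(msg, cause); }
    }

    // Wrap the processor body so a bare ClassCastException is rethrown with
    // a context-sensitive suggestion about (default) Serdes.
    public static <T> T processWithHint(String nodeName, Supplier<T> body) {
        try {
            return body.get();
        } catch (ClassCastException e) {
            throw new StreamsHintException(
                "ClassCastException in processor " + nodeName
                + ". This is often caused by misconfigured default Serdes: "
                + "change the default Serdes in StreamsConfig or provide "
                + "correct Serdes via method parameters.", e);
        }
    }

    public static void main(String[] args) {
        try {
            processWithHint("KSTREAM-SOURCE-00", () -> {
                Object bytes = new byte[0];
                return (String) bytes;   // simulate the bad cast
            });
        } catch (StreamsHintException e) {
            System.out.println(e.getMessage());
        }
    }
}
```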
[jira] [Created] (KAFKA-8651) Add a #branch overload that takes a Map of predicate names to predicates
Antony Stubbs created KAFKA-8651: Summary: Add a #branch overload that takes a Map of predicate names to predicates Key: KAFKA-8651 URL: https://issues.apache.org/jira/browse/KAFKA-8651 Project: Kafka Issue Type: Improvement Components: streams Affects Versions: 2.3.0 Reporter: Antony Stubbs A map can be a more natural way to pass in predicates and extract the resulting streams when working with the #branch method. KIP and PR on their way. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
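The shape of the proposed overload can be sketched without the Streams DSL. This is an illustrative sketch, not the KIP's actual signature: plain lists stand in for `KStream`s, an ordered map supplies the named predicates, and first-match-wins routing mirrors how `#branch` behaves.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;

public class BranchByMapDemo {
    // Route each record to the first predicate it matches; the result maps
    // predicate name -> matched records (in the DSL: name -> KStream).
    public static <V> Map<String, List<V>> branch(List<V> input,
                                                  LinkedHashMap<String, Predicate<V>> predicates) {
        Map<String, List<V>> out = new LinkedHashMap<>();
        predicates.keySet().forEach(name -> out.put(name, new ArrayList<>()));
        for (V v : input) {
            for (Map.Entry<String, Predicate<V>> e : predicates.entrySet()) {
                if (e.getValue().test(v)) {      // first match wins, as in #branch
                    out.get(e.getKey()).add(v);
                    break;
                }
            }
        }
        return out;
    }

    // Fixed-input demo: split integers into named "even"/"odd" branches.
    public static Map<String, List<Integer>> demo() {
        LinkedHashMap<String, Predicate<Integer>> p = new LinkedHashMap<>();
        p.put("even", i -> i % 2 == 0);
        p.put("odd", i -> true);
        return branch(List.of(1, 2, 3, 4), p);
    }

    public static void main(String[] args) {
        System.out.println(demo()); // {even=[2, 4], odd=[1, 3]}
    }
}
```

Extracting a branch by name (`result.get("even")`) is the ergonomic win over the positional array the current `#branch` returns.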
[jira] [Created] (KAFKA-8648) Console tools should fail fast if an unrecognised option is passed in
Antony Stubbs created KAFKA-8648: Summary: Console tools should fail fast if an unrecognised option is passed in Key: KAFKA-8648 URL: https://issues.apache.org/jira/browse/KAFKA-8648 Project: Kafka Issue Type: Improvement Components: clients, consumer, producer Affects Versions: 2.3.0 Reporter: Antony Stubbs It's confusing at the moment when you pass in incorrect CLI options: the tool silently accepts anything you give it. IMO it should ideally fail fast and tell you which option was not recognised. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-8647) Add consumer-props option to match producer-props for console consumer
Antony Stubbs created KAFKA-8647: Summary: Add consumer-props option to match producer-props for console consumer Key: KAFKA-8647 URL: https://issues.apache.org/jira/browse/KAFKA-8647 Project: Kafka Issue Type: Improvement Components: consumer, producer Affects Versions: 2.3.0 Reporter: Antony Stubbs Console consumer is missing a consumer-props option like the producer has. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-8572) Broker reports not leader partition as an error
Antony Stubbs created KAFKA-8572: Summary: Broker reports not leader partition as an error Key: KAFKA-8572 URL: https://issues.apache.org/jira/browse/KAFKA-8572 Project: Kafka Issue Type: Improvement Reporter: Antony Stubbs As this is an expected part of the broker protocol, is ERROR an appropriate log level? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7731) JMX metrics for client connections: how many, what version, what language, source ip etc...
Antony Stubbs created KAFKA-7731: Summary: JMX metrics for client connections: how many, what version, what language, source ip etc... Key: KAFKA-7731 URL: https://issues.apache.org/jira/browse/KAFKA-7731 Project: Kafka Issue Type: New Feature Components: core Affects Versions: 2.1.0 Reporter: Antony Stubbs Extremely useful for diagnosing large installations with many clients, auditing client usage, behaviour etc.. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-7725) Add a delay for further CG rebalances, beyond KIP-134 group.initial.rebalance.delay.ms
Antony Stubbs created KAFKA-7725: Summary: Add a delay for further CG rebalances, beyond KIP-134 group.initial.rebalance.delay.ms Key: KAFKA-7725 URL: https://issues.apache.org/jira/browse/KAFKA-7725 Project: Kafka Issue Type: New Feature Components: clients, consumer, core Affects Versions: 2.1.0 Reporter: Antony Stubbs KIP-134 group.initial.rebalance.delay.ms was a good start, but there are much bigger problems where, after a system is up and running, consumers can leave and join in large numbers, causing rebalance storms. One example is Mesosphere deploying new versions of an app - say there are 10 instances, then 10 more instances are deployed with the new version, then the old 10 are scaled down. Ideally this would be 1 or 2 rebalances, instead of 20. The trade-off is that if the delay is 5 seconds, every consumer joining within that window would extend it by another 5 seconds, potentially causing partitions to never be processed. To mitigate this, either a max rebalance delay could also be added, or multiple consumers joining won't extend the rebalance delay, so that it's always a max of 5 seconds. Related: [KIP-345: Introduce static membership protocol to reduce consumer rebalances|https://cwiki.apache.org/confluence/display/KAFKA/KIP-345%3A+Introduce+static+membership+protocol+to+reduce+consumer+rebalances] KAFKA-7018: persist memberId for consumer restart -- This message was sent by Atlassian JIRA (v7.6.3#76005)
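The "max rebalance delay" mitigation is simple deadline arithmetic. A sketch, with hypothetical names (`nextDeadline`, `maxDelayMs` is the proposed cap, not an existing broker config): each join pushes the rebalance deadline out by the per-join delay, but never past a cap measured from the first join.

```java
public class RebalanceDelayDemo {
    // Each join extends the deadline by delayMs, capped at
    // firstJoinMs + maxDelayMs so the rebalance is never deferred forever.
    public static long nextDeadline(long firstJoinMs, long nowMs,
                                    long delayMs, long maxDelayMs) {
        long extended = nowMs + delayMs;        // extend on each join...
        long cap = firstJoinMs + maxDelayMs;    // ...but never past the cap
        return Math.min(extended, cap);
    }

    public static void main(String[] args) {
        // First join at t=0; a consumer joining at t=4s with a 5s delay and
        // 10s cap extends the deadline to t=9s:
        System.out.println(nextDeadline(0, 4000, 5000, 10000)); // 9000
        // A join at t=9s would want t=14s, but is capped at t=10s:
        System.out.println(nextDeadline(0, 9000, 5000, 10000)); // 10000
    }
}
```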
[jira] [Created] (KAFKA-7724) Docs for KIP-71 compact AND delete need correcting
Antony Stubbs created KAFKA-7724: Summary: Docs for KIP-71 compact AND delete need correcting Key: KAFKA-7724 URL: https://issues.apache.org/jira/browse/KAFKA-7724 Project: Kafka Issue Type: Bug Components: core, documentation Affects Versions: 2.1.0 Reporter: Antony Stubbs KAFKA-4015 Change cleanup.policy config to accept a list of valid policies / KIP-71 - the Apache Kafka docs clearly say compact OR delete ("A string that is either "delete" or "compact"."). Seems the docs need updating to clearly state that combined policies are supported, and how they work. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6943) Have option to shutdown KS cleanly if any task crashes, or if all tasks crash
Antony Stubbs created KAFKA-6943: Summary: Have option to shutdown KS cleanly if any task crashes, or if all tasks crash Key: KAFKA-6943 URL: https://issues.apache.org/jira/browse/KAFKA-6943 Project: Kafka Issue Type: Improvement Components: streams Affects Versions: 1.1.0 Reporter: Antony Stubbs At the moment users have to implement this themselves. It might be nice to have an option to configure that if all tasks crash, or if any task crashes, a clean shutdown is initiated. There is also a gotcha here: at the moment, if you call KS#close without a timeout from the uncaught exception handler, you deadlock. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6942) Connect connectors api doesn't show versions of connectors
Antony Stubbs created KAFKA-6942: Summary: Connect connectors api doesn't show versions of connectors Key: KAFKA-6942 URL: https://issues.apache.org/jira/browse/KAFKA-6942 Project: Kafka Issue Type: New Feature Components: KafkaConnect Affects Versions: 1.1.0 Reporter: Antony Stubbs Would be very useful to have the connector list API response also return the version of the installed connectors. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6910) Ability to specify a default state store type or factory
Antony Stubbs created KAFKA-6910: Summary: Ability to specify a default state store type or factory Key: KAFKA-6910 URL: https://issues.apache.org/jira/browse/KAFKA-6910 Project: Kafka Issue Type: New Feature Components: streams Affects Versions: 1.1.0, 1.1.1 Reporter: Antony Stubbs For large projects, it's a huge pain and not really practical at all to specify a custom state store everywhere just to use in-memory stores or avoid RocksDB, for example when running a test suite on Windows. It would be great to be able to set a global config for KS so that it uses a different state store implementation everywhere. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6646) Add a GlobalKStream object type for stream event broadcast
Antony Stubbs created KAFKA-6646: Summary: Add a GlobalKStream object type for stream event broadcast Key: KAFKA-6646 URL: https://issues.apache.org/jira/browse/KAFKA-6646 Project: Kafka Issue Type: New Feature Components: streams Affects Versions: 1.1.0 Reporter: Antony Stubbs There are some use cases where having a global KStream object is useful. For example, where a single event is sent, with very low frequency, to a cluster of Kafka Streams nodes to trigger all nodes to do some processing of state stored on their instance. The current workaround is to either create a second KStreams app instance, being careful to configure it with a different state dir and to give it a unique app name per instance, then create a KStream in each one. Or you can use the normal consumer client inside your KStreams app with unique consumer groups. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6543) Allow KTables to be bootstrapped at start up, like GKTables
Antony Stubbs created KAFKA-6543: Summary: Allow KTables to be bootstrapped at start up, like GKTables Key: KAFKA-6543 URL: https://issues.apache.org/jira/browse/KAFKA-6543 Project: Kafka Issue Type: Improvement Reporter: Antony Stubbs In some use cases, it's desirable to have KTables "fully" bootstrapped (at least on a best-effort basis) before the topology begins, similar to how a GKTable does. This could prevent join race conditions, for one, which could be a big problem if local KTable state has been lost. -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6542) Tables should trigger joins too, not just streams
Antony Stubbs created KAFKA-6542: Summary: Tables should trigger joins too, not just streams Key: KAFKA-6542 URL: https://issues.apache.org/jira/browse/KAFKA-6542 Project: Kafka Issue Type: Improvement Components: streams Affects Versions: 1.1.0 Reporter: Antony Stubbs At the moment it's quite possible to have a race condition when joining a stream with a table: if the stream event arrives first, before the table event, the join will fail. This is also related to bootstrapping KTables (which is what a GKTable does). -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6519) Change log level from ERROR to WARN for not leader for this partition exception
Antony Stubbs created KAFKA-6519: Summary: Change log level from ERROR to WARN for not leader for this partition exception Key: KAFKA-6519 URL: https://issues.apache.org/jira/browse/KAFKA-6519 Project: Kafka Issue Type: Improvement Components: core Affects Versions: 1.0.0 Reporter: Antony Stubbs "Not the leader for this partition" is not an error in operation; it is in fact expected and a part of the partition discovery/movement system. This confuses users because they think something is going wrong. I'd suggest at least changing it to WARN, but perhaps is it even something users should be warned about? -- This message was sent by Atlassian JIRA (v7.6.3#76005)
[jira] [Created] (KAFKA-6369) General wildcard support for ACLs in Kafka
Antony Stubbs created KAFKA-6369: Summary: General wildcard support for ACLs in Kafka Key: KAFKA-6369 URL: https://issues.apache.org/jira/browse/KAFKA-6369 Project: Kafka Issue Type: New Feature Reporter: Antony Stubbs Especially useful for streams apps, where all intermediate topics are prefixed with the application id. For example, add read and write access to mystreamsapp.* so any new topics created by the app don't need to have specific permissions applied to them. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-6270) ProducerPerformance doesn't support setting sasl.jaas.config as a --producer-props
Antony Stubbs created KAFKA-6270: Summary: ProducerPerformance doesn't support setting sasl.jaas.config as a --producer-props Key: KAFKA-6270 URL: https://issues.apache.org/jira/browse/KAFKA-6270 Project: Kafka Issue Type: Bug Components: tools Affects Versions: 0.11.0.1 Reporter: Antony Stubbs It seems the parser expects everything to strictly be key=value pairs, whereas sasl.jaas.config is key="preamble key=value key=value" {{Exception in thread "main" java.lang.IllegalArgumentException: Invalid property: sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required username='user' password='user-secret'"; at org.apache.kafka.tools.ProducerPerformance.main(ProducerPerformance.java:99)}} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
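A sketch of the fix: split each --producer-props entry on the first '=' only, so values that themselves contain '=' and quotes (as sasl.jaas.config does) survive intact. This is an illustrative `parseProp` helper, not the actual ProducerPerformance code.

```java
public class PropsParseDemo {
    // Split on the FIRST '=' only; anything after it belongs to the value.
    public static String[] parseProp(String kv) {
        String[] parts = kv.split("=", 2);   // limit 2 keeps '=' inside the value
        if (parts.length != 2) {
            throw new IllegalArgumentException("Invalid property: " + kv);
        }
        return parts;
    }

    public static void main(String[] args) {
        String jaas = "sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule "
                + "required username='user' password='user-secret';";
        String[] kv = parseProp(jaas);
        System.out.println(kv[0]); // sasl.jaas.config
    }
}
```

Splitting with `split("=")` (no limit) would fragment the JAAS value at every embedded '=', which is consistent with the "Invalid property" failure in the report.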
[jira] [Created] (KAFKA-6268) Tools should not swallow exceptions like resolving network names
Antony Stubbs created KAFKA-6268: Summary: Tools should not swallow exceptions like resolving network names Key: KAFKA-6268 URL: https://issues.apache.org/jira/browse/KAFKA-6268 Project: Kafka Issue Type: Bug Components: core Affects Versions: 0.11.0.1 Reporter: Antony Stubbs The CLI consumer client shows nothing when it can't resolve a domain. This and other errors like it should be shown to the user by default. You have to turn on DEBUG level logging in the tools log4j config to find out there is an error. {{[2017-11-23 16:40:56,401] DEBUG Error connecting to node as-broker-1-eu-west-1b-public:9092 (id: 1 rack: null) (org.apache.kafka.clients.NetworkClient) java.io.IOException: Can't resolve address: as-broker-1-eu-west-1b-public:9092 at org.apache.kafka.common.network.Selector.connect(Selector.java:195) at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:764) at org.apache.kafka.clients.NetworkClient.access$600(NetworkClient.java:60) at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:908) at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:819) at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:431) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:208) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:199) at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:134) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:223) at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:200) at 
org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:286) at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1078) at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1043) at kafka.consumer.NewShinyConsumer.(BaseConsumer.scala:64) at kafka.tools.ConsoleConsumer$.run(ConsoleConsumer.scala:72) at kafka.tools.ConsoleConsumer$.main(ConsoleConsumer.scala:53) at kafka.tools.ConsoleConsumer.main(ConsoleConsumer.scala) Caused by: java.nio.channels.UnresolvedAddressException at sun.nio.ch.Net.checkAddress(Net.java:101) at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:622) at org.apache.kafka.common.network.Selector.connect(Selector.java:192) ... 18 more }} -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-6265) GlobalKTable missing #queryableStoreName()
Antony Stubbs created KAFKA-6265: Summary: GlobalKTable missing #queryableStoreName() Key: KAFKA-6265 URL: https://issues.apache.org/jira/browse/KAFKA-6265 Project: Kafka Issue Type: Bug Components: streams Affects Versions: 1.0.0 Reporter: Antony Stubbs KTable has the nicely useful #queryableStoreName(); it seems to be missing from GlobalKTable. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-6235) Kafka should have an emergency retention setting for max disk used
Antony Stubbs created KAFKA-6235: Summary: Kafka should have an emergency retention setting for max disk used Key: KAFKA-6235 URL: https://issues.apache.org/jira/browse/KAFKA-6235 Project: Kafka Issue Type: New Feature Reporter: Antony Stubbs Kafka should have an emergency retention setting for max disk used, to prevent the broker running out of disk and partitions going offline. When this max is reached, Kafka could perhaps delete segments from the largest topics. Would have to be used with care, as current behaviour is to preserve data at the cost of availability; this would favour availability over data retention. At the moment it's quite hard to reason about disk usage in Kafka, as the max byte settings are all per partition, and the math can get complicated when you have lots of topics of different use cases and sizes. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
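The "delete from the largest first" policy can be sketched as a selection function. Everything here is hypothetical - `maxDiskBytes` is the proposed setting, not an existing broker config - and real segment deletion is far more involved; this only shows the victim-selection idea.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.Map;

public class EmergencyRetentionDemo {
    // When used bytes exceed the hypothetical emergency cap, pick partitions
    // to shrink, largest first, until enough space would be freed.
    public static List<String> partitionsToShrink(Map<String, Long> partitionSizes,
                                                  long usedBytes, long maxDiskBytes) {
        if (usedBytes <= maxDiskBytes) return List.of();   // under the cap: no-op
        List<String> order = new ArrayList<>(partitionSizes.keySet());
        order.sort(Comparator.comparingLong(partitionSizes::get).reversed());
        List<String> victims = new ArrayList<>();
        long toFree = usedBytes - maxDiskBytes;
        for (String p : order) {
            if (toFree <= 0) break;
            victims.add(p);
            toFree -= partitionSizes.get(p);
        }
        return victims;
    }

    public static void main(String[] args) {
        // 160 GB used against a 120 GB cap: shrink the largest partition first.
        System.out.println(partitionsToShrink(
                Map.of("big-topic-0", 100L, "mid-topic-0", 50L, "small-topic-0", 10L),
                160L, 120L)); // [big-topic-0]
    }
}
```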
[jira] [Created] (KAFKA-6226) Performance Consumer should print units in its output, like the producer
Antony Stubbs created KAFKA-6226: Summary: Performance Consumer should print units in its output, like the producer Key: KAFKA-6226 URL: https://issues.apache.org/jira/browse/KAFKA-6226 Project: Kafka Issue Type: Bug Affects Versions: 1.1.0 Reporter: Antony Stubbs IMO this should be the default behaviour, which would match the performance producer, with a config to disable it. https://github.com/apache/kafka/pull/4080 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-6225) Add an option for the performance consumer to consume continuously
Antony Stubbs created KAFKA-6225: Summary: Add an option for the performance consumer to consume continuously Key: KAFKA-6225 URL: https://issues.apache.org/jira/browse/KAFKA-6225 Project: Kafka Issue Type: Bug Components: consumer Affects Versions: 1.1.0 Reporter: Antony Stubbs Priority: Minor IMO this should be the default behaviour, which would match the performance producer, with a config to disable it. I can implement this either by adding an infinite loop, or by allowing the user to configure the timeout setting, which is currently hard-coded to 1 second. Patches are available for either. https://github.com/apache/kafka/pull/4082 -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-6182) Automatic co-partitioning of topics via automatic intermediate topic with matching partitions
Antony Stubbs created KAFKA-6182: Summary: Automatic co-partitioning of topics via automatic intermediate topic with matching partitions Key: KAFKA-6182 URL: https://issues.apache.org/jira/browse/KAFKA-6182 Project: Kafka Issue Type: New Feature Components: streams Reporter: Antony Stubbs Currently it is up to the user to ensure that two input topics for a join have the same number of partitions. It would be great to have Kafka Streams detect this automatically, or at least give the user an easy way, and create an intermediate topic with the same number of partitions as the topic being joined with. See https://docs.confluent.io/current/streams/developer-guide.html#joins-require-co-partitioning-of-the-input-data -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-6145) Warm up new KS instances before migrating tasks - potentially a two phase rebalance
Antony Stubbs created KAFKA-6145: Summary: Warm up new KS instances before migrating tasks - potentially a two phase rebalance Key: KAFKA-6145 URL: https://issues.apache.org/jira/browse/KAFKA-6145 Project: Kafka Issue Type: New Feature Components: streams Reporter: Antony Stubbs Currently when expanding the KS cluster, the new node's partitions will be unavailable during the rebalance, which for large states can take a very long time, and for small state stores even more than a few ms of pause can be a deal breaker for microservice use cases. One workaround would be to execute the rebalance in two phases: 1) start building the state store on the new node 2) once the state store is fully populated on the new node, only then rebalance the tasks - there will still be a rebalance pause, but it would be greatly reduced -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-6144) Allow state stores to serve stale reads during rebalance
Antony Stubbs created KAFKA-6144: Summary: Allow state stores to serve stale reads during rebalance Key: KAFKA-6144 URL: https://issues.apache.org/jira/browse/KAFKA-6144 Project: Kafka Issue Type: New Feature Components: streams Reporter: Antony Stubbs Currently when expanding the KS cluster, the new node's partitions will be unavailable during the rebalance, which for large states can take a very long time, and for small state stores even more than a few ms of pause can be a deal breaker for microservice use cases. One workaround is to allow stale data to be read from the state stores when the use case allows. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-6080) Transactional EoS for source connectors
Antony Stubbs created KAFKA-6080: Summary: Transactional EoS for source connectors Key: KAFKA-6080 URL: https://issues.apache.org/jira/browse/KAFKA-6080 Project: Kafka Issue Type: New Feature Components: KafkaConnect Reporter: Antony Stubbs Exactly once (eos) message production for source connectors. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Created] (KAFKA-6079) Idempotent production for source connectors
Antony Stubbs created KAFKA-6079: Summary: Idempotent production for source connectors Key: KAFKA-6079 URL: https://issues.apache.org/jira/browse/KAFKA-6079 Project: Kafka Issue Type: New Feature Components: KafkaConnect Reporter: Antony Stubbs Idempotent production for source connectors, to reduce duplicates at least from retries. -- This message was sent by Atlassian JIRA (v6.4.14#64029)
[jira] [Issue Comment Deleted] (KAFKA-5178) Potential Performance Degradation in Kafka Producer when using Multiple Threads
[ https://issues.apache.org/jira/browse/KAFKA-5178?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ] Antony Stubbs updated KAFKA-5178: - Comment: was deleted (was: My raw notes on researching the issue and slack conversations: Consumer Performance Notes https://confluent.slack.com/archives/C0KRA68SZ/p1492451673007148 Chris Matta [6:54 PM] what happens if a consumer is consuming from multiple partitions with differing cardinality, and they’ve set `max.poll.records` to 500, the first partition always has 500 records to pull, does that mean the other partitions will never be read? Ewen Cheslack-Postava [6:59 PM] @chris it will return the others. the completed fetches are tracked per-partition in a queue [7:00] and that queue is processed in order so that when a FetchRequest returns with data for multiple partitions, all that data gets enqueued then a subsequent FetchRequest is sent. any subsequent data from the same partitions will get queued up after the data from the first request/response Chris Matta [7:03 PM] ok, thanks @ewen, is this covered in documentation anywhere? Ewen Cheslack-Postava [9:03 PM] not that i'm aware of. not even sure how we'd do that very concisely since it requires a bit of understanding of the underlying protocol to not overstate the guarantees, i.e. you need to know a bit about fetch requests Ismael Juma [10:44 PM] @chris KIP-41 describes the behaviour of `max.poll.records`. KIP-74 is also somewhat relevant if you’re thinking about the behaviour of fetches as well. Not user docs though. 
KIP-41 describes the behaviour of `max.poll.records` https://cwiki.apache.org/confluence/display/KAFKA/KIP-41%3A+KafkaConsumer+Max+Records KIP-74 is also somewhat relevant if you’re thinking about the behaviour of fetches as well https://cwiki.apache.org/confluence/display/KAFKA/KIP-74%3A+Add+Fetch+Response+Size+Limit+in+Bytes https://confluent.slack.com/archives/C07FCMZ39/p1491263732459808 https://confluent.slack.com/archives/C07FCMZ39/p1491597142583282 https://issues.apache.org/jira/browse/KAFKA-4753 https://confluent.slack.com/archives/C07FCMZ39/p1492199660178716 Replicator issues with funding circle - hot fixes were shipped to them http://testing.confluent.io/confluent-kafka-system-test-results/?prefix=2017-04-28--001.1493388356--apache--trunk--bc10f5f/Benchmark/ https://www.confluent.io/blog/how-to-choose-the-number-of-topicspartitions-in-a-kafka-cluster/ https://issues.apache.org/jira/browse/KAFKA-4753 KafkaConsumer susceptible to FetchResponse starvation) > Potential Performance Degradation in Kafka Producer when using Multiple > Threads > --- > > Key: KAFKA-5178 > URL: https://issues.apache.org/jira/browse/KAFKA-5178 > Project: Kafka > Issue Type: Bug >Reporter: Ben Stopford > Attachments: TestDetails.pdf > > > There is evidence that the Kafka Producer drops performance as we increase > the number of threads using it. > This is based on some benchmarking done in the community. I have not > independently validated these results. Details of the test setup attached. > ... > *Effect of Shared KafkaProducer across threads* > > Kafka documentation recommend using the KafkaProducer across multiple worker > threads. 
>
> ||#Producers||#Consumers||#Topics||#Partitions per topic||RoundTrip Throughput (events/sec)||Approx Broker Events (Millions/sec)||
> |1|1|1|1|268,312|0.5|
> |4|4|4|4|759,186|1.5|
> |8|8|8|8|640,738|1.2|
> |8|8|8|16|847,314|1.7|
> |8|8|8|48|17,791|0.035|
> |16|16|16|64|5,997|0.01|
>
> Something appears to be wrong here: with 48 and 64 partitions the shared KafkaProducer struggled to the point that performance became quite bad.

--
This message was sent by Atlassian JIRA (v6.3.15#6346)
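Ewen's description of the consumer's per-partition completed-fetch queue can be sketched as a toy model: batches arrive per partition, and `poll()` drains them strictly in arrival order, capped by `max.poll.records`. This is an illustrative simplification only, not the actual consumer `Fetcher` code; the class and method names below are invented:

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

// Toy model of the completed-fetch queue: each FetchResponse enqueues one
// batch per partition, and poll() drains batches in arrival order, returning
// at most maxPollRecords per call.
public class FetchQueueModel {
    private final Deque<Deque<String>> completedFetches = new ArrayDeque<>();
    private final int maxPollRecords;

    public FetchQueueModel(int maxPollRecords) {
        this.maxPollRecords = maxPollRecords;
    }

    // A FetchResponse with data for several partitions enqueues one batch per
    // partition, in the order they appeared in the response.
    public void onFetchResponse(List<List<String>> partitionBatches) {
        for (List<String> batch : partitionBatches) {
            completedFetches.addLast(new ArrayDeque<>(batch));
        }
    }

    // Return up to maxPollRecords, draining queued batches strictly in order.
    public List<String> poll() {
        List<String> out = new ArrayList<>();
        while (out.size() < maxPollRecords && !completedFetches.isEmpty()) {
            Deque<String> batch = completedFetches.peekFirst();
            while (out.size() < maxPollRecords && !batch.isEmpty()) {
                out.add(batch.pollFirst());
            }
            if (batch.isEmpty()) {
                completedFetches.pollFirst();
            }
        }
        return out;
    }
}
```

Because batches are drained in arrival order, a partition that always has records queued cannot starve the others: its later batches line up behind the other partitions' earlier ones.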
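The setup benchmarked above can be sketched in plain Java: KafkaProducer is thread-safe, and the documentation's recommendation is one instance shared by all worker threads. The `SharedSender` below is a hypothetical stand-in for the producer (its buffer stands in for the producer's record accumulator) so the sketch runs without a broker:

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Sketch of the benchmarked pattern: one shared, thread-safe sender used by
// many worker threads. SharedSender is a stand-in for KafkaProducer, which is
// itself thread-safe and intended to be shared this way.
public class SharedSenderDemo {

    static final class SharedSender {
        // A concurrent queue stands in for the producer's internal buffer.
        private final ConcurrentLinkedQueue<String> buffer = new ConcurrentLinkedQueue<>();

        void send(String record) {
            buffer.add(record);
        }

        int sent() {
            return buffer.size();
        }
    }

    // Run `threads` workers, each sending `perThread` records through one
    // shared sender; returns the total number of records buffered.
    public static int run(int threads, int perThread) {
        SharedSender sender = new SharedSender();
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        for (int t = 0; t < threads; t++) {
            pool.submit(() -> {
                for (int i = 0; i < perThread; i++) {
                    sender.send("record");
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sender.sent();
    }
}
```

The sketch shows only the sharing pattern; it says nothing about why throughput collapses at high partition counts, which is the open question in the benchmark.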
[jira] [Updated] (KAFKA-4783) Blackbox or pass through converter or ByteArrayConverter for connect
[ https://issues.apache.org/jira/browse/KAFKA-4783?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Antony Stubbs updated KAFKA-4783:
---------------------------------
Description:
Connect is missing a pass-through converter / ByteArrayConverter that doesn't manipulate the message payload. This is needed for binary messages that don't have a plaintext, JSON, or Avro interpretation. For example, messages that contain a binary encoding of a proprietary or esoteric protocol, to be decoded later.

Currently there's a public class available here:
https://github.com/qubole/streamx/blob/8b9d43008d9901b8e6020404f137944ed97522e2/src/main/java/com/qubole/streamx/ByteArrayConverter.java

was:
Connect is missing a pass-through converter / ByteArrayConverter that doesn't manipulate the message payload. This is needed for binary messages that don't have a plaintext, JSON, or Avro interpretation. For example, messages that contain a binary encoding of a proprietary or esoteric protocol, to be decoded later.

> Blackbox or pass-through converter or ByteArrayConverter for Connect
> --------------------------------------------------------------------
>
>               Key: KAFKA-4783
>               URL: https://issues.apache.org/jira/browse/KAFKA-4783
>           Project: Kafka
>        Issue Type: Bug
>        Components: KafkaConnect
>  Affects Versions: 0.10.2.0
>          Reporter: Antony Stubbs
>
> Connect is missing a pass-through converter / ByteArrayConverter that doesn't manipulate the message payload. This is needed for binary messages that don't have a plaintext, JSON, or Avro interpretation. For example, messages that contain a binary encoding of a proprietary or esoteric protocol, to be decoded later.
> Currently there's a public class available here:
> https://github.com/qubole/streamx/blob/8b9d43008d9901b8e6020404f137944ed97522e2/src/main/java/com/qubole/streamx/ByteArrayConverter.java

--
This message was sent by Atlassian JIRA (v6.3.15#6346)
[jira] [Created] (KAFKA-4783) Blackbox or pass through converter or ByteArrayConverter for connect
Antony Stubbs created KAFKA-4783:
------------------------------------

           Summary: Blackbox or pass-through converter or ByteArrayConverter for Connect
               Key: KAFKA-4783
               URL: https://issues.apache.org/jira/browse/KAFKA-4783
           Project: Kafka
        Issue Type: Bug
        Components: KafkaConnect
  Affects Versions: 0.10.2.0
          Reporter: Antony Stubbs

Connect is missing a pass-through converter / ByteArrayConverter that doesn't manipulate the message payload. This is needed for binary messages that don't have a plaintext, JSON, or Avro interpretation. For example, messages that contain a binary encoding of a proprietary or esoteric protocol, to be decoded later.

--
This message was sent by Atlassian JIRA (v6.3.15#6346)
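The requested pass-through behaviour can be sketched as follows. The method names mirror Connect's `Converter` interface (`configure`, `fromConnectData`, `toConnectData`), but the types are simplified stand-ins (no `Schema` / `SchemaAndValue`) so the example is self-contained; it is a sketch of the idea, not the linked streamx class or any shipped implementation:

```java
import java.util.Map;

// Minimal sketch of a pass-through ("blackbox") converter: bytes in, bytes
// out, with no interpretation of the payload. Schema handling is simplified
// to plain Object / byte[] stand-ins so the example compiles on its own.
public class ByteArrayConverterSketch {

    public void configure(Map<String, ?> configs, boolean isKey) {
        // Nothing to configure: the payload is never inspected.
    }

    // Serialization path: the value is already a byte[]; hand it through
    // untouched, preserving the original binary encoding.
    public byte[] fromConnectData(String topic, Object schema, Object value) {
        if (value == null) {
            return null;
        }
        if (!(value instanceof byte[])) {
            throw new IllegalArgumentException(
                "Pass-through converter only supports byte[] values");
        }
        return (byte[]) value;
    }

    // Deserialization path: return the raw bytes as-is, leaving decoding of
    // the proprietary or esoteric protocol to a later consumer.
    public byte[] toConnectData(String topic, byte[] value) {
        return value;
    }
}
```

The key design point is that neither direction copies or re-encodes the payload, so binary messages survive a source-to-sink pipeline byte-for-byte.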