[jira] [Assigned] (KAFKA-13985) MirrorSourceTask commitRecord throws NPE if SMT is filtering out source record

2022-09-07 Thread Rens Groothuijsen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-13985?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rens Groothuijsen reassigned KAFKA-13985:
-

Assignee: Rens Groothuijsen

> MirrorSourceTask commitRecord throws NPE if SMT is filtering out source record
> --
>
> Key: KAFKA-13985
> URL: https://issues.apache.org/jira/browse/KAFKA-13985
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 3.1.0, 3.2.0
>Reporter: Jacopo Riciputi
>Assignee: Rens Groothuijsen
>Priority: Minor
>
> Applying an SMT that filters out messages can lead execution down this path:
> From WorkerSourceTask.java
> {code:java}
> final SourceRecord record = transformationChain.apply(preTransformRecord);
> final ProducerRecord producerRecord = 
> convertTransformedRecord(record);
> if (producerRecord == null || retryWithToleranceOperator.failed()) {
>     counter.skipRecord();
>     commitTaskRecord(preTransformRecord, null);
>     continue;
> } {code}
>  
> Then to:
> {code:java}
> private void commitTaskRecord(SourceRecord record, RecordMetadata metadata) {
>         try {
>             task.commitRecord(record, metadata);
>         } catch (Throwable t) {
>             log.error("{} Exception thrown while calling 
> task.commitRecord()", this, t);
>         }
> }{code}
> Finally
> From MirrorSourceTask.java
> {code:java}
>     @Override
>     public void commitRecord(SourceRecord record, RecordMetadata metadata) {
>         try {
>             if (stopping) {
>                 return;
>             }
>             if (!metadata.hasOffset()) {
>                 log.error("RecordMetadata has no offset -- can't sync offsets 
> for {}.", record.topic());
>                 return;
>             }
> ...{code}
>  
> This causes an NPE because metadata is null. 
> This is the exception:
> {code:java}
> [2022-06-13 12:31:33,094] WARN Failure committing record. 
> (org.apache.kafka.connect.mirror.MirrorSourceTask:190)
> java.lang.NullPointerException
>     at 
> org.apache.kafka.connect.mirror.MirrorSourceTask.commitRecord(MirrorSourceTask.java:177)
>     at 
> org.apache.kafka.connect.runtime.WorkerSourceTask.commitTaskRecord(WorkerSourceTask.java:463)
>     at 
> org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:358)
>     at 
> org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:257)
>     at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
>     at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:243)
>     at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Unknown 
> Source)
>     at java.base/java.util.concurrent.FutureTask.run(Unknown Source)
>     at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown 
> Source)
>     at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown 
> Source)
>     at java.base/java.lang.Thread.run(Unknown Source) {code}
> As I understand it, this is handled safely and has no negative impact, 
> because MirrorSourceTask.commitRecord catches the exception itself rather 
> than letting it propagate. 
> Still, it would probably be preferable to check explicitly for 
> metadata != null, so that the commit is skipped safely and silently.
> [EDIT]
> Looking a bit deeper, there is a small side effect.
> If the last message processed was filtered out (and so not committed by 
> MirrorSourceTask) and the MM2 instance is restarted, the consumer will 
> re-read that message because its offset was not committed (and it will 
> probably be filtered out again if the configuration has not changed).
> This behavior is probably fine given MM2's nature.
>  
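The null check suggested in the description could look roughly like the sketch below. It is only an illustration: Metadata is a hypothetical stand-in for Kafka's RecordMetadata, the synced list stands in for the real offset-sync logic, and the actual MirrorSourceTask does more than this.

```java
import java.util.ArrayList;
import java.util.List;

class CommitRecordSketch {
    /** Hypothetical stand-in for RecordMetadata; only the offset check is modeled. */
    static final class Metadata {
        private final long offset;
        Metadata(long offset) { this.offset = offset; }
        boolean hasOffset() { return offset >= 0; }
        long offset() { return offset; }
    }

    /** Offsets that were synced; stands in for the real offset-sync logic. */
    static final List<Long> synced = new ArrayList<>();

    /** Returns true if the offset was synced, false if the commit was skipped. */
    static boolean commitRecord(String topic, Metadata metadata) {
        if (metadata == null) {
            // The record was filtered out by an SMT and never produced: skip quietly.
            return false;
        }
        if (!metadata.hasOffset()) {
            System.err.println("RecordMetadata has no offset -- can't sync offsets for " + topic);
            return false;
        }
        synced.add(metadata.offset());
        return true;
    }

    public static void main(String[] args) {
        System.out.println(commitRecord("topic-a", null));             // filtered record
        System.out.println(commitRecord("topic-a", new Metadata(42))); // produced record
    }
}
```

With a guard like this the filtered record no longer triggers the NPE, but its offset is still not synced, so the side effect described under [EDIT] would remain.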



--
This message was sent by Atlassian Jira
(v8.20.10#820010)


[jira] [Commented] (KAFKA-14081) Cannot get my MetricsReporter implementation to receive meaningful metrics

2022-08-09 Thread Rens Groothuijsen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-14081?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17577677#comment-17577677
 ] 

Rens Groothuijsen commented on KAFKA-14081:
---

I had a look at the source code, and it does appear that {{metricChange()}} is 
only being called when adding a metric, not when a metric changes. Given that 
the documentation of {{metricChange()}} says otherwise, I assume this was 
accidentally removed at some point.

> Cannot get my MetricsReporter implementation to receive meaningful metrics
> --
>
> Key: KAFKA-14081
> URL: https://issues.apache.org/jira/browse/KAFKA-14081
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 3.3.0
>Reporter: Gian Luca
>Priority: Minor
>
> I want to extract metrics from KafkaProducer to export them to our company 
> monitoring solution. At first I went for implementing {{MetricsReporter}} and 
> registering my implementation through the "metric.reporters" config property. 
> The class is correctly registered as it receives metric updates through 
> {{metricChange()}} while KafkaProducer is being used. The problem is that all 
> the metric values are stuck at zero (NaN in older versions of Kafka), even 
> the most trivial (e.g. 'record-send-total').
> If instead of using a reporter I simply poll the {{metrics()}} method of the 
> KafkaProducer, then I see meaningful values: counters increasing over time, 
> etc.
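Since {{metricChange()}} appears to fire only when a metric is registered, a reporter that copies the value at that moment will keep seeing the initial value. One possible approach is to keep a handle to each metric and read it at export time. The sketch below uses a plain Supplier as a hypothetical stand-in for the metric handle (in Kafka that would be the KafkaMetric passed to the reporter, read via metricValue()); all class and method names here are illustrative.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

class PollingReporterSketch {
    // metric name -> handle that yields the current value when asked
    private final Map<String, Supplier<Object>> handles = new ConcurrentHashMap<>();

    /** Plays the role of metricChange(): called on registration, not on every update. */
    void metricChange(String name, Supplier<Object> currentValue) {
        handles.put(name, currentValue); // keep the handle, do not snapshot the value here
    }

    /** Reads every registered metric at export time. */
    Map<String, Object> snapshot() {
        Map<String, Object> out = new HashMap<>();
        handles.forEach((name, handle) -> out.put(name, handle.get()));
        return out;
    }

    public static void main(String[] args) {
        double[] recordSendTotal = {0.0};
        PollingReporterSketch reporter = new PollingReporterSketch();
        reporter.metricChange("record-send-total", () -> recordSendTotal[0]);
        recordSendTotal[0] = 5.0; // records are sent after registration
        System.out.println(reporter.snapshot());
    }
}
```

A real reporter would typically call something like snapshot() from a scheduled task and push the result to the monitoring system.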





[jira] [Commented] (KAFKA-13796) MM2 - Topics Exclude/Blacklist not working

2022-07-24 Thread Rens Groothuijsen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-13796?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17570518#comment-17570518
 ] 

Rens Groothuijsen commented on KAFKA-13796:
---

From what I can see, adding quote marks around the topic name makes a 
difference for topics.exclude. 'topics.exclude=topicname' produced the 
expected result, whereas 'topics.exclude="topicname"' did not.
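
A likely explanation is that Java properties files do not strip quotes, so the quote characters become part of the value. The sketch below shows this with java.util.Properties and a regex match; it assumes the exclude value is used as a regular expression that must match the whole topic name, which is a simplification of whatever MM2's topic filter actually does.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;
import java.util.regex.Pattern;

class QuotedExcludeSketch {
    /** Loads a properties snippet and tests whether its topics.exclude value matches the topic. */
    static boolean excludes(String propertiesText, String topic) {
        Properties props = new Properties();
        try {
            props.load(new StringReader(propertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        // Any quotes in the file are still present in this value.
        String pattern = props.getProperty("topics.exclude");
        return Pattern.compile(pattern).matcher(topic).matches();
    }

    public static void main(String[] args) {
        System.out.println(excludes("topics.exclude=mm2_test_0", "mm2_test_0"));
        System.out.println(excludes("topics.exclude=\"mm2_test_0\"", "mm2_test_0"));
    }
}
```

The first call matches and the second does not, because the quoted pattern only matches a topic whose name itself contains the quote characters.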

> MM2 - Topics Exclude/Blacklist not working
> --
>
> Key: KAFKA-13796
> URL: https://issues.apache.org/jira/browse/KAFKA-13796
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 3.0.0
>Reporter: David Bros
>Priority: Blocker
> Fix For: 3.0.0
>
>
> Hi all,
> We are testing Kafka 3.0.0 in a development environment with Java 17 on a 
> CentOS 7 machine.
> When setting up MM2 replication between two clusters, we exclude some topics 
> we don't want MM2 to replicate via topics.exclude (we have also tried 
> topics.blacklist).
> Even though the configuration is parsed correctly, those topics are still 
> replicated between the clusters.
> Here is our config for the us_central cluster:
> ```
>     # Clusters and bootstrap servers
>     clusters=eu_west, us_central
>     eu_west.bootstrap.servers=XXnet:9092,XX:9092
>     us_central.bootstrap.servers=XX:9092
>     us_central.consumer.auto.offset.reset=latest
>     us_central.consumer.fetch.max.bytes=31457280
>     us_central.consumer.fetch.max.wait.ms=1
>     us_central.consumer.max.poll.records=3000
>     us_central.consumer.request.timeout.ms=6
>     eu_west.consumer.auto.offset.reset=latest
>     eu_west.consumer.fetch.max.bytes=52428800
>     eu_west.consumer.fetch.max.wait.ms=1
>     eu_west.consumer.max.poll.records=3000
>     eu_west.consumer.request.timeout.ms=6
>     # Custom producer settings
>     us_central.producer.max.request.size=27262976
>     us_central.producer.batch.size=22000 
>     us_central.producer.compression.type=none
>     us_central.producer.send.buffer.bytes=26214400
>     us_central.producer.receive.buffer.bytes=26214400
>     eu_west.producer.max.request.size=27262976
>     eu_west.producer.batch.size=22000
>     eu_west.producer.compression.type=none
>     eu_west.producer.send.buffer.bytes=26214400
>     eu_west.producer.receive.buffer.bytes=26214400
>     # Topics configuration
>     topics=.*
>     topics.blacklist="mm2_test_0"
>     # Tasks (threads)
>     tasks.max=10
>     # Groups
>     groups=phx_netflow
>     # Replication factors, these are 1 for lab
>     replication.factor=1
>     config.storage.replication.factor=1
>     offset.storage.replication.factor=1
>     status.storage.replication.factor=1
>     checkpoints.topic.replication.factor=1
>     offset-syncs.topic.replication.factor=1
>     sync.group.offsets.replication.factor=1
>     ## Config
>     config.properties.exclude=local.retention.ms
>     # Refresh rates
>     us_central.refresh.topics.interval.seconds=15
>     us_central.refresh.groups.interval.seconds=15
>     eu_west.refresh.topics.interval.seconds=15
>     eu_west.refresh.groups.interval.seconds=15
>     ## Sync options
>     sync.topic.acls.enabled=true
>     offset-syncs.topic.replication.enabled=true
>     sync.group.offsets.enabled=true
>     # EU pulls us, syd, nyc
>     us_central->eu_west.enabled=true
>     us_central->eu_west.sync.group.offsets.enabled=false
>     # US pulls eu, syd, nyc
>     eu_west->us_central.enabled=true
>     eu_west->us_central.sync.group.offsets.enabled=true
> ```
> Here is the configuration for eu_west cluster:
> ```
>     # Clusters and bootstrap servers
>     clusters=eu_west, us_central
>     eu_west.bootstrap.servers=XX,XX:9092
>     us_central.bootstrap.servers=XX:9092
>     us_central.consumer.auto.offset.reset=latest 
>     us_central.consumer.fetch.max.bytes=31457280
>     us_central.consumer.fetch.max.wait.ms=1
>     us_central.consumer.max.poll.records=3000
>     us_central.consumer.request.timeout.ms=6
>     eu_west.consumer.auto.offset.reset=latest
>     eu_west.consumer.fetch.max.bytes=52428800
>     eu_west.consumer.fetch.max.wait.ms=1
>     eu_west.consumer.max.poll.records=3000
>     eu_west.consumer.request.timeout.ms=6
>     # Custom producer settings
>     us_central.producer.max.request.size=27262976
>     us_central.producer.batch.size=22000
>     us_central.producer.compression.type=none
>     us_central.producer.send.buffer.bytes=26214400
>     us_central.producer.receive.buffer.bytes=26214400
>     eu_west.producer.max.request.size=27262976
>     eu_west.producer.batch.size=22000
>     eu_west.producer.compression.type=none
>     eu_west.producer.send.buffer.bytes=26214400
>     eu_west.producer.receive.buffer.bytes=26214400
>     # Topics configuration
>     topics=.*
>     topics.b

[jira] [Commented] (KAFKA-5666) Need feedback to user if consumption fails due to offsets.topic.replication.factor=3

2021-08-25 Thread Rens Groothuijsen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-5666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17404267#comment-17404267
 ] 

Rens Groothuijsen commented on KAFKA-5666:
--

[~ll1124278064] No, I'm no longer working on this one, so go right ahead.

> Need feedback to user if consumption fails due to 
> offsets.topic.replication.factor=3
> 
>
> Key: KAFKA-5666
> URL: https://issues.apache.org/jira/browse/KAFKA-5666
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Affects Versions: 0.11.0.0
>Reporter: Yeva Byzek
>Priority: Major
>  Labels: newbie, usability
>
> Introduced in 0.11: The offsets.topic.replication.factor broker config is now 
> enforced upon auto topic creation. Internal auto topic creation will fail 
> with a GROUP_COORDINATOR_NOT_AVAILABLE error until the cluster size meets 
> this replication factor requirement.
> Issue: Default is setting offsets.topic.replication.factor=3, but in 
> development and docker environments where there is only 1 broker, the offsets 
> topic will fail to be created when a consumer tries to consume and no records 
> will be returned.  As a result, the user experience is bad.  The user may 
> have no idea about this setting change and enforcement, and they just see 
> that `kafka-console-consumer` hangs with ZERO output. It is true that the 
> broker log file will provide a message (e.g. {{ERROR [KafkaApi-1] Number of 
> alive brokers '1' does not meet the required replication factor '3' for the 
> offsets topic (configured via 'offsets.topic.replication.factor'). This error 
> can be ignored if the cluster is starting up and not all brokers are up yet. 
> (kafka.server.KafkaApis)}}) but many users do not have access to the log 
> files or know how to get them.
> Suggestion: give feedback to the user/app if offsets topic cannot be created. 
>  For example, after some timeout.
> Workaround:
> Set offsets.topic.replication.factor=1





[jira] [Commented] (KAFKA-10417) suppress() with cogroup() throws ClassCastException

2020-10-09 Thread Rens Groothuijsen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17211318#comment-17211318
 ] 

Rens Groothuijsen commented on KAFKA-10417:
---

[~ableegoldman] I would, but I'm not familiar enough with the code to determine 
what the best solution would be.

> suppress() with cogroup() throws ClassCastException
> ---
>
> Key: KAFKA-10417
> URL: https://issues.apache.org/jira/browse/KAFKA-10417
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Wardha Perinkada Kattu
>Priority: Critical
>  Labels: kafka-streams
> Fix For: 2.6.1, 2.8.0, 2.7.1
>
>
> Streams operation - `cogroup()`, `aggregate()` followed by `suppress()` 
> throws `ClassCastException`
> Works fine without the `suppress()`
> Code block tested -
> {code:java}
> val stream1 = requestStreams.merge(successStreams).merge(errorStreams)
>     .groupByKey(Grouped.with(Serdes.String(), serdesConfig.notificationSerde()))
> val streams2 = confirmationStreams
>     .groupByKey(Grouped.with(Serdes.String(), serdesConfig.confirmationsSerde()))
> val cogrouped = stream1.cogroup(notificationAggregator)
>     .cogroup(streams2, confirmationsAggregator)
>     .windowedBy(
>         TimeWindows.of(Duration.ofMinutes(notificationStreamsConfig.joinWindowMinutes.toLong()))
>             .grace(Duration.ofMinutes(notificationStreamsConfig.graceDurationMinutes.toLong()))
>     )
>     .aggregate({ null }, Materialized.`as`<String, NotificationMetric, WindowStore<Bytes, ByteArray>>("time-windowed-aggregated-stream-store")
>         .withValueSerde(serdesConfig.notificationMetricSerde()))
>     .suppress(Suppressed.untilWindowCloses(unbounded()))
>     .toStream()
> {code}
> Exception thrown is:
> {code:java}
> Caused by: java.lang.ClassCastException: class 
> org.apache.kafka.streams.kstream.internals.PassThrough cannot be cast to 
> class org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier 
> (org.apache.kafka.streams.kstream.internals.PassThrough and 
> org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier are in 
> unnamed module of loader 'app')
> {code}
> [https://stackoverflow.com/questions/63459685/kgroupedstream-with-cogroup-aggregate-suppress]





[jira] [Commented] (KAFKA-10417) suppress() with cogroup() throws ClassCastException

2020-08-30 Thread Rens Groothuijsen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10417?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17187322#comment-17187322
 ] 

Rens Groothuijsen commented on KAFKA-10417:
---

A cogrouped stream creates a KTable with processor supplier type PassThrough, 
which implements ProcessorSupplier rather than  KTableProcessorSupplier. This 
then causes a problem 
[here|https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/kstream/internals/KTableImpl.java#L842].
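
To illustrate the mechanism, here is a stripped-down sketch. The types are simplified stand-ins that share only their names with the Kafka Streams classes (no generics, no methods), and valueGetterSupplier is a hypothetical helper. It shows why a cast that assumes every table node carries a KTableProcessorSupplier fails as soon as the node carries a plain ProcessorSupplier such as PassThrough.

```java
class CastSketch {
    // Simplified stand-ins for the Kafka Streams types of the same names.
    interface ProcessorSupplier {}
    interface KTableProcessorSupplier extends ProcessorSupplier {}
    static final class PassThrough implements ProcessorSupplier {}
    static final class TableAggregate implements KTableProcessorSupplier {}

    /** Hypothetical helper: assumes the supplier is always a KTableProcessorSupplier. */
    static KTableProcessorSupplier valueGetterSupplier(ProcessorSupplier supplier) {
        return (KTableProcessorSupplier) supplier; // throws if the assumption is wrong
    }

    /** Returns true if the cast succeeds, false if it throws ClassCastException. */
    static boolean castSucceeds(ProcessorSupplier supplier) {
        try {
            valueGetterSupplier(supplier);
            return true;
        } catch (ClassCastException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(castSucceeds(new TableAggregate())); // regular table node
        System.out.println(castSucceeds(new PassThrough()));    // cogrouped table node
    }
}
```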

> suppress() with cogroup() throws ClassCastException
> ---
>
> Key: KAFKA-10417
> URL: https://issues.apache.org/jira/browse/KAFKA-10417
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.6.0
>Reporter: Wardha Perinkada Kattu
>Priority: Blocker
>  Labels: kafka-streams
> Fix For: 2.7.0
>
>
> Streams operation - `cogroup()`, `aggregate()` followed by `suppress()` 
> throws `ClassCastException`
> Works fine without the `suppress()`
> Code block tested -
> {code:java}
> val stream1 = requestStreams.merge(successStreams).merge(errorStreams)
>     .groupByKey(Grouped.with(Serdes.String(), serdesConfig.notificationSerde()))
> val streams2 = confirmationStreams
>     .groupByKey(Grouped.with(Serdes.String(), serdesConfig.confirmationsSerde()))
> val cogrouped = stream1.cogroup(notificationAggregator)
>     .cogroup(streams2, confirmationsAggregator)
>     .windowedBy(
>         TimeWindows.of(Duration.ofMinutes(notificationStreamsConfig.joinWindowMinutes.toLong()))
>             .grace(Duration.ofMinutes(notificationStreamsConfig.graceDurationMinutes.toLong()))
>     )
>     .aggregate({ null }, Materialized.`as`<String, NotificationMetric, WindowStore<Bytes, ByteArray>>("time-windowed-aggregated-stream-store")
>         .withValueSerde(serdesConfig.notificationMetricSerde()))
>     .suppress(Suppressed.untilWindowCloses(unbounded()))
>     .toStream()
> {code}
> Exception thrown is:
> {code:java}
> Caused by: java.lang.ClassCastException: class 
> org.apache.kafka.streams.kstream.internals.PassThrough cannot be cast to 
> class org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier 
> (org.apache.kafka.streams.kstream.internals.PassThrough and 
> org.apache.kafka.streams.kstream.internals.KTableProcessorSupplier are in 
> unnamed module of loader 'app')
> {code}
> [https://stackoverflow.com/questions/63459685/kgroupedstream-with-cogroup-aggregate-suppress]





[jira] [Commented] (KAFKA-10363) Broker try to connect to a new cluster when there are changes in zookeeper.connect properties

2020-08-16 Thread Rens Groothuijsen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10363?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17178634#comment-17178634
 ] 

Rens Groothuijsen commented on KAFKA-10363:
---

One possible workaround is to update the broker's existing 
kafka-logs/meta.properties file with the expected ID, or remove it so that it 
will be regenerated upon next restart.
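
Before editing or removing the file, it may help to check what it currently contains. The sketch below parses the text of a meta.properties file and compares one entry against an expected value. It assumes the ID in question is stored under the cluster.id key; the sample contents and IDs are made up for illustration.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

class MetaPropertiesSketch {
    /** Returns true if the cluster.id entry in the given file text equals the expected ID. */
    static boolean clusterIdMatches(String metaPropertiesText, String expectedClusterId) {
        Properties meta = new Properties();
        try {
            meta.load(new StringReader(metaPropertiesText));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return expectedClusterId.equals(meta.getProperty("cluster.id"));
    }

    public static void main(String[] args) {
        // Made-up file contents; a real file would be read from the broker's log directory.
        String fileText = "version=0\nbroker.id=1001\ncluster.id=old-cluster-id\n";
        System.out.println(clusterIdMatches(fileText, "old-cluster-id"));
        System.out.println(clusterIdMatches(fileText, "new-cluster-id"));
    }
}
```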

> Broker try to connect to a new cluster when there are changes in 
> zookeeper.connect properties
> -
>
> Key: KAFKA-10363
> URL: https://issues.apache.org/jira/browse/KAFKA-10363
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.4.0, 2.3.1
> Environment: 3 Kafka brokers (v2.3.1, v2.4.0) with Zookeeper cluster 
> (3.4.10)
> Ubuntu 18.04 LTS
>Reporter: Alexey Kornev
>Priority: Critical
>
> We've just successfully set up a Kafka cluster consisting of 3 brokers and 
> ran into the following issue: when we change the order of the ZooKeeper servers in 
> the zookeeper.connect property in the server.properties file and restart a Kafka 
> broker, that broker tries to connect to a new Kafka cluster. As a 
> result, the broker throws an error and shuts down. 
> For example, config server.properties on first broker:
> {code:java}
> broker.id=-1
> ...
> zookeeper.connect=node_1:2181/kafka,node_2:2181/kafka,node_3:2181/kafka
> {code}
>  We changed it to 
> {code:java}
> broker.id=-1
> ...
> zookeeper.connect=node_2:2181/kafka,node_3:2181/kafka,node_1:2181/kafka {code}
> and restarted the Kafka broker. 
> Logs:
> {code:java}
> [2020-08-05 09:07:55,658] INFO [ExpirationReaper-0-Heartbeat]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2020-08-05 09:07:55,658] INFO [ExpirationReaper-0-Heartbeat]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2020-08-05 09:07:55,658] INFO [ExpirationReaper-0-topic]: Starting (kafka.server.DelayedOperationPurgatory$ExpiredOperationReaper)
> [2020-08-05 09:07:57,070] INFO Registered kafka:type=kafka.Log4jController MBean (kafka.utils.Log4jControllerRegistration$)
> [2020-08-05 09:07:57,656] INFO Registered signal handlers for TERM, INT, HUP (org.apache.kafka.common.utils.LoggingSignalHandler)
> [2020-08-05 09:07:57,657] INFO starting (kafka.server.KafkaServer)
> [2020-08-05 09:07:57,658] INFO Connecting to zookeeper on node_2:2181/kafka,node_3:2181/kafka,node_1:2181/kafka (kafka.server.KafkaServer)
> [2020-08-05 09:07:57,685] INFO [ZooKeeperClient Kafka server] Initializing a new session to node_2:2181. (kafka.zookeeper.ZooKeeperClient)
> [2020-08-05 09:07:57,690] INFO Client environment:zookeeper.version=3.4.14-4c25d480e66aadd371de8bd2fd8da255ac140bcf, built on 03/06/2019 16:18 GMT (org.apache.zookeeper.ZooKeeper)
> [2020-08-05 09:07:57,693] INFO Client environment:host.name=localhost (org.apache.zookeeper.ZooKeeper)
> [2020-08-05 09:07:57,693] INFO Client environment:java.version=11.0.8 (org.apache.zookeeper.ZooKeeper)
> [2020-08-05 09:07:57,696] INFO Client environment:java.vendor=Ubuntu (org.apache.zookeeper.ZooKeeper)
> [2020-08-05 09:07:57,696] INFO Client environment:java.home=/usr/lib/jvm/java-11-openjdk-amd64 (org.apache.zookeeper.ZooKeeper)
> [2020-08-05 09:07:57,696] INFO Client 
> environment:java.class.path=/opt/kafka/current/bin/../libs/activation-1.1.1.jar:/opt/kafka/current/bin/../libs/aopalliance-repackaged-2.5.0.jar:/opt/kafka/current/bin/../libs/argparse4j-0.7.0.jar:/opt/kafka/current/bin/../libs/audience-annotations-0.5.0.jar:/opt/kafka/current/bin/../libs/commons-lang3-3.8.1.jar:/opt/kafka/current/bin/../libs/connect-api-2.3.1.jar:/opt/kafka/current/bin/../libs/connect-basic-auth-extension-2.3.1.jar:/opt/kafka/current/bin/../libs/connect-file-2.3.1.jar:/opt/kafka/current/bin/../libs/connect-json-2.3.1.jar:/opt/kafka/current/bin/../libs/connect-runtime-2.3.1.jar:/opt/kafka/current/bin/../libs/connect-transforms-2.3.1.jar:/opt/kafka/current/bin/../libs/guava-20.0.jar:/opt/kafka/current/bin/../libs/hk2-api-2.5.0.jar:/opt/kafka/current/bin/../libs/hk2-locator-2.5.0.jar:/opt/kafka/current/bin/../libs/hk2-utils-2.5.0.jar:/opt/kafka/current/bin/../libs/jackson-annotations-2.10.0.jar:/opt/kafka/current/bin/../libs/jackson-core-2.10.0.jar:/opt/kafka/current/bin/../libs/jackson-databind-2.10.0.jar:/opt/kafka/current/bin/../libs/jackson-dataformat-csv-2.10.0.jar:/opt/kafka/current/bin/../libs/jackson-datatype-jdk8-2.10.0.jar:/opt/kafka/current/bin/../libs/jackson-jaxrs-base-2.10.0.jar:/opt/kafka/current/bin/../libs/jackson-jaxrs-json-provider-2.10.0.jar:/opt/kafka/current/bin/../libs/jackson-module-jaxb-annotations-2.10.0.jar:/opt/kafka/current/bin/../libs/jackson-module-paranamer-2.10.0.jar:/opt/kafka/current/bin/../libs/jackson-module-scala_2.12-2.10.0.jar:/opt/kafka/current/bin/

[jira] [Assigned] (KAFKA-10224) The license term about jersey is not correct

2020-07-27 Thread Rens Groothuijsen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10224?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rens Groothuijsen reassigned KAFKA-10224:
-

Assignee: Rens Groothuijsen

> The license term about jersey is not correct
> 
>
> Key: KAFKA-10224
> URL: https://issues.apache.org/jira/browse/KAFKA-10224
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.5.0
>Reporter: jaredli2020
>Assignee: Rens Groothuijsen
>Priority: Critical
>
> Kafka 2.5.0 bundles jersey 2.28. Since 2.28, jersey changed the license type 
> from CDDL/GPLv2+CPE to EPLv2. But in Kafka 2.5.0's LICENSE file 
> [https://github.com/apache/kafka/blob/2.5/LICENSE], it still said
> "This distribution has a binary dependency on jersey, which is available 
> under the CDDL".
> This should be corrected ASAP.





[jira] [Commented] (KAFKA-10159) MirrorSourceConnector don`t work on connect-distributed.sh

2020-07-26 Thread Rens Groothuijsen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-10159?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17165362#comment-17165362
 ] 

Rens Groothuijsen commented on KAFKA-10159:
---

[~ryannedolan] From what I can see, MirrorConnectorConfig does try to include 
fields from AdminClientConfig (which holds the required bootstrap.servers 
parameter) but does not validate it as part of its own config definition. I 
suppose an error would have to be thrown manually if any extra parameters 
outside the config definition are missing.
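
A manual check along those lines could be as small as the sketch below. The key names in main() are an assumption about which prefixed properties the connector needs, and IllegalArgumentException stands in for whatever exception type the connector would actually throw.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

class RequiredConfigSketch {
    /** Returns the required keys that are absent or blank in the given config. */
    static List<String> missingKeys(Map<String, String> config, List<String> required) {
        List<String> missing = new ArrayList<>();
        for (String key : required) {
            String value = config.get(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    /** Fails fast with a message naming every missing key. */
    static void validate(Map<String, String> config, List<String> required) {
        List<String> missing = missingKeys(config, required);
        if (!missing.isEmpty()) {
            throw new IllegalArgumentException("Missing required configuration: " + missing);
        }
    }

    public static void main(String[] args) {
        Map<String, String> config = new HashMap<>();
        config.put("source.cluster.alias", "cosmo");
        config.put("target.cluster.alias", "nana");
        config.put("source.cluster.bootstrap.servers", "192.168.4.42:9092");
        // Assumed key names; adjust to whatever the connector really requires.
        List<String> required = Arrays.asList(
                "source.cluster.bootstrap.servers", "target.cluster.bootstrap.servers");
        try {
            validate(config, required);
        } catch (IllegalArgumentException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

Failing at validation time would surface the problem in the REST response instead of leaving an empty task list and an error only in the worker log.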

> MirrorSourceConnector don`t work on connect-distributed.sh
> --
>
> Key: KAFKA-10159
> URL: https://issues.apache.org/jira/browse/KAFKA-10159
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.4.1
> Environment: centos7
>Reporter: cosmozhu
>Priority: Major
> Fix For: 2.4.1
>
> Attachments: connectDistributed.out
>
>
> Hi,
>  I want to run a MirrorSourceConnector with connect-distributed.
>  The connector config looks like this:
>  ```
>  {
>    "name" : "cosmo-source",
>    "config" : {
>      "connector.class" : "org.apache.kafka.connect.mirror.MirrorSourceConnector",
>      "source.cluster.alias" : "cosmo",
>      "target.cluster.alias" : "nana",
>      "source.cluster.bootstrap.servers" : "192.168.4.42:9092,192.168.4.42:9093,192.168.4.42:9094",
>      "topics" : ".*"
>    }
>  }
>  ```
> When I post the REST request, it returns: 
> ```
> {"name":"cosmo-source","config":{"connector.class":"org.apache.kafka.connect.mirror.MirrorSourceConnector","target.cluster.alias":"nana","topics":".*","source.cluster.alias":"cosmo","name":"cosmo-source","source.cluster.bootstrap.servers":"192.168.4.42:9092,192.168.4.42:9093,192.168.4.42:9094"},"tasks":[],"type":"source"}
> ```
> The tasks array is empty.
> It's obvious that something's wrong here.
> In connectDistributed.out: 
> ```
> org.apache.kafka.common.config.ConfigException: Missing required 
> configuration "bootstrap.servers" which has no default value.
> ```
> Full logs are in the attachment.
> Thanks for any help.
>  





[jira] [Assigned] (KAFKA-10132) Kafka Connect JMX MBeans with String values have type double

2020-07-25 Thread Rens Groothuijsen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-10132?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rens Groothuijsen reassigned KAFKA-10132:
-

Assignee: Rens Groothuijsen

> Kafka Connect JMX MBeans with String values have type double
> 
>
> Key: KAFKA-10132
> URL: https://issues.apache.org/jira/browse/KAFKA-10132
> Project: Kafka
>  Issue Type: Bug
>  Components: KafkaConnect
>Affects Versions: 2.5.0
>Reporter: Tom Malaher
>Assignee: Rens Groothuijsen
>Priority: Major
>
> There are quite a few metrics available for source/sink connectors, and many 
> of them are numeric (JMX type "double"), but there are a few attributes that 
> have string values that are still tagged as "double".
> For example:
> Bean: kafka.connect:connector=my-source,type=connector-metrics Attribute: 
> status
> The Attribute Description says: "The status of the connector task. One of 
> 'unassigned', 'running', 'paused', 'failed', or 'destroyed'."
> The value is currently "running" on my instance.
> This causes difficulty for anything that tries to introspect the JMX 
> attribute metadata and then parse/display the data.
> See also 
> [https://stackoverflow.com/questions/50291157/which-jmx-metric-should-be-used-to-monitor-the-status-of-a-connector-in-kafka-co]
>  where this problem is mentioned in one of the answers (dating back to 2018).
> The attribute metadata should be updated to indicate the correct type.
> I suspect the problem lies at line 220 of 
> `org.apache.kafka.common.metrics.JmxReporter` (in version 2.5.0) where a 
> hardcoded `double.class.getName()` is used as the mbean data type even for 
> metrics with a type of String.
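
One way to derive the attribute type from the metric's value instead of hardcoding it is sketched below using the standard javax.management.MBeanAttributeInfo class. The helper name and the rule for choosing the type are assumptions for illustration, not the actual JmxReporter code.

```java
import javax.management.MBeanAttributeInfo;

class JmxTypeSketch {
    /** Builds read-only attribute metadata whose type follows the value's runtime class. */
    static MBeanAttributeInfo attributeInfo(String name, Object value) {
        String type;
        if (value == null) {
            type = Object.class.getName();
        } else if (value instanceof Double) {
            type = double.class.getName(); // keeps the existing "double" tag for numeric metrics
        } else {
            type = value.getClass().getName(); // e.g. java.lang.String for status metrics
        }
        // Arguments: name, type, description, isReadable, isWritable, isIs
        return new MBeanAttributeInfo(name, type, "", true, false, false);
    }

    public static void main(String[] args) {
        System.out.println(attributeInfo("status", "running").getType());
        System.out.println(attributeInfo("record-send-total", 12.0).getType());
    }
}
```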





[jira] [Assigned] (KAFKA-9716) Values of compression-rate and compression-rate-avg are misleading

2020-05-13 Thread Rens Groothuijsen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9716?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rens Groothuijsen reassigned KAFKA-9716:


Assignee: Rens Groothuijsen

> Values of compression-rate and compression-rate-avg are misleading
> --
>
> Key: KAFKA-9716
> URL: https://issues.apache.org/jira/browse/KAFKA-9716
> Project: Kafka
>  Issue Type: Bug
>  Components: clients, compression
>Affects Versions: 2.4.1
>Reporter: Christian Kosmowski
>Assignee: Rens Groothuijsen
>Priority: Minor
>
> The values of the following metrics are confusing: compression-rate, 
> compression-rate-avg, and basically every other compression rate 
> (e.g. the per-topic compression rate).
> They are calculated as follows:
> {code:java}
> if (numRecords == 0L) {
>     buffer().position(initialPosition);
>     builtRecords = MemoryRecords.EMPTY;
> } else {
>     if (magic > RecordBatch.MAGIC_VALUE_V1)
>         this.actualCompressionRatio = (float) writeDefaultBatchHeader() / this.uncompressedRecordsSizeInBytes;
>     else if (compressionType != CompressionType.NONE)
>         this.actualCompressionRatio = (float) writeLegacyCompressedWrapperHeader() / this.uncompressedRecordsSizeInBytes;
>     ByteBuffer buffer = buffer().duplicate();
>     buffer.flip();
>     buffer.position(initialPosition);
>     builtRecords = MemoryRecords.readableRecords(buffer.slice());
> }
> {code}
> Basically, the compressed size is divided by the uncompressed size, which leads 
> to a value < 1 for high compression (good if you want compression) or > 1 for 
> poor compression (bad if you want compression).
> From the name "compression rate" I would expect the exact opposite. Apart 
> from the fact that the word "rate" usually refers to comparisons between 
> values of different units (miles per hour), the correct word, "ratio", would 
> refer to the uncompressed size divided by the compressed size. (The code 
> uses the correct word, but the metric names do not.)
> So if the compressed data takes half the space of the uncompressed data, the 
> correct value for the compression ratio (or rate) would be 2 and not 0.5 as Kafka 
> reports it. That is really confusing, and I would AT LEAST expect this 
> behaviour to be documented somewhere, but it is not: all documentation 
> sources just say "the compression rate".
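
The two conventions can be put side by side with a small worked example. The method names are illustrative only; the first mirrors the division in the snippet quoted above, and the second is the reading the reporter expects.

```java
class CompressionRatioSketch {
    /** Compressed size divided by uncompressed size, as in the quoted snippet. */
    static double reportedRate(long compressedBytes, long uncompressedBytes) {
        return (double) compressedBytes / uncompressedBytes;
    }

    /** Uncompressed size divided by compressed size, the reading the reporter expects. */
    static double expectedRatio(long compressedBytes, long uncompressedBytes) {
        return (double) uncompressedBytes / compressedBytes;
    }

    public static void main(String[] args) {
        long compressed = 500;
        long uncompressed = 1000;
        System.out.println(reportedRate(compressed, uncompressed));  // 0.5
        System.out.println(expectedRatio(compressed, uncompressed)); // 2.0
    }
}
```

The two values are reciprocals, so either convention carries the same information; the concern raised here is only that the metric name and documentation do not say which one is used.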





[jira] [Commented] (KAFKA-9968) Newly subscribed topic not present in metadata request

2020-05-12 Thread Rens Groothuijsen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17105860#comment-17105860
 ] 

Rens Groothuijsen commented on KAFKA-9968:
--

[~cmanhin] I personally wouldn't consider this correct behavior, though I don't 
have much knowledge of the architecture myself. So yes, I'd say it's a 
workaround.

> Newly subscribed topic not present in metadata request
> --
>
> Key: KAFKA-9968
> URL: https://issues.apache.org/jira/browse/KAFKA-9968
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.5.0, 2.4.1
>Reporter: Man Hin
>Priority: Major
> Attachments: KafkaClientVersionTest.java
>
>
> Our application subscribes to multiple topics one by one. It used to work 
> fine, but after we upgraded our Kafka client from 2.4.0 to 
> 2.4.1, our application no longer receives messages for the last topic subscribed.
> I spotted a warning log from the Kafka client.
> {code:java}
> 2020-05-08 01:01:11.059 [main] [WARN ] o.a.k.c.c.i.ConsumerCoordinator - 
> [Consumer clientId=sample_consumer, groupId=sample_client] The following 
> subscribed topics are not assigned to any members: [TopicX]  {code}
> I'm able to reproduce it with a test case running against a live Kafka broker 
> (we are using v2.4.1 broker).
> {code:java}
> @Test
> public void WHEN_subscribed_sequentially_THEN_receive_assignment() throws 
> InterruptedException, ExecutionException, TimeoutException {
>     // WHEN: subscribe to one more topic on each round, polling in between
>     List<String> topics = new ArrayList<>();
>     topics.add(TOPIC_C);
>     consumer.subscribe(topics);
>     consumer.poll(0);
>     topics.add(TOPIC_B);
>     consumer.subscribe(topics);
>     consumer.poll(0);
>     topics.add(TOPIC_A);
>     consumer.subscribe(topics);
>     consumer.poll(0);
>     // THEN: partitions from all three topics should be assigned
>     Set<TopicPartition> assignments = consumer.assignment();
>     Set<String> topicSet = assignments.stream().map(p -> 
>         p.topic()).distinct().collect(Collectors.toSet());
>     logger.info("Topic: {}", topicSet);
>     assertThat(topicSet, hasItems(TOPIC_C, TOPIC_B, TOPIC_A));
> }{code}
> We turned on trace logging and found that the metadata requests always missed the 
> last topic we subscribed to.
> {code:java}
> 2020-05-08 01:01:10.665 [main] [INFO ] o.a.k.c.c.KafkaConsumer - [Consumer 
> clientId=sample_consumer, groupId=sample_client] Subscribed to topic(s): 
> TopicC
> 2020-05-08 01:01:10.983 [main] [DEBUG] o.a.k.c.NetworkClient - [Consumer 
> clientId=sample_consumer, groupId=sample_client] Sending metadata request 
> MetadataRequestData(topics=[MetadataRequestTopic(name='TopicC')], 
> allowAutoTopicCreation=true, includeClusterA
> 2020-05-08 01:01:11.003 [main] [DEBUG] o.a.k.c.c.i.AbstractCoordinator - 
> [Consumer clientId=sample_consumer, groupId=sample_client] Sending JoinGroup 
> (JoinGroupRequestData(groupId='sample_client', sessionTimeoutMs=1, 
> rebalanceTimeoutMs=30, memberId=
> 2020-05-08 01:01:11.015 [main] [DEBUG] o.a.k.c.c.i.AbstractCoordinator - 
> [Consumer clientId=sample_consumer, groupId=sample_client] Sending JoinGroup 
> (JoinGroupRequestData(groupId='sample_client', sessionTimeoutMs=1, 
> rebalanceTimeoutMs=30, memberId=
> 2020-05-08 01:01:11.049 [main] [INFO ] o.a.k.c.c.KafkaConsumer - [Consumer 
> clientId=sample_consumer, groupId=sample_client] Subscribed to topic(s): 
> TopicC, TopicB
> 2020-05-08 01:01:11.053 [main] [DEBUG] o.a.k.c.NetworkClient - [Consumer 
> clientId=sample_consumer, groupId=sample_client] Sending metadata request 
> MetadataRequestData(topics=[MetadataRequestTopic(name='TopicC')], 
> allowAutoTopicCreation=true, includeClusterA
> 2020-05-08 01:01:11.057 [main] [DEBUG] o.a.k.c.c.i.AbstractCoordinator - 
> [Consumer clientId=sample_consumer, groupId=sample_client] Sending JoinGroup 
> (JoinGroupRequestData(groupId='sample_client', sessionTimeoutMs=1, 
> rebalanceTimeoutMs=30, memberId=
> 2020-05-08 01:01:11.062 [main] [INFO ] o.a.k.c.c.KafkaConsumer - [Consumer 
> clientId=sample_consumer, groupId=sample_client] Subscribed to topic(s): 
> TopicC, TopicB, TopicA
> 2020-05-08 01:01:11.062 [main] [DEBUG] o.a.k.c.NetworkClient - [Consumer 
> clientId=sample_consumer, groupId=sample_client] Sending metadata request 
> MetadataRequestData(topics=[MetadataRequestTopic(name='TopicB'), 
> MetadataRequestTopic(name='TopicC')], allowAu
> 2020-05-08 01:01:11.064 [main] [DEBUG] o.a.k.c.c.i.AbstractCoordinator - 
> [Consumer clientId=sample_consumer, groupId=sample_client] Sending JoinGroup 
> (JoinGroupRequestData(groupId='sample_client', sessionTimeoutMs=1, 
> rebalanceTimeoutMs=30, memberId=
> {code}
> I suspect this is because SubscriptionState.groupSubscription contains only 
> topi

[jira] [Commented] (KAFKA-9968) Newly subscribed topic not present in metadata request

2020-05-12 Thread Rens Groothuijsen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9968?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17105792#comment-17105792
 ] 

Rens Groothuijsen commented on KAFKA-9968:
--

[~cmanhin] Thanks for the test case! From what I can see, the consumer did not 
assign the topics because at that moment, 0 partitions were available for those 
topics according to the metadata. Forcing a metadata update either through 
unsubscribing/resubscribing or polling again caused the assignment list to 
contain the proper values.

> Newly subscribed topic not present in metadata request
> --
>
> Key: KAFKA-9968
> URL: https://issues.apache.org/jira/browse/KAFKA-9968
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.5.0, 2.4.1
>Reporter: Man Hin
>Priority: Major
> Attachments: KafkaClientVersionTest.java
>
>
> Our application subscribes to multiple topics one by one. It used to work 
> fine, but after we upgraded our Kafka client version from 2.4.0 to 
> 2.4.1, our application no longer receives messages for the last topic.
> I spotted a warning log from the Kafka client.
> {code:java}
> 2020-05-08 01:01:11.059 [main] [WARN ] o.a.k.c.c.i.ConsumerCoordinator - 
> [Consumer clientId=sample_consumer, groupId=sample_client] The following 
> subscribed topics are not assigned to any members: [TopicX]  {code}
> I'm able to reproduce it with a test case running against a live Kafka broker 
> (we are using v2.4.1 broker).
> {code:java}
> @Test
> public void WHEN_subscribed_sequentially_THEN_receive_assignment() throws 
> InterruptedException, ExecutionException, TimeoutException {
>     // WHEN: subscribe to one more topic on each step
>     List<String> topics = new ArrayList<>();
>     topics.add(TOPIC_C);
>     consumer.subscribe(topics);
>     consumer.poll(0);
>     topics.add(TOPIC_B);
>     consumer.subscribe(topics);
>     consumer.poll(0);
>     topics.add(TOPIC_A);
>     consumer.subscribe(topics);
>     consumer.poll(0);
>     // THEN: every subscribed topic should show up in the assignment
>     Set<TopicPartition> assignments = consumer.assignment();
>     Set<String> topicSet = assignments.stream().map(p -> 
>         p.topic()).distinct().collect(Collectors.toSet());
>     logger.info("Topic: {}", topicSet);
>     assertThat(topicSet, hasItems(TOPIC_C, TOPIC_B, TOPIC_A));
> }{code}
> We turned on trace logging and found that the metadata requests always missed the 
> last topic we subscribed to.
> {code:java}
> 2020-05-08 01:01:10.665 [main] [INFO ] o.a.k.c.c.KafkaConsumer - [Consumer 
> clientId=sample_consumer, groupId=sample_client] Subscribed to topic(s): 
> TopicC
> 2020-05-08 01:01:10.983 [main] [DEBUG] o.a.k.c.NetworkClient - [Consumer 
> clientId=sample_consumer, groupId=sample_client] Sending metadata request 
> MetadataRequestData(topics=[MetadataRequestTopic(name='TopicC')], 
> allowAutoTopicCreation=true, includeClusterA
> 2020-05-08 01:01:11.003 [main] [DEBUG] o.a.k.c.c.i.AbstractCoordinator - 
> [Consumer clientId=sample_consumer, groupId=sample_client] Sending JoinGroup 
> (JoinGroupRequestData(groupId='sample_client', sessionTimeoutMs=1, 
> rebalanceTimeoutMs=30, memberId=
> 2020-05-08 01:01:11.015 [main] [DEBUG] o.a.k.c.c.i.AbstractCoordinator - 
> [Consumer clientId=sample_consumer, groupId=sample_client] Sending JoinGroup 
> (JoinGroupRequestData(groupId='sample_client', sessionTimeoutMs=1, 
> rebalanceTimeoutMs=30, memberId=
> 2020-05-08 01:01:11.049 [main] [INFO ] o.a.k.c.c.KafkaConsumer - [Consumer 
> clientId=sample_consumer, groupId=sample_client] Subscribed to topic(s): 
> TopicC, TopicB
> 2020-05-08 01:01:11.053 [main] [DEBUG] o.a.k.c.NetworkClient - [Consumer 
> clientId=sample_consumer, groupId=sample_client] Sending metadata request 
> MetadataRequestData(topics=[MetadataRequestTopic(name='TopicC')], 
> allowAutoTopicCreation=true, includeClusterA
> 2020-05-08 01:01:11.057 [main] [DEBUG] o.a.k.c.c.i.AbstractCoordinator - 
> [Consumer clientId=sample_consumer, groupId=sample_client] Sending JoinGroup 
> (JoinGroupRequestData(groupId='sample_client', sessionTimeoutMs=1, 
> rebalanceTimeoutMs=30, memberId=
> 2020-05-08 01:01:11.062 [main] [INFO ] o.a.k.c.c.KafkaConsumer - [Consumer 
> clientId=sample_consumer, groupId=sample_client] Subscribed to topic(s): 
> TopicC, TopicB, TopicA
> 2020-05-08 01:01:11.062 [main] [DEBUG] o.a.k.c.NetworkClient - [Consumer 
> clientId=sample_consumer, groupId=sample_client] Sending metadata request 
> MetadataRequestData(topics=[MetadataRequestTopic(name='TopicB'), 
> MetadataRequestTopic(name='TopicC')], allowAu
> 2020-05-08 01:01:11.064 [main] [DEBUG] o.a.k.c.c.i.AbstractCoordinator - 
> [Consumer clientId=sample_consumer, groupId=sample_client] Sending JoinGroup 
> (JoinGroupRequestData(groupId=

[jira] [Commented] (KAFKA-9603) Number of open files keeps increasing in Streams application

2020-05-02 Thread Rens Groothuijsen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9603?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17097996#comment-17097996
 ] 

Rens Groothuijsen commented on KAFKA-9603:
--

[~biljazovic] It does indeed appear to affect standby tasks only. When I run 
the reproduction but leave num.standby.replicas at 0, the numbers remain 
relatively stable.

> Number of open files keeps increasing in Streams application
> 
>
> Key: KAFKA-9603
> URL: https://issues.apache.org/jira/browse/KAFKA-9603
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Affects Versions: 2.1.0, 2.2.0, 2.4.0, 2.3.1
> Environment: Spring Boot 2.2.4, OpenJDK 13, Centos image
>Reporter: Bruno Iljazovic
>Priority: Major
>
> The problem appeared when upgrading from *2.0.1* to *2.3.1*. 
> Relevant Kafka Streams code:
> {code:java}
> KStream events1 =
> builder.stream(FIRST_TOPIC_NAME, Consumed.with(stringSerde, event1Serde, 
> event1TimestampExtractor(), null))
>.mapValues(...);
> KStream events2 =
> builder.stream(SECOND_TOPIC_NAME, Consumed.with(stringSerde, event2Serde, 
> event2TimestampExtractor(), null))
>.mapValues(...);
> var joinWindows = JoinWindows.of(Duration.of(1, MINUTES).toMillis())
>  .until(Duration.of(1, HOURS).toMillis());
> events2.join(events1, this::join, joinWindows, Joined.with(stringSerde, 
> event2Serde, event1Serde))
>.foreach(...);
> {code}
> The number of open *.sst files keeps increasing until it eventually hits the OS 
> limit (65536) and causes this exception:
> {code:java}
> Caused by: org.rocksdb.RocksDBException: While open a file for appending: 
> /.../0_8/KSTREAM-JOINOTHER-10-store/KSTREAM-JOINOTHER-10-store.157943520/001354.sst:
>  Too many open files
>   at org.rocksdb.RocksDB.flush(Native Method)
>   at org.rocksdb.RocksDB.flush(RocksDB.java:2394)
> {code}
> Here are example files that are opened and never closed:
> {code:java}
> /.../0_27/KSTREAM-JOINTHIS-09-store/KSTREAM-JOINTHIS-09-store.158245920/000114.sst
> /.../0_27/KSTREAM-JOINOTHER-10-store/KSTREAM-JOINOTHER-10-store.158245920/65.sst
> /.../0_29/KSTREAM-JOINTHIS-09-store/KSTREAM-JOINTHIS-09-store.158215680/000115.sst
> /.../0_29/KSTREAM-JOINTHIS-09-store/KSTREAM-JOINTHIS-09-store.158245920/000112.sst
> /.../0_31/KSTREAM-JOINTHIS-09-store/KSTREAM-JOINTHIS-09-store.158185440/51.sst
> {code}





[jira] [Comment Edited] (KAFKA-9745) Attribute is missed in kafka mirror maker metrics

2020-04-29 Thread Rens Groothuijsen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17095694#comment-17095694
 ] 

Rens Groothuijsen edited comment on KAFKA-9745 at 4/29/20, 4:57 PM:


[~krasnovnikita] From what I can see in the code, in MM only consumer groups 
have a "source" property exposed, which admittedly might be a bit confusing if 
you're expecting it on every type of object. According to the documentation you 
linked, "source" isn't provided as a property for partitions because it can be 
inferred from the topic name.


was (Author: rensgroothuijsen):
[~krasnovnikita] From what I can see in the code, in MM only consumer groups 
have a "source" metric exposed, which admittedly might be a bit confusing if 
you're expecting it on every type of object. According to the documentation you 
linked, "source" isn't provided as a metric for partitions because it can be 
inferred from the topic name.

> Attribute is missed in kafka mirror maker metrics
> -
>
> Key: KAFKA-9745
> URL: https://issues.apache.org/jira/browse/KAFKA-9745
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.5.0
>Reporter: Nikita Krasnov
>Priority: Minor
> Attachments: kmm_metrics.png
>
>
> As described in the Kafka MirrorMaker 
> [doc|https://github.com/apache/kafka/tree/trunk/connect/mirror#monitoring-an-mm2-process],
>  metrics are tagged with the following properties:
> *target*: alias of target cluster
> *source*: alias of source cluster
> *topic*: remote topic on target cluster
> *partition*: partition being replicated
> In practice, however, the *source* attribute is missing:  !kmm_metrics.png! 





[jira] [Commented] (KAFKA-9745) Attribute is missed in kafka mirror maker metrics

2020-04-29 Thread Rens Groothuijsen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9745?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17095694#comment-17095694
 ] 

Rens Groothuijsen commented on KAFKA-9745:
--

[~krasnovnikita] From what I can see in the code, in MM only consumer groups 
have a "source" metric exposed, which admittedly might be a bit confusing if 
you're expecting it on every type of object. According to the documentation you 
linked, "source" isn't provided as a metric for partitions because it can be 
inferred from the topic name.

> Attribute is missed in kafka mirror maker metrics
> -
>
> Key: KAFKA-9745
> URL: https://issues.apache.org/jira/browse/KAFKA-9745
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.5.0
>Reporter: Nikita Krasnov
>Priority: Minor
> Attachments: kmm_metrics.png
>
>
> As described in the Kafka MirrorMaker 
> [doc|https://github.com/apache/kafka/tree/trunk/connect/mirror#monitoring-an-mm2-process],
>  metrics are tagged with the following properties:
> *target*: alias of target cluster
> *source*: alias of source cluster
> *topic*: remote topic on target cluster
> *partition*: partition being replicated
> In practice, however, the *source* attribute is missing:  !kmm_metrics.png! 





[jira] [Assigned] (KAFKA-9745) Attribute is missed in kafka mirror maker metrics

2020-04-29 Thread Rens Groothuijsen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9745?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rens Groothuijsen reassigned KAFKA-9745:


Assignee: (was: Rens Groothuijsen)

> Attribute is missed in kafka mirror maker metrics
> -
>
> Key: KAFKA-9745
> URL: https://issues.apache.org/jira/browse/KAFKA-9745
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.5.0
>Reporter: Nikita Krasnov
>Priority: Minor
> Attachments: kmm_metrics.png
>
>
> As described in the Kafka MirrorMaker 
> [doc|https://github.com/apache/kafka/tree/trunk/connect/mirror#monitoring-an-mm2-process],
>  metrics are tagged with the following properties:
> *target*: alias of target cluster
> *source*: alias of source cluster
> *topic*: remote topic on target cluster
> *partition*: partition being replicated
> In practice, however, the *source* attribute is missing:  !kmm_metrics.png! 





[jira] [Assigned] (KAFKA-9745) Attribute is missed in kafka mirror maker metrics

2020-04-29 Thread Rens Groothuijsen (Jira)


 [ 
https://issues.apache.org/jira/browse/KAFKA-9745?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rens Groothuijsen reassigned KAFKA-9745:


Assignee: Rens Groothuijsen

> Attribute is missed in kafka mirror maker metrics
> -
>
> Key: KAFKA-9745
> URL: https://issues.apache.org/jira/browse/KAFKA-9745
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.5.0
>Reporter: Nikita Krasnov
>Assignee: Rens Groothuijsen
>Priority: Minor
> Attachments: kmm_metrics.png
>
>
> As described in the Kafka MirrorMaker 
> [doc|https://github.com/apache/kafka/tree/trunk/connect/mirror#monitoring-an-mm2-process],
>  metrics are tagged with the following properties:
> *target*: alias of target cluster
> *source*: alias of source cluster
> *topic*: remote topic on target cluster
> *partition*: partition being replicated
> In practice, however, the *source* attribute is missing:  !kmm_metrics.png! 





[jira] [Commented] (KAFKA-9438) Issue with mm2 active/active replication

2020-04-06 Thread Rens Groothuijsen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9438?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17076672#comment-17076672
 ] 

Rens Groothuijsen commented on KAFKA-9438:
--

[~romanius11] Tried to replicate this issue, though the only scenario in which 
I managed to do so was one where the MM instance accidentally looped back to 
the source cluster rather than writing to the target cluster.

> Issue with mm2 active/active replication
> 
>
> Key: KAFKA-9438
> URL: https://issues.apache.org/jira/browse/KAFKA-9438
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.4.0
>Reporter: Roman
>Priority: Minor
>
> Hi,
>  
> I am trying to configure active/active replication with the new Kafka 2.4.0 and MM2.
> I have 2 Kafka clusters, one in, let's say, BO and one in IN.
> Each cluster has 3 brokers.
> Topics are replicated properly, so in BO I see
> {quote}topics
> in.topics
> {quote}
>  
> in IN I see
> {quote}topics
> bo.topic
> {quote}
>  
> That matches the documentation.
>  
> But when I stop the replication process in one data center and start it up again, 
> the topics are replicated with the same prefix twice, as bo.bo.topics 
> or in.in.topics, depending on which connector I restart.
> I have also blacklisted the topics, but they are still being replicated.
>  
> bo.properties file
> {quote}name = in-bo
>  #topics = .*
>  topics.blacklist = "bo.*"
>  #groups = .*
>  connector.class = org.apache.kafka.connect.mirror.MirrorSourceConnector
>  tasks.max = 10
> source.cluster.alias = in
>  target.cluster.alias = bo
>  source.cluster.bootstrap.servers = 
> IN-kafka1:9092,IN-kafka2:9092,IN-kafka3:9092
>  target.cluster.bootstrap.servers = 
> BO-kafka1:9092,BO-kafka2:9092,bo-kafka39092
> # use ByteArrayConverter to ensure that records are not re-encoded
>  key.converter = org.apache.kafka.connect.converters.ByteArrayConverter
>  value.converter = org.apache.kafka.connect.converters.ByteArrayConverter
>  
> {quote}
> in.properties
> {quote}name = bo-in
>  #topics = .*
>  topics.blacklist = "in.*"
>  #groups = .*
>  connector.class = org.apache.kafka.connect.mirror.MirrorSourceConnector
>  tasks.max = 10
> source.cluster.alias = bo
>  target.cluster.alias = in
>  target.cluster.bootstrap.servers = 
> IN-kafka1:9092,IN-kafka2:9092,IN-kafka3:9092
>  source.cluster.bootstrap.servers = 
> BO-kafka1:9092,BO-kafka2:9092,BO-kafka3:9092
> # use ByteArrayConverter to ensure that records are not re-encoded
>  key.converter = org.apache.kafka.connect.converters.ByteArrayConverter
>  value.converter = org.apache.kafka.connect.converters.ByteArrayConverter
> {quote}
>  
>  
>  





[jira] [Commented] (KAFKA-8809) Infinite retry if secure cluster tried to be reached from non-secure consumer

2020-04-05 Thread Rens Groothuijsen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-8809?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17075890#comment-17075890
 ] 

Rens Groothuijsen commented on KAFKA-8809:
--

[~gsomogyi] I assume you're using the Java API? In that case, something like 
{{consumer.poll(Duration.ofMillis(timeout))}} should work. That did the trick 
for me, at least in the case of a failed SSL handshake.

> Infinite retry if secure cluster tried to be reached from non-secure consumer
> -
>
> Key: KAFKA-8809
> URL: https://issues.apache.org/jira/browse/KAFKA-8809
> Project: Kafka
>  Issue Type: Bug
>Affects Versions: 2.3.0, 2.4.0
>Reporter: Gabor Somogyi
>Priority: Critical
>
> In such a case the following happens without an exception being thrown:
> {code:java}
> 19/08/15 04:10:44 INFO AppInfoParser: Kafka version: 2.3.0
> 19/08/15 04:10:44 INFO AppInfoParser: Kafka commitId: fc1aaa116b661c8a
> 19/08/15 04:10:44 INFO AppInfoParser: Kafka startTimeMs: 1565867444977
> 19/08/15 04:10:44 INFO KafkaConsumer: [Consumer clientId=consumer-1, 
> groupId=spark-kafka-source-8f333224-77d8-477c-a401-5bd0fce85d69--1266757633-driver-0]
>  Subscribed to topic(s): topic-68f2c4c2-71a4-4380-a7c4-6fe0b9eea7ef
> 19/08/15 04:10:44 INFO Selector: [SocketServer brokerId=0] Failed 
> authentication with /127.0.0.1 (Unexpected Kafka request of type METADATA 
> during SASL handshake.)
> 19/08/15 04:10:45 WARN NetworkClient: [Consumer clientId=consumer-1, 
> groupId=spark-kafka-source-8f333224-77d8-477c-a401-5bd0fce85d69--1266757633-driver-0]
>  Bootstrap broker 127.0.0.1:62995 (id: -1 rack: null) disconnected
> 19/08/15 04:10:45 INFO Selector: [SocketServer brokerId=0] Failed 
> authentication with /127.0.0.1 (Unexpected Kafka request of type METADATA 
> during SASL handshake.)
> 19/08/15 04:10:45 WARN NetworkClient: [Consumer clientId=consumer-1, 
> groupId=spark-kafka-source-8f333224-77d8-477c-a401-5bd0fce85d69--1266757633-driver-0]
>  Bootstrap broker 127.0.0.1:62995 (id: -1 rack: null) disconnected
> 19/08/15 04:10:45 INFO Selector: [SocketServer brokerId=0] Failed 
> authentication with /127.0.0.1 (Unexpected Kafka request of type METADATA 
> during SASL handshake.)
> 19/08/15 04:10:46 WARN NetworkClient: [Consumer clientId=consumer-1, 
> groupId=spark-kafka-source-8f333224-77d8-477c-a401-5bd0fce85d69--1266757633-driver-0]
>  Bootstrap broker 127.0.0.1:62995 (id: -1 rack: null) disconnected
> 19/08/15 04:10:46 INFO Selector: [SocketServer brokerId=0] Failed 
> authentication with /127.0.0.1 (Unexpected Kafka request of type METADATA 
> during SASL handshake.)
> 19/08/15 04:10:46 WARN NetworkClient: [Consumer clientId=consumer-1, 
> groupId=spark-kafka-source-8f333224-77d8-477c-a401-5bd0fce85d69--1266757633-driver-0]
>  Bootstrap broker 127.0.0.1:62995 (id: -1 rack: null) disconnected
> 19/08/15 04:10:46 INFO Selector: [SocketServer brokerId=0] Failed 
> authentication with /127.0.0.1 (Unexpected Kafka request of type METADATA 
> during SASL handshake.)
> 19/08/15 04:10:46 WARN NetworkClient: [Consumer clientId=consumer-1, 
> groupId=spark-kafka-source-8f333224-77d8-477c-a401-5bd0fce85d69--1266757633-driver-0]
>  Bootstrap broker 127.0.0.1:62995 (id: -1 rack: null) disconnected
> 19/08/15 04:10:46 INFO Selector: [SocketServer brokerId=0] Failed 
> authentication with /127.0.0.1 (Unexpected Kafka request of type METADATA 
> during SASL handshake.)
> 19/08/15 04:10:47 WARN NetworkClient: [Consumer clientId=consumer-1, 
> groupId=spark-kafka-source-8f333224-77d8-477c-a401-5bd0fce85d69--1266757633-driver-0]
>  Bootstrap broker 127.0.0.1:62995 (id: -1 rack: null) disconnected
> 19/08/15 04:10:47 INFO Selector: [SocketServer brokerId=0] Failed 
> authentication with /127.0.0.1 (Unexpected Kafka request of type METADATA 
> during SASL handshake.)
> 19/08/15 04:10:47 WARN NetworkClient: [Consumer clientId=consumer-1, 
> groupId=spark-kafka-source-8f333224-77d8-477c-a401-5bd0fce85d69--1266757633-driver-0]
>  Bootstrap broker 127.0.0.1:62995 (id: -1 rack: null) disconnected
> 19/08/15 04:10:47 INFO Selector: [SocketServer brokerId=0] Failed 
> authentication with /127.0.0.1 (Unexpected Kafka request of type METADATA 
> during SASL handshake.)
> {code}
> I've tried to find a timeout or retry count but nothing helped.





[jira] [Commented] (KAFKA-9482) mirror maker 2.0 doesn't replicates (create topic) created automatically by producing messages

2020-04-02 Thread Rens Groothuijsen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9482?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17074132#comment-17074132
 ] 

Rens Groothuijsen commented on KAFKA-9482:
--

[~grinfeld] It works for me. The default source topic refresh interval is 10 
minutes, though. Could it be that not enough time had passed yet for a refresh?
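
If the refresh interval is indeed the cause, shortening it should make newly created topics show up sooner. A minimal sketch, assuming the MM2 property `refresh.topics.interval.seconds` (default 600) can be set at the top level of mm2.properties; the value 30 is only an illustration and has not been tested against the reporter's setup:

```properties
# mm2.properties (fragment)
# Check the source cluster for new topics every 30 seconds
# instead of the default 600 seconds (10 minutes).
refresh.topics.interval.seconds = 30
```

A shorter interval means more frequent topic listings against the source cluster, so it is probably best kept for testing.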

> mirror maker 2.0 doesn't replicates (create topic) created automatically by 
> producing messages
> --
>
> Key: KAFKA-9482
> URL: https://issues.apache.org/jira/browse/KAFKA-9482
> Project: Kafka
>  Issue Type: Bug
>  Components: mirrormaker
>Affects Versions: 2.4.0
>Reporter: Mikhail Grinfeld
>Priority: Minor
>
> I have 2 Kafka clusters (3 brokers each) and a MirrorMaker instance (built 
> with docker and docker-compose).
> Both clusters are configured to create topics automatically 
> (auto.create.topics.enable is not set in the conf, and the default, at least 
> as it appears in the docs, is true).
> If the topic is created on the source cluster before MirrorMaker starts, everything 
> works as expected: producing messages to the source cluster causes replication to 
> the destination cluster.
> If the topic doesn't exist on the source cluster when I start producing messages, 
> it is created in the source cluster but not in the destination, and no replication 
> is performed.
>  
> Using following mm2.properties:
> {code:java}
> # mm2.properties
> clusters=src,dest
> src.bootstrap.servers=kafka-1:9092,kafka-2:19092,kafka-3:29092
> dest.bootstrap.servers=kafka-4:39092,kafka-5:49092,kafka-6:59092
> src->dest.enabled=true
> src->dest.topics=.*
> {code}
> and running MirrorMaker with
> {code:java}
> connect-mirror-maker /etc/mm2.properties --clusters src dest
> {code}
> Note:
> when I use Kafka Streams to read from the initial topic, a few 
> KTable topics are created automatically by Kafka Streams, and these topics 
> are created OK (provided, of course, that the initial topic was created at the beginning).





[jira] [Commented] (KAFKA-9547) Kafka transaction - skip one offset when the application stops and be started again

2020-04-01 Thread Rens Groothuijsen (Jira)


[ 
https://issues.apache.org/jira/browse/KAFKA-9547?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17072917#comment-17072917
 ] 

Rens Groothuijsen commented on KAFKA-9547:
--

[~cheatmenot] As far as I know this is the expected behavior, since the 
transaction's commit marker is also written to the partition as a record and 
takes up an offset.
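
To illustrate the offset gap, here is a toy model of a single partition log. It is only a sketch under stated assumptions: `TxnOffsetSketch` and `commitTransaction` are made-up names, and the model simply treats the transaction marker as one extra log entry. It does not use or reproduce the Kafka client code.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of a single partition log; NOT Kafka's actual implementation.
class TxnOffsetSketch {
    // Each entry is either a user record or a transaction marker.
    private final List<String> log = new ArrayList<>();

    // Appends the values as one committed transaction and returns the offsets
    // assigned to the user records (the marker's offset is not returned).
    List<Long> commitTransaction(List<String> values) {
        List<Long> offsets = new ArrayList<>();
        for (String value : values) {
            offsets.add((long) log.size()); // next free offset
            log.add(value);
        }
        log.add("<COMMIT marker>"); // the marker occupies one offset as well
        return offsets;
    }

    public static void main(String[] args) {
        TxnOffsetSketch partition = new TxnOffsetSketch();
        List<String> batch = List.of("value", "value2", "value3");
        System.out.println(partition.commitTransaction(batch)); // first run: [0, 1, 2]
        System.out.println(partition.commitTransaction(batch)); // rerun:     [4, 5, 6]
    }
}
```

In this model offset 3 belongs to the marker of the first transaction, which matches the 0,1,2 then 4,5,6 pattern described in the report.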

> Kafka transaction - skip one offset when the application stops and be started 
> again
> ---
>
> Key: KAFKA-9547
> URL: https://issues.apache.org/jira/browse/KAFKA-9547
> Project: Kafka
>  Issue Type: Bug
>  Components: clients
>Affects Versions: 2.4.0
> Environment: I am using kafka-clients 2.4.0 and 
> wurstmeister/kafka:2.12-2.3.0
>Reporter: Rumel
>Priority: Minor
>
> To be fair, I have tested it with plain Kafka without transactions, 
> and it does not skip an offset no matter how many times I rerun the 
> ProducerTest.
> {code:java}
> object ProducerTest extends LazyLogging {
>   def main(args: Array[String]): Unit = {
>     val props = new Properties()
>     props.put("bootstrap.servers", "kafka.local:9092")
>     props.put("key.serializer", 
>       "org.apache.kafka.common.serialization.StringSerializer")
>     props.put("value.serializer", 
>       "org.apache.kafka.common.serialization.StringSerializer")
>     props.put("acks", "all")
>     props.put("retries", "3")
>     val producer = new KafkaProducer[String, String](props)
>     val record = new ProducerRecord[String, String]("zxc", "key", "value")
>     val record2 = new ProducerRecord[String, String]("zxc", "key2", "value2")
>     val record3 = new ProducerRecord[String, String]("zxc", "key3", "value3")
>     producer.send(record)
>     producer.send(record2)
>     producer.send(record3)
>     Thread.sleep(3000)
>   }
> }{code}
> But when I enable transactions on the producer, it skips one offset each time the 
> ProducerTestWithTransaction application is rerun. For example, on the first run 
> the records get offsets 0, 1, 2; after a rerun they get 4, 5, 6, skipping 
> 3, and so on.
> {code:java}
> object ProducerTestWithTransaction extends LazyLogging {
>   def main(args: Array[String]): Unit = {
>     val props = new Properties()
>     props.put("bootstrap.servers", "kafka.local:9092")
>     props.put("key.serializer", 
>       "org.apache.kafka.common.serialization.StringSerializer")
>     props.put("value.serializer", 
>       "org.apache.kafka.common.serialization.StringSerializer")
>     props.put("enable.idempotence", "true")
>     props.put("transactional.id", "alona")
>     props.put("acks", "all")
>     props.put("retries", "3")
>     val producer = new KafkaProducer[String, String](props)
>     val record = new ProducerRecord[String, String]("wew", "key", "value")
>     val record2 = new ProducerRecord[String, String]("wew", "key2", "value2")
>     val record3 = new ProducerRecord[String, String]("wew", "key3", "value3")
>     producer.initTransactions()
>     try {
>       producer.beginTransaction()
>       producer.send(record)
>       producer.send(record2)
>       producer.send(record3)
>       producer.commitTransaction()
>     } catch {
>       case e: ProducerFencedException => producer.close()
>       case e: Exception => producer.abortTransaction()
>     }
>   }
> }{code}
> Could you explain why this is happening? Is this the standard behavior when 
> using transactions? Is there any workaround to avoid skipping an 
> offset? Thanks!





[jira] [Commented] (KAFKA-8293) Messages undelivered when small quotas applied

2019-05-30 Thread Rens Groothuijsen (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8293?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16852160#comment-16852160
 ] 

Rens Groothuijsen commented on KAFKA-8293:
--

I haven't been able to replicate this issue with a local single-node 
configuration. Are you sending these messages over a network, or is it entirely 
local?

> Messages undelivered when small quotas applied 
> ---
>
> Key: KAFKA-8293
> URL: https://issues.apache.org/jira/browse/KAFKA-8293
> Project: Kafka
>  Issue Type: Bug
>  Components: core
>Affects Versions: 2.1.1
>Reporter: Kirill Kulikov
>Priority: Major
>  Labels: quotas
>
> I observe strange Kafka behavior when using small quotas.
> For example, I set quotas for the consumer like
>  
> {code:java}
> kafka-configs --zookeeper zookeeper:2181 --entity-type users --entity-name 
> kafka --alter --add-config 'producer_byte_rate=2048000, 
> consumer_byte_rate=256'{code}
>  
> If I send a small batch of messages as
>  
> {code:java}
> kafka-producer-perf-test --producer.config /etc/kafka/consumer.properties 
> --producer-props acks=-1 compression.type=none bootstrap.servers=kafka:9093 
> --num-records 10 --record-size 20 --throughput 1000 --print-metrics --topic 
> test 
> {code}
> they go through without problems.
> But when the batch is bigger
>  
> {code:java}
> kafka-producer-perf-test --producer.config /etc/kafka/consumer.properties 
> --producer-props acks=-1 compression.type=none bootstrap.servers=kafka:9093 
> --num-records 100 --record-size 20 --throughput 1000 --print-metrics --topic 
> test
> {code}
> ... I do not get any messages on the consumer side *at all*.
> On the other hand, if `kafka-producer-perf-test` throughput is limited like
>  
> {code:java}
> kafka-producer-perf-test --producer.config /etc/kafka/consumer.properties 
> --producer-props acks=-1 compression.type=none bootstrap.servers=kafka:9093 
> --num-records 1000 --record-size 10 --throughput 10 --print-metrics --topic 
> test
> {code}
> I can see only the first 20-30 messages in `kafka-console-consumer`. But then 
> it gets stuck (throttled perhaps) and other queued messages never come 
> through.



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)


[jira] [Assigned] (KAFKA-2939) Make AbstractConfig.logUnused() tunable for clients

2019-05-27 Thread Rens Groothuijsen (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-2939?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rens Groothuijsen reassigned KAFKA-2939:


Assignee: (was: Rens Groothuijsen)

> Make AbstractConfig.logUnused() tunable for clients
> ---
>
> Key: KAFKA-2939
> URL: https://issues.apache.org/jira/browse/KAFKA-2939
> Project: Kafka
>  Issue Type: Improvement
>  Components: config
>Reporter: Guozhang Wang
>Priority: Major
>  Labels: newbie
>
> Today we always log unused configs in the KafkaProducer / KafkaConsumer 
> constructors. However, in some cases, such as Kafka Streams, which makes use of 
> these clients, other configs may be passed in to configure Partitioner / 
> Serializer classes, etc. So it would be better to make this function call 
> optional to avoid printing unnecessary and confusing WARN entries.





[jira] [Comment Edited] (KAFKA-8297) Kafka Streams ConsumerRecordFactory yields difficult compiler error about generics

2019-05-26 Thread Rens Groothuijsen (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16848480#comment-16848480
 ] 

Rens Groothuijsen edited comment on KAFKA-8297 at 5/26/19 4:42 PM:
---

Would it break the existing interface to only expose create(K, V)? Technically 
speaking, this method could then still handle the case where K happens to be a 
String. Or would that be too unintuitive for the user?

 

Edit: upon closer inspection, it does seem like it would be confusing because 
the meaning of the parameters changes depending on the input. They could be 
distinguished by a flag that indicates whether the first parameter should be 
interpreted as a topic name, but that isn't optimal either.
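
To make the clash concrete, here is a minimal, self-contained sketch. `SketchRecordFactory` and `createForTopic` are hypothetical names, not part of Kafka's API; the class only mirrors the shape of the two overloads described in the ticket, and the records are plain strings rather than real ConsumerRecords. It shows one possible alternative to a flag: giving the topic-name variant its own method name, so the call stays unambiguous when K is String.

```java
import java.util.Objects;

// Hypothetical stand-in for ConsumerRecordFactory<K, V>; not Kafka's real class.
final class SketchRecordFactory<K, V> {
    private final String defaultTopic;

    SketchRecordFactory(String defaultTopic) {
        this.defaultTopic = Objects.requireNonNull(defaultTopic);
    }

    // Key/value record on the default topic.
    String create(K key, V value) {
        return defaultTopic + ":" + key + "=" + value;
    }

    // Keyless record on an explicit topic. If this method were named
    // create(String, V), it would clash with create(K, V) whenever K is
    // String, which is the ambiguity reported in the ticket.
    String createForTopic(String topic, V value) {
        return topic + ":null=" + value;
    }
}

final class SketchRecordFactoryDemo {
    public static void main(String[] args) {
        SketchRecordFactory<String, Integer> factory = new SketchRecordFactory<>("users");
        System.out.println(factory.create("any string", 42));
        System.out.println(factory.createForTopic("other-topic", 42));
    }
}
```

Renaming a public method would still break existing callers, so this only illustrates the trade-off; it is not a proposal for the actual fix.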


was (Author: rensgroothuijsen):
Would it break the existing interface to only expose create(K, V)? Technically 
speaking, this method could then still handle the case where K happens to be a 
String. Or would that be too unintuitive for the user?

> Kafka Streams ConsumerRecordFactory yields difficult compiler error about 
> generics
> --
>
> Key: KAFKA-8297
> URL: https://issues.apache.org/jira/browse/KAFKA-8297
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Michael Drogalis
>Priority: Minor
>
> When using the ConsumerRecordFactory, it's convenient to specify a default 
> topic to create records with:
> {code:java}
> ConsumerRecordFactory<String, User> inputFactory =
>     new ConsumerRecordFactory<>(inputTopic, keySerializer, valueSerializer);
> {code}
> However, when the factory is used to create a record with a String key:
> {code:java}
> inputFactory.create("any string", user)
> {code}
> Compilation fails with the following error:
> {code:java}
> Ambiguous method call. Both:
> create(String, User) in ConsumerRecordFactory and
> create(String, User) in ConsumerRecordFactory match
> {code}
> At first glance, this is a really confusing error to see during compilation. 
> What's happening is that there are two clashing signatures for `create`: 
> create(K, V) and create(String, V). In the latter, the String parameter is a 
> topic name, so the two overloads collide when K is String.
> It seems like fixing this would require breaking the existing interface. This 
> is a really opaque problem to hit though, and it would be great if we could 
> avoid having users encounter this.





[jira] [Comment Edited] (KAFKA-8297) Kafka Streams ConsumerRecordFactory yields difficult compiler error about generics

2019-05-26 Thread Rens Groothuijsen (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16848480#comment-16848480
 ] 

Rens Groothuijsen edited comment on KAFKA-8297 at 5/26/19 4:42 PM:
---

Would it break the existing interface to only expose create(K, V)? Technically 
speaking, this method could then still handle the case where K happens to be a 
String. Or would that be too unintuitive for the user?

Edit: upon closer inspection, it does seem like it would be confusing because 
the meaning of the parameters changes depending on the input. They could be 
distinguished by a flag that indicates whether the first parameter should be 
interpreted as a topic name, but that isn't optimal either.


was (Author: rensgroothuijsen):
Would it break the existing interface to only expose create(K, V)? Technically 
speaking, this method could then still handle the case where K happens to be a 
String. Or would that be too unintuitive for the user?

 

Edit: upon closer inspection, it does seem like it would be confusing because 
the meaning of the parameters changes depending on the input. They could be 
distinguished by a flag that indicates whether the first parameter should be 
interpreted as a topic name, but that isn't optimal either.

> Kafka Streams ConsumerRecordFactory yields difficult compiler error about 
> generics
> --
>
> Key: KAFKA-8297
> URL: https://issues.apache.org/jira/browse/KAFKA-8297
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Michael Drogalis
>Priority: Minor
>
> When using the ConsumerRecordFactory, it's convenient to specify a default 
> topic to create records with:
> {code:java}
> ConsumerRecordFactory<String, User> inputFactory =
>     new ConsumerRecordFactory<>(inputTopic, keySerializer, valueSerializer);
> {code}
> However, when the factory is used to create a record with a String key:
> {code:java}
> inputFactory.create("any string", user)
> {code}
> Compilation fails with the following error:
> {code:java}
> Ambiguous method call. Both:
> create(String, User) in ConsumerRecordFactory and
> create(String, User) in ConsumerRecordFactory match
> {code}
> At first glance, this is a really confusing error to see during compilation. 
> What's happening is that there are two clashing signatures for `create`: 
> create(K, V) and create(String, V). In the latter, the String parameter is a 
> topic name, so the two overloads collide when K is String.
> It seems like fixing this would require breaking the existing interface. This 
> is a really opaque problem to hit though, and it would be great if we could 
> avoid having users encounter this.





[jira] [Commented] (KAFKA-8297) Kafka Streams ConsumerRecordFactory yields difficult compiler error about generics

2019-05-26 Thread Rens Groothuijsen (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-8297?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16848480#comment-16848480
 ] 

Rens Groothuijsen commented on KAFKA-8297:
--

Would it break the existing interface to only expose create(K, V)? Technically 
speaking, this method could then still handle the case where K happens to be a 
String. Or would that be too unintuitive for the user?

> Kafka Streams ConsumerRecordFactory yields difficult compiler error about 
> generics
> --
>
> Key: KAFKA-8297
> URL: https://issues.apache.org/jira/browse/KAFKA-8297
> Project: Kafka
>  Issue Type: Bug
>  Components: streams
>Reporter: Michael Drogalis
>Priority: Minor
>
> When using the ConsumerRecordFactory, it's convenient to specify a default 
> topic to create records with:
> {code:java}
> ConsumerRecordFactory<String, User> inputFactory =
>     new ConsumerRecordFactory<>(inputTopic, keySerializer, valueSerializer);
> {code}
> However, when the factory is used to create a record with a String key:
> {code:java}
> inputFactory.create("any string", user)
> {code}
> Compilation fails with the following error:
> {code:java}
> Ambiguous method call. Both:
> create(String, User) in ConsumerRecordFactory and
> create(String, User) in ConsumerRecordFactory match
> {code}
> At first glance, this is a really confusing error to see during compilation. 
> What's happening is that there are two clashing signatures for `create`: 
> create(K, V) and create(String, V). In the latter, the String parameter is a 
> topic name, so the two overloads collide when K is String.
> It seems like fixing this would require breaking the existing interface. This 
> is a really opaque problem to hit though, and it would be great if we could 
> avoid having users encounter this.





[jira] [Commented] (KAFKA-2939) Make AbstractConfig.logUnused() tunable for clients

2019-05-26 Thread Rens Groothuijsen (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-2939?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16848414#comment-16848414
 ] 

Rens Groothuijsen commented on KAFKA-2939:
--

Took a shot at implementing the boolean idea, though this would mean the 3 
places where logUnused() is actually called (KafkaConsumer, KafkaProducer and 
KafkaAdminClient) would also need to have an extra boolean tacked on to their 
constructors. It looks very hacky, to be honest.
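
For reference, the boolean idea boils down to something like the sketch below. `SketchConfig` and `warnOnUnused` are hypothetical names and the class is a heavily simplified stand-in for AbstractConfig, not the real implementation; it only shows how a flag passed through the constructor could gate the unused-config warnings.

```java
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;

// Hypothetical, simplified stand-in for AbstractConfig; not Kafka's real class.
final class SketchConfig {
    private final Map<String, Object> values;
    private final Set<String> used = new HashSet<>();
    private final boolean warnOnUnused;

    SketchConfig(Map<String, Object> values, boolean warnOnUnused) {
        this.values = new LinkedHashMap<>(values);
        this.warnOnUnused = warnOnUnused;
    }

    // Reading a key marks it as used.
    Object get(String key) {
        used.add(key);
        return values.get(key);
    }

    // Keys that were supplied but never read, sorted for stable output.
    Set<String> unused() {
        Set<String> keys = new TreeSet<>(values.keySet());
        keys.removeAll(used);
        return keys;
    }

    // Warns only when the flag is set; returns the number of warnings emitted.
    int logUnused() {
        if (!warnOnUnused) {
            return 0;
        }
        Set<String> keys = unused();
        for (String key : keys) {
            System.err.println("WARN unused config: " + key);
        }
        return keys.size();
    }
}

final class SketchConfigDemo {
    public static void main(String[] args) {
        Map<String, Object> props = new LinkedHashMap<>();
        props.put("bootstrap.servers", "kafka:9093");
        props.put("custom.partitioner.option", "x");

        // A wrapper such as Kafka Streams would pass false to stay quiet.
        SketchConfig quiet = new SketchConfig(props, false);
        quiet.get("bootstrap.servers");
        System.out.println("warnings when disabled: " + quiet.logUnused());
    }
}
```

Every caller that builds the config has to supply the extra flag, which is where the hackiness comes from.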

> Make AbstractConfig.logUnused() tunable for clients
> ---
>
> Key: KAFKA-2939
> URL: https://issues.apache.org/jira/browse/KAFKA-2939
> Project: Kafka
>  Issue Type: Improvement
>  Components: config
>Reporter: Guozhang Wang
>Assignee: Rens Groothuijsen
>Priority: Major
>  Labels: newbie
>
> Today we always log unused configs in the KafkaProducer / KafkaConsumer 
> constructors. However, in some cases, such as Kafka Streams, which makes use 
> of these clients, other configs may be passed in to configure Partitioner / 
> Serializer classes, etc. So it would be better to make this function call 
> optional to avoid printing unnecessary and confusing WARN entries.





[jira] [Assigned] (KAFKA-2939) Make AbstractConfig.logUnused() tunable for clients

2019-05-21 Thread Rens Groothuijsen (JIRA)


 [ 
https://issues.apache.org/jira/browse/KAFKA-2939?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rens Groothuijsen reassigned KAFKA-2939:


Assignee: Rens Groothuijsen

> Make AbstractConfig.logUnused() tunable for clients
> ---
>
> Key: KAFKA-2939
> URL: https://issues.apache.org/jira/browse/KAFKA-2939
> Project: Kafka
>  Issue Type: Improvement
>  Components: config
>Reporter: Guozhang Wang
>Assignee: Rens Groothuijsen
>Priority: Major
>  Labels: newbie
>
> Today we always log unused configs in the KafkaProducer / KafkaConsumer 
> constructors. However, in some cases, such as Kafka Streams, which makes use 
> of these clients, other configs may be passed in to configure Partitioner / 
> Serializer classes, etc. So it would be better to make this function call 
> optional to avoid printing unnecessary and confusing WARN entries.





[jira] [Commented] (KAFKA-5666) Need feedback to user if consumption fails due to offsets.topic.replication.factor=3

2019-05-05 Thread Rens Groothuijsen (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16833459#comment-16833459
 ] 

Rens Groothuijsen commented on KAFKA-5666:
--

[~apurva] [~yevabyzek] Checking in the console consumer if the offset topic 
exists sounds like a fine idea, though I ran into a little issue while playing 
around with it. If automatic topic creation is enabled and the check is done 
before entering the main loop, it might wrongly throw an error because the 
topic does not exist yet. The topic could be initialized first by polling, but 
that gives me concerns that it could be altering the topic state when you only 
want to check the state. Any ideas?
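
One direction I have been toying with is a bounded wait instead of a single up-front check, roughly as sketched below. `TopicWaiter` and `awaitTopic` are hypothetical names, and the topic listing is abstracted behind a `Supplier` so the sketch runs without a cluster. It assumes the real listing call would be read-only and would not itself trigger auto-creation, which I have not verified.

```java
import java.time.Duration;
import java.util.Collections;
import java.util.Set;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

// Hypothetical helper: polls a read-only topic listing until the topic
// appears or the timeout expires. Nothing here is part of Kafka's API.
final class TopicWaiter {
    private TopicWaiter() {}

    static boolean awaitTopic(Supplier<Set<String>> listTopics, String topic,
                              Duration timeout, Duration pollInterval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (true) {
            if (listTopics.get().contains(topic)) {
                return true;
            }
            long remainingNanos = deadline - System.nanoTime();
            if (remainingNanos <= 0) {
                return false; // caller can now report an error to the user
            }
            long sleepMillis = Math.min(pollInterval.toMillis(),
                    Math.max(1L, remainingNanos / 1_000_000L));
            try {
                Thread.sleep(sleepMillis);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // preserve interrupt status
                return false;
            }
        }
    }
}

final class TopicWaiterDemo {
    public static void main(String[] args) {
        // Fake listing: the offsets topic becomes visible on the third call.
        AtomicInteger calls = new AtomicInteger();
        Supplier<Set<String>> fakeLister = () -> calls.incrementAndGet() < 3
                ? Collections.<String>emptySet()
                : Collections.singleton("__consumer_offsets");

        boolean found = TopicWaiter.awaitTopic(fakeLister, "__consumer_offsets",
                Duration.ofSeconds(2), Duration.ofMillis(10));
        System.out.println("offsets topic visible: " + found);
    }
}
```

With a timeout, a topic that is still being auto-created gets a grace period, and only a persistent absence would be reported as an error.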

> Need feedback to user if consumption fails due to 
> offsets.topic.replication.factor=3
> 
>
> Key: KAFKA-5666
> URL: https://issues.apache.org/jira/browse/KAFKA-5666
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Affects Versions: 0.11.0.0
>Reporter: Yeva Byzek
>Priority: Major
>  Labels: newbie, usability
>
> Introduced in 0.11: The offsets.topic.replication.factor broker config is now 
> enforced upon auto topic creation. Internal auto topic creation will fail 
> with a GROUP_COORDINATOR_NOT_AVAILABLE error until the cluster size meets 
> this replication factor requirement.
> Issue: The default is offsets.topic.replication.factor=3, but in 
> development and docker environments where there is only 1 broker, the offsets 
> topic will fail to be created when a consumer tries to consume and no records 
> will be returned.  As a result, the user experience is bad.  The user may 
> have no idea about this setting change and enforcement, and they just see 
> that `kafka-console-consumer` hangs with ZERO output. It is true that the 
> broker log file will provide a message (e.g. {{ERROR [KafkaApi-1] Number of 
> alive brokers '1' does not meet the required replication factor '3' for the 
> offsets topic (configured via 'offsets.topic.replication.factor'). This error 
> can be ignored if the cluster is starting up and not all brokers are up yet. 
> (kafka.server.KafkaApis)}}) but many users do not have access to the log 
> files or know how to get them.
> Suggestion: give feedback to the user/app if offsets topic cannot be created. 
>  For example, after some timeout.
> Workaround:
> Set offsets.topic.replication.factor=1 (when running a single broker)





[jira] [Commented] (KAFKA-5666) Need feedback to user if consumption fails due to offsets.topic.replication.factor=3

2019-04-28 Thread Rens Groothuijsen (JIRA)


[ 
https://issues.apache.org/jira/browse/KAFKA-5666?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16828030#comment-16828030
 ] 

Rens Groothuijsen commented on KAFKA-5666:
--

Hi, I'm fairly new to Kafka. Can I pick up this ticket?

> Need feedback to user if consumption fails due to 
> offsets.topic.replication.factor=3
> 
>
> Key: KAFKA-5666
> URL: https://issues.apache.org/jira/browse/KAFKA-5666
> Project: Kafka
>  Issue Type: Improvement
>  Components: clients, consumer
>Affects Versions: 0.11.0.0
>Reporter: Yeva Byzek
>Priority: Major
>  Labels: newbie, usability
>
> Introduced in 0.11: The offsets.topic.replication.factor broker config is now 
> enforced upon auto topic creation. Internal auto topic creation will fail 
> with a GROUP_COORDINATOR_NOT_AVAILABLE error until the cluster size meets 
> this replication factor requirement.
> Issue: The default is offsets.topic.replication.factor=3, but in 
> development and docker environments where there is only 1 broker, the offsets 
> topic will fail to be created when a consumer tries to consume and no records 
> will be returned.  As a result, the user experience is bad.  The user may 
> have no idea about this setting change and enforcement, and they just see 
> that `kafka-console-consumer` hangs with ZERO output. It is true that the 
> broker log file will provide a message (e.g. {{ERROR [KafkaApi-1] Number of 
> alive brokers '1' does not meet the required replication factor '3' for the 
> offsets topic (configured via 'offsets.topic.replication.factor'). This error 
> can be ignored if the cluster is starting up and not all brokers are up yet. 
> (kafka.server.KafkaApis)}}) but many users do not have access to the log 
> files or know how to get them.
> Suggestion: give feedback to the user/app if offsets topic cannot be created. 
>  For example, after some timeout.
> Workaround:
> Set offsets.topic.replication.factor=1 (when running a single broker)


