Re: [ANNOUNCE] New committer: John Roesler
Congratulations John! Well deserved and thanks for all your help Best regards Patrik > Am 13.11.2019 um 06:10 schrieb Kamal Chandraprakash > : > > Congrats John! > >> On Wed, Nov 13, 2019 at 7:57 AM Dong Lin wrote: >> >> Congratulations John! >> >>> On Tue, Nov 12, 2019 at 1:56 PM Guozhang Wang wrote: >>> >>> Hi Everyone, >>> >>> The PMC of Apache Kafka is pleased to announce a new Kafka committer, >> John >>> Roesler. >>> >>> John has been contributing to Apache Kafka since early 2018. His main >>> contributions are primarily around Kafka Streams, but have also included >>> improving our test coverage beyond Streams as well. Besides his own code >>> contributions, John has also actively participated on community >> discussions >>> and reviews including several other contributors' big proposals like >>> foreign-key join in Streams (KIP-213). He has also been writing, >> presenting >>> and evangelizing Apache Kafka in many venues. >>> >>> Congratulations, John! And look forward to more collaborations with you >> on >>> Apache Kafka. >>> >>> >>> Guozhang, on behalf of the Apache Kafka PMC >>> >>
best config for kafka 10.0.0.1 consumer.assign.
Hi, I m using consumer assign method and consume with 15000 poll time out to consume single partition data from another DC. Below are my consumer configs: enable.auto.commit=false max.poll.records=4000 max.partition.fetch.bytes=4096000 key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer value.deserializer=org.apache.kafka.common.serialization .ByteArrayDeserializer with this my consumer works fine. but when I'm changing max.partition.fetch.bytes to 16384000, my consumer is not receiving any message. there is no exception. if I'm using consumer assign, do I need to tune below properties: fetch.max.bytes session.timeout.ms heartbeat.interval.ms Please let me know if I'm missing something.
Re: [ANNOUNCE] New committer: John Roesler
Congrats John! On Wed, Nov 13, 2019 at 7:57 AM Dong Lin wrote: > Congratulations John! > > On Tue, Nov 12, 2019 at 1:56 PM Guozhang Wang wrote: > > > Hi Everyone, > > > > The PMC of Apache Kafka is pleased to announce a new Kafka committer, > John > > Roesler. > > > > John has been contributing to Apache Kafka since early 2018. His main > > contributions are primarily around Kafka Streams, but have also included > > improving our test coverage beyond Streams as well. Besides his own code > > contributions, John has also actively participated on community > discussions > > and reviews including several other contributors' big proposals like > > foreign-key join in Streams (KIP-213). He has also been writing, > presenting > > and evangelizing Apache Kafka in many venues. > > > > Congratulations, John! And look forward to more collaborations with you > on > > Apache Kafka. > > > > > > Guozhang, on behalf of the Apache Kafka PMC > > >
Re: [ANNOUNCE] New committer: John Roesler
Congratulations John! On Tue, Nov 12, 2019 at 1:56 PM Guozhang Wang wrote: > Hi Everyone, > > The PMC of Apache Kafka is pleased to announce a new Kafka committer, John > Roesler. > > John has been contributing to Apache Kafka since early 2018. His main > contributions are primarily around Kafka Streams, but have also included > improving our test coverage beyond Streams as well. Besides his own code > contributions, John has also actively participated on community discussions > and reviews including several other contributors' big proposals like > foreign-key join in Streams (KIP-213). He has also been writing, presenting > and evangelizing Apache Kafka in many venues. > > Congratulations, John! And look forward to more collaborations with you on > Apache Kafka. > > > Guozhang, on behalf of the Apache Kafka PMC >
Re: [ANNOUNCE] New committer: John Roesler
Congratulations John! Thanks, Mayuresh On Tue, Nov 12, 2019 at 4:54 PM Vahid Hashemian wrote: > Congratulations John! > > --Vahid > > On Tue, Nov 12, 2019 at 4:38 PM Adam Bellemare > wrote: > > > Congratulations John, and thanks for all your help on KIP-213! > > > > > On Nov 12, 2019, at 6:24 PM, Bill Bejeck wrote: > > > > > > Congratulations John! > > > > > > On Tue, Nov 12, 2019 at 6:20 PM Matthias J. Sax > > > > wrote: > > > > > >> Congrats John! > > >> > > >> > > >>> On 11/12/19 2:52 PM, Boyang Chen wrote: > > >>> Great work John! Well deserved > > >>> > > >>> On Tue, Nov 12, 2019 at 1:56 PM Guozhang Wang > > >> wrote: > > >>> > > Hi Everyone, > > > > The PMC of Apache Kafka is pleased to announce a new Kafka > committer, > > >> John > > Roesler. > > > > John has been contributing to Apache Kafka since early 2018. His > main > > contributions are primarily around Kafka Streams, but have also > > included > > improving our test coverage beyond Streams as well. Besides his own > > code > > contributions, John has also actively participated on community > > >> discussions > > and reviews including several other contributors' big proposals like > > foreign-key join in Streams (KIP-213). He has also been writing, > > >> presenting > > and evangelizing Apache Kafka in many venues. > > > > Congratulations, John! And look forward to more collaborations with > > you > > >> on > > Apache Kafka. > > > > > > Guozhang, on behalf of the Apache Kafka PMC > > > > >>> > > >> > > >> > > > > > -- > > Thanks! > --Vahid > -- -Regards, Mayuresh R. Gharat (862) 250-7125
Re: [ANNOUNCE] New committer: John Roesler
Congratulations John! --Vahid On Tue, Nov 12, 2019 at 4:38 PM Adam Bellemare wrote: > Congratulations John, and thanks for all your help on KIP-213! > > > On Nov 12, 2019, at 6:24 PM, Bill Bejeck wrote: > > > > Congratulations John! > > > > On Tue, Nov 12, 2019 at 6:20 PM Matthias J. Sax > > wrote: > > > >> Congrats John! > >> > >> > >>> On 11/12/19 2:52 PM, Boyang Chen wrote: > >>> Great work John! Well deserved > >>> > >>> On Tue, Nov 12, 2019 at 1:56 PM Guozhang Wang > >> wrote: > >>> > Hi Everyone, > > The PMC of Apache Kafka is pleased to announce a new Kafka committer, > >> John > Roesler. > > John has been contributing to Apache Kafka since early 2018. His main > contributions are primarily around Kafka Streams, but have also > included > improving our test coverage beyond Streams as well. Besides his own > code > contributions, John has also actively participated on community > >> discussions > and reviews including several other contributors' big proposals like > foreign-key join in Streams (KIP-213). He has also been writing, > >> presenting > and evangelizing Apache Kafka in many venues. > > Congratulations, John! And look forward to more collaborations with > you > >> on > Apache Kafka. > > > Guozhang, on behalf of the Apache Kafka PMC > > >>> > >> > >> > -- Thanks! --Vahid
Re: [ANNOUNCE] New committer: John Roesler
Congratulations John, and thanks for all your help on KIP-213! > On Nov 12, 2019, at 6:24 PM, Bill Bejeck wrote: > > Congratulations John! > > On Tue, Nov 12, 2019 at 6:20 PM Matthias J. Sax > wrote: > >> Congrats John! >> >> >>> On 11/12/19 2:52 PM, Boyang Chen wrote: >>> Great work John! Well deserved >>> >>> On Tue, Nov 12, 2019 at 1:56 PM Guozhang Wang >> wrote: >>> Hi Everyone, The PMC of Apache Kafka is pleased to announce a new Kafka committer, >> John Roesler. John has been contributing to Apache Kafka since early 2018. His main contributions are primarily around Kafka Streams, but have also included improving our test coverage beyond Streams as well. Besides his own code contributions, John has also actively participated on community >> discussions and reviews including several other contributors' big proposals like foreign-key join in Streams (KIP-213). He has also been writing, >> presenting and evangelizing Apache Kafka in many venues. Congratulations, John! And look forward to more collaborations with you >> on Apache Kafka. Guozhang, on behalf of the Apache Kafka PMC >>> >> >>
Re: [ANNOUNCE] New committer: John Roesler
Congratulations John! On Tue, Nov 12, 2019 at 6:20 PM Matthias J. Sax wrote: > Congrats John! > > > On 11/12/19 2:52 PM, Boyang Chen wrote: > > Great work John! Well deserved > > > > On Tue, Nov 12, 2019 at 1:56 PM Guozhang Wang > wrote: > > > >> Hi Everyone, > >> > >> The PMC of Apache Kafka is pleased to announce a new Kafka committer, > John > >> Roesler. > >> > >> John has been contributing to Apache Kafka since early 2018. His main > >> contributions are primarily around Kafka Streams, but have also included > >> improving our test coverage beyond Streams as well. Besides his own code > >> contributions, John has also actively participated on community > discussions > >> and reviews including several other contributors' big proposals like > >> foreign-key join in Streams (KIP-213). He has also been writing, > presenting > >> and evangelizing Apache Kafka in many venues. > >> > >> Congratulations, John! And look forward to more collaborations with you > on > >> Apache Kafka. > >> > >> > >> Guozhang, on behalf of the Apache Kafka PMC > >> > > > >
Re: [ANNOUNCE] New committer: John Roesler
Congrats John! On 11/12/19 2:52 PM, Boyang Chen wrote: > Great work John! Well deserved > > On Tue, Nov 12, 2019 at 1:56 PM Guozhang Wang wrote: > >> Hi Everyone, >> >> The PMC of Apache Kafka is pleased to announce a new Kafka committer, John >> Roesler. >> >> John has been contributing to Apache Kafka since early 2018. His main >> contributions are primarily around Kafka Streams, but have also included >> improving our test coverage beyond Streams as well. Besides his own code >> contributions, John has also actively participated on community discussions >> and reviews including several other contributors' big proposals like >> foreign-key join in Streams (KIP-213). He has also been writing, presenting >> and evangelizing Apache Kafka in many venues. >> >> Congratulations, John! And look forward to more collaborations with you on >> Apache Kafka. >> >> >> Guozhang, on behalf of the Apache Kafka PMC >> > signature.asc Description: OpenPGP digital signature
Re: [ANNOUNCE] New committer: John Roesler
Great work John! Well deserved On Tue, Nov 12, 2019 at 1:56 PM Guozhang Wang wrote: > Hi Everyone, > > The PMC of Apache Kafka is pleased to announce a new Kafka committer, John > Roesler. > > John has been contributing to Apache Kafka since early 2018. His main > contributions are primarily around Kafka Streams, but have also included > improving our test coverage beyond Streams as well. Besides his own code > contributions, John has also actively participated on community discussions > and reviews including several other contributors' big proposals like > foreign-key join in Streams (KIP-213). He has also been writing, presenting > and evangelizing Apache Kafka in many venues. > > Congratulations, John! And look forward to more collaborations with you on > Apache Kafka. > > > Guozhang, on behalf of the Apache Kafka PMC >
[ANNOUNCE] New committer: John Roesler
Hi Everyone, The PMC of Apache Kafka is pleased to announce a new Kafka committer, John Roesler. John has been contributing to Apache Kafka since early 2018. His main contributions are primarily around Kafka Streams, but have also included improving our test coverage beyond Streams as well. Besides his own code contributions, John has also actively participated on community discussions and reviews including several other contributors' big proposals like foreign-key join in Streams (KIP-213). He has also been writing, presenting and evangelizing Apache Kafka in many venues. Congratulations, John! And look forward to more collaborations with you on Apache Kafka. Guozhang, on behalf of the Apache Kafka PMC
Leveraging DLQ for errors coming from a sink connector plugin
Hi, Looking at the Kafka Connect code, it seems that the built-in support for DLQ queues only works for errors related to transformations and converters (headers, key, and value). I wonder if it has been considered (and maybe discarded) to use the same mechanism for the call to the connector-plugin.put() operation. That way, it would be possible for connector-plugins to leverage the same DLQ semantics that Connect already implements without "reinventing the wheel" themselves. We have found ourselves in that situation with one in-house connector plugin that we are building (HTTP Connector with extra specific bits). When the connector plugin has tried X times to do its HTTP call, we want to "offload" the record into a DLQ queue. Any chances that this will be implemented? Thanks. Regards, Javier.
Re: Install kafka-connect-storage-cloud
Hi Miguel! If you're using Kafka Connect in standalone mode then you need to pass it a .properties (key=value) file, not JSON. JSON is if you are using Kafka Connect in distributed mode (which personally I advocate, even on a single node), if you use that mode then you pass the JSON to the REST API after starting the worker. See https://rmoff.dev/berlin19-kafka-connect for examples and discussion -- Robin Moffatt | Senior Developer Advocate | ro...@confluent.io | @rmoff On Tue, 12 Nov 2019 at 16:40, Miguel Silvestre wrote: > Hi, > > I'm new to kafka (really newbie) and I'm trying to set this connector on my > local machine which is a macOS Mojava 10.14.6. > > I've downloaded the connector and put the contents on folder: > /usr/local/share/kafka/plugins > and update the plugin.path on file > /usr/local/etc/kafka/connect-standalone.properties to: > /usr/local/share/kafka/plugins > > I'm launching the connector like this: > /usr/local/Cellar/kafka/2.3.1/bin/connect-standalone > /usr/local/etc/kafka/connect-standalone.properties > /Users/miguel.silvestre/meetups-to-s3.json > > However I'm always getting the error bellow. > Any idea on what am I doing wrong? > > Thank you > Miguel Silvestre > > PS. I need a sink connector that reads json from kafka topics and writes to > s3 on parquet files. I need to read several topics and the files are going > to the same bucket on different paths. Do you anything that can do the > task? It seems that secor is having building issues right now. > > > > [2019-11-12 16:24:19,322] INFO Kafka Connect started > (org.apache.kafka.connect.runtime.Connect:56) > [2019-11-12 16:24:19,325] ERROR Failed to create job for > /Users/miguel.silvestre/meetups-to-s3.json > (org.apache.kafka.connect.cli.ConnectStandalone:110) > [2019-11-12 16:24:19,326] ERROR Stopping after connector error > (org.apache.kafka.connect.cli.ConnectStandalone:121) > java.util.concurrent.ExecutionException: > org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector > config {"s3.part.size"="5242880",, "partition.duration.ms"="360",, " > s3.bucket.name"="test-connector",, > "value.converter.schemas.enable"="false",, "timezone"="UTC",, },=, > > "partitioner.class"="io.confluent.connect.storage.partitioner.TimeBasedPartitioner",, > "path.format"="'date'=-MM-dd/'hour'=HH",, "rotate.interval.ms > "="6",, > "name"="meetups-to-s3",, "flush.size"="10",, > "key.converter.schemas.enable"="false",, > "value.converter"="org.apache.kafka.connect.json.JsonConverter",, > "topics"="test",, "tasks"=[], "config"={, > "connector.class"="io.confluent.connect.s3.S3SinkConnector",, > "format.class"="io.confluent.connect.s3.format.json.JsonFormat",, > "tasks.max"="1",, "s3.region"="eu-west-1",, > "key.converter"="org.apache.kafka.connect.json.JsonConverter",, > "timestamp.extractor"="Record", "locale"="en",, > "schema.compatibility"="NONE",, {=, > "storage.class"="io.confluent.connect.s3.storage.S3Storage",, }=} contains > no connector type > at > > org.apache.kafka.connect.util.ConvertingFutureCallback.result(ConvertingFutureCallback.java:79) > at > > org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:66) > at > > org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:118) > Caused by: > org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector > config {"s3.part.size"="5242880",, "partition.duration.ms"="360",, " > s3.bucket.name"="test-connector",, > "value.converter.schemas.enable"="false",, "timezone"="UTC",, },=, > > "partitioner.class"="io.confluent.connect.storage.partitioner.TimeBasedPartitioner",, > "path.format"="'date'=-MM-dd/'hour'=HH",, "rotate.interval.ms > "="6",, > "name"="meetups-to-s3",, "flush.size"="10",, > "key.converter.schemas.enable"="false",, > "value.converter"="org.apache.kafka.connect.json.JsonConverter",, > "topics"="test",, "tasks"=[], "config"={, > "connector.class"="io.confluent.connect.s3.S3SinkConnector",, > "format.class"="io.confluent.connect.s3.format.json.JsonFormat",, > "tasks.max"="1",, "s3.region"="eu-west-1",, > "key.converter"="org.apache.kafka.connect.json.JsonConverter",, > "timestamp.extractor"="Record", "locale"="en",, > "schema.compatibility"="NONE",, {=, > "storage.class"="io.confluent.connect.s3.storage.S3Storage",, }=} contains > no connector type > at > > org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:287) > at > > org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:192) > at > > org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:115) > [2019-11-12 16:24:19,329] INFO Kafka Connect stopping > (org.apache.kafka.connect.runtime.Connect:66) > -- > Miguel Silvestre >
Install kafka-connect-storage-cloud
Hi, I'm new to kafka (really newbie) and I'm trying to set this connector on my local machine which is a macOS Mojava 10.14.6. I've downloaded the connector and put the contents on folder: /usr/local/share/kafka/plugins and update the plugin.path on file /usr/local/etc/kafka/connect-standalone.properties to: /usr/local/share/kafka/plugins I'm launching the connector like this: /usr/local/Cellar/kafka/2.3.1/bin/connect-standalone /usr/local/etc/kafka/connect-standalone.properties /Users/miguel.silvestre/meetups-to-s3.json However I'm always getting the error bellow. Any idea on what am I doing wrong? Thank you Miguel Silvestre PS. I need a sink connector that reads json from kafka topics and writes to s3 on parquet files. I need to read several topics and the files are going to the same bucket on different paths. Do you anything that can do the task? It seems that secor is having building issues right now. [2019-11-12 16:24:19,322] INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect:56) [2019-11-12 16:24:19,325] ERROR Failed to create job for /Users/miguel.silvestre/meetups-to-s3.json (org.apache.kafka.connect.cli.ConnectStandalone:110) [2019-11-12 16:24:19,326] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:121) java.util.concurrent.ExecutionException: org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector config {"s3.part.size"="5242880",, "partition.duration.ms"="360",, " s3.bucket.name"="test-connector",, "value.converter.schemas.enable"="false",, "timezone"="UTC",, },=, "partitioner.class"="io.confluent.connect.storage.partitioner.TimeBasedPartitioner",, "path.format"="'date'=-MM-dd/'hour'=HH",, "rotate.interval.ms"="6",, "name"="meetups-to-s3",, "flush.size"="10",, "key.converter.schemas.enable"="false",, "value.converter"="org.apache.kafka.connect.json.JsonConverter",, "topics"="test",, "tasks"=[], "config"={, "connector.class"="io.confluent.connect.s3.S3SinkConnector",, "format.class"="io.confluent.connect.s3.format.json.JsonFormat",, "tasks.max"="1",, "s3.region"="eu-west-1",, "key.converter"="org.apache.kafka.connect.json.JsonConverter",, "timestamp.extractor"="Record", "locale"="en",, "schema.compatibility"="NONE",, {=, "storage.class"="io.confluent.connect.s3.storage.S3Storage",, }=} contains no connector type at org.apache.kafka.connect.util.ConvertingFutureCallback.result(ConvertingFutureCallback.java:79) at org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:66) at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:118) Caused by: org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector config {"s3.part.size"="5242880",, "partition.duration.ms"="360",, " s3.bucket.name"="test-connector",, "value.converter.schemas.enable"="false",, "timezone"="UTC",, },=, "partitioner.class"="io.confluent.connect.storage.partitioner.TimeBasedPartitioner",, "path.format"="'date'=-MM-dd/'hour'=HH",, "rotate.interval.ms"="6",, "name"="meetups-to-s3",, "flush.size"="10",, "key.converter.schemas.enable"="false",, "value.converter"="org.apache.kafka.connect.json.JsonConverter",, "topics"="test",, "tasks"=[], "config"={, "connector.class"="io.confluent.connect.s3.S3SinkConnector",, "format.class"="io.confluent.connect.s3.format.json.JsonFormat",, "tasks.max"="1",, "s3.region"="eu-west-1",, "key.converter"="org.apache.kafka.connect.json.JsonConverter",, "timestamp.extractor"="Record", "locale"="en",, "schema.compatibility"="NONE",, {=, "storage.class"="io.confluent.connect.s3.storage.S3Storage",, }=} contains no connector type at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:287) at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:192) at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:115) [2019-11-12 16:24:19,329] INFO Kafka Connect stopping (org.apache.kafka.connect.runtime.Connect:66) -- Miguel Silvestre
Re: MirrorMaker 2 Plugin class loader Error
+1 On Mon, Nov 11, 2019 at 2:07 PM Ryanne Dolan wrote: > Rajeev, the config errors are unavoidable at present and can be ignored or > silenced. The Plugin error is concerning, and was previously described by > Vishal. I suppose it's possible there is a dependency conflict in these > builds. Can you send me the hash that you're building from? I'll try to > reproduce. > > Ryanne > > On Fri, Nov 8, 2019, 7:31 PM Rajeev Chakrabarti > wrote: > >> Hi Folks, >> I'm trying to run MM 2 with the current trunk. Also, tried with the 2.4 >> branch and I'm getting "ERROR Plugin class loader for connector: >> 'org.apache.kafka.connect.mirror.MirrorSourceConnector'" errors for all the >> connectors. It does not seem to be creating topics in the destination >> cluster but has created the internal topics at both the source and >> destination and has populated the heartbeats topic. But none of the source >> topics created or replicated. I'm also getting a bunch of "not known >> configs" like 'consumer.group.id' was supplied but isn't a known config. >> What am I doing wrong? >> Regards,Rajeev >> >
Re: kafka-console-consumer --value-deserializer with access to headers
HI, On Tue, 12 Nov 2019 at 14:37, Jorg Heymans wrote: > Thanks for helping debugging this. You can reproduce the issue using below > deserializer, and invoking kafka-console-consumer with > --value-deserializer=my.BasicDeserializer . As you will see, when the > consumer starts receiving messages only "SERDE WITHOUT HEADERS" is printed > to the console. > > Thanks, > Jorg > In the above, what command have you put exactly from command prompt ? can you share this with us? Thanks, > > public class BasicDeserializer implements Deserializer { > > @Override > public void configure(Map configs, boolean isKey) { > System.out.println("CONFIGURE"); > } > > @Override > public String deserialize(String topic, byte[] data) { > System.out.println("SERDE WITHOUT HEADERS"); > return new String(data); > } > > @Override > public String deserialize(String topic, Headers headers, byte[] data) { > System.out.println("SERDE WITH HEADERS"); > return new String(data); > } > > @Override > public void close() { > System.out.println("CLOSE"); > } > } > > > > > On 2019/11/12 12:57:21, "M. Manna" wrote: > > HI again, > > > > On Tue, 12 Nov 2019 at 12:31, Jorg Heymans > wrote: > > > > > Hi, > > > > > > The issue is not that i cannot get a custom deserializer working, it's > > > that the custom deserializer i provide implements the default method > from > > > the Deserializer interface > > > > https://github.com/apache/kafka/blob/6f0008643db6e7299658442784f1bcc6c96395ed/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java#L59 > > > that gives access to record Headers. > > > > > > The kafka console consumer never calls this method, it will only call > the > > > variant without Headers > > > > https://github.com/apache/kafka/blob/6f0008643db6e7299658442784f1bcc6c96395ed/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java#L50 > > > > > > I'm using kafka 2.3.0 btw. > > > > > > Jorg > > > > > > > Recrord feching (deserialization call) happens using Fetcher. And Fetcher > > is calling default implementation of Deserializer.deserialize() with > > header. The default implementation returns the implementation of > > deserialize() with header. If you provide overridden version of > > deserializer (for both header/non-header) it will be called. > > > > > https://github.com/apache/kafka/blob/4e5b86419982050217d06b3c30ba6236e1fd9090/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L1265 > > > > > https://github.com/apache/kafka/blob/4e5b86419982050217d06b3c30ba6236e1fd9090/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L1268 > > > > Console consumer simply puts a consumer wrapper around KafkaConsumer. > There > > is no change in behaviour otherwise. I take it that you've debugged and > > confirmed that it's not calling your overridden deserialize() with > headers? > > If so, can you link it here for everyone's benefit? > > > > Thanks, > > > > > > > > > > > > > On 2019/11/12 11:58:26, "M. Manna" wrote: > > > > > > > > I think you can try the following to get your implementation working > > > > > > > > 1) Provide the SerDe classes into classpath > > > > 2) Provide your consumer config file > > > > 3) Provide key/value Deserializer props via --consumer-property arg. > > > > > > > > > > > > >
Re: kafka-console-consumer --value-deserializer with access to headers
Thanks for helping debugging this. You can reproduce the issue using below deserializer, and invoking kafka-console-consumer with --value-deserializer=my.BasicDeserializer . As you will see, when the consumer starts receiving messages only "SERDE WITHOUT HEADERS" is printed to the console. Thanks, Jorg public class BasicDeserializer implements Deserializer { @Override public void configure(Map configs, boolean isKey) { System.out.println("CONFIGURE"); } @Override public String deserialize(String topic, byte[] data) { System.out.println("SERDE WITHOUT HEADERS"); return new String(data); } @Override public String deserialize(String topic, Headers headers, byte[] data) { System.out.println("SERDE WITH HEADERS"); return new String(data); } @Override public void close() { System.out.println("CLOSE"); } } On 2019/11/12 12:57:21, "M. Manna" wrote: > HI again, > > On Tue, 12 Nov 2019 at 12:31, Jorg Heymans wrote: > > > Hi, > > > > The issue is not that i cannot get a custom deserializer working, it's > > that the custom deserializer i provide implements the default method from > > the Deserializer interface > > https://github.com/apache/kafka/blob/6f0008643db6e7299658442784f1bcc6c96395ed/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java#L59 > > that gives access to record Headers. > > > > The kafka console consumer never calls this method, it will only call the > > variant without Headers > > https://github.com/apache/kafka/blob/6f0008643db6e7299658442784f1bcc6c96395ed/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java#L50 > > > > I'm using kafka 2.3.0 btw. > > > > Jorg > > > > Recrord feching (deserialization call) happens using Fetcher. And Fetcher > is calling default implementation of Deserializer.deserialize() with > header. The default implementation returns the implementation of > deserialize() with header. If you provide overridden version of > deserializer (for both header/non-header) it will be called. > > https://github.com/apache/kafka/blob/4e5b86419982050217d06b3c30ba6236e1fd9090/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L1265 > > https://github.com/apache/kafka/blob/4e5b86419982050217d06b3c30ba6236e1fd9090/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L1268 > > Console consumer simply puts a consumer wrapper around KafkaConsumer. There > is no change in behaviour otherwise. I take it that you've debugged and > confirmed that it's not calling your overridden deserialize() with headers? > If so, can you link it here for everyone's benefit? > > Thanks, > > > > > > > On 2019/11/12 11:58:26, "M. Manna" wrote: > > > > > > I think you can try the following to get your implementation working > > > > > > 1) Provide the SerDe classes into classpath > > > 2) Provide your consumer config file > > > 3) Provide key/value Deserializer props via --consumer-property arg. > > > > > > > >
Re: kafka-console-consumer --value-deserializer with access to headers
HI again, On Tue, 12 Nov 2019 at 12:31, Jorg Heymans wrote: > Hi, > > The issue is not that i cannot get a custom deserializer working, it's > that the custom deserializer i provide implements the default method from > the Deserializer interface > https://github.com/apache/kafka/blob/6f0008643db6e7299658442784f1bcc6c96395ed/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java#L59 > that gives access to record Headers. > > The kafka console consumer never calls this method, it will only call the > variant without Headers > https://github.com/apache/kafka/blob/6f0008643db6e7299658442784f1bcc6c96395ed/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java#L50 > > I'm using kafka 2.3.0 btw. > > Jorg > Recrord feching (deserialization call) happens using Fetcher. And Fetcher is calling default implementation of Deserializer.deserialize() with header. The default implementation returns the implementation of deserialize() with header. If you provide overridden version of deserializer (for both header/non-header) it will be called. https://github.com/apache/kafka/blob/4e5b86419982050217d06b3c30ba6236e1fd9090/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L1265 https://github.com/apache/kafka/blob/4e5b86419982050217d06b3c30ba6236e1fd9090/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L1268 Console consumer simply puts a consumer wrapper around KafkaConsumer. There is no change in behaviour otherwise. I take it that you've debugged and confirmed that it's not calling your overridden deserialize() with headers? If so, can you link it here for everyone's benefit? Thanks, > On 2019/11/12 11:58:26, "M. Manna" wrote: > > > > I think you can try the following to get your implementation working > > > > 1) Provide the SerDe classes into classpath > > 2) Provide your consumer config file > > 3) Provide key/value Deserializer props via --consumer-property arg. > > > >
Re: kafka-console-consumer --value-deserializer with access to headers
Hi, The issue is not that i cannot get a custom deserializer working, it's that the custom deserializer i provide implements the default method from the Deserializer interface https://github.com/apache/kafka/blob/6f0008643db6e7299658442784f1bcc6c96395ed/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java#L59 that gives access to record Headers. The kafka console consumer never calls this method, it will only call the variant without Headers https://github.com/apache/kafka/blob/6f0008643db6e7299658442784f1bcc6c96395ed/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java#L50 I'm using kafka 2.3.0 btw. Jorg On 2019/11/12 11:58:26, "M. Manna" wrote: > > I think you can try the following to get your implementation working > > 1) Provide the SerDe classes into classpath > 2) Provide your consumer config file > 3) Provide key/value Deserializer props via --consumer-property arg. >
Re: kafka-console-consumer --value-deserializer with access to headers
Hi On Tue, 12 Nov 2019 at 09:53, Jorg Heymans wrote: > Indeed, i corrected the typo but now my deserializer class is not taken > into account at all and it goes back to the default deserializer. You can > verify this by putting a non-existent class and it still runs fine. > > value.deserializer=does.not.exist > > In ConsoleConsumer, the bootstrap.server, key/value deserializer are being enforced via --consumer-property arg. It's aggregating all properties between --consumer-property and --consumer.config. It'll prioritise kv pair supplied via --consumer-property over the prop file. I think you can try the following to get your implementation working 1) Provide the SerDe classes into classpath 2) Provide your consumer config file 3) Provide key/value Deserializer props via --consumer-property arg. See how that works for you. Thanks, > Jorg > > On 2019/11/11 14:31:49, "M. Manna" wrote: > > You have a typo - you mean deserializer > > > > Please try again. > > > > Regards, > > > > On Mon, 11 Nov 2019 at 14:28, Jorg Heymans > wrote: > > > > > Don't think that option is available there, specifying > > > 'value.deserializer' in my consumer-config.properties file gives > > > > > > [2019-11-11 15:16:26,589] WARN The configuration 'value.serializer' was > > > supplied but isn't a known config. > > > (org.apache.kafka.clients.consumer.ConsumerConfig) > > > > > > Does there exist a description of what properties the consumer-config > > > properties file accepts ? I could find only a few references to it in > the > > > documentation. > > > > > > Jorg > > > > > > On 2019/11/11 13:00:03, "M. Manna" wrote: > > > > Hi, > > > > > > > > > > > > On Mon, 11 Nov 2019 at 10:58, Jorg Heymans > > > wrote: > > > > > > > > > Hi, > > > > > > > > > > I have created a class implementing Deserializer, providing an > > > > > implementation for > > > > > > > > > > public String deserialize(String topic, Headers headers, byte[] > data) > > > > > > > > > > that does some conditional processing based on headers, and then > calls > > > the > > > > > other serde method > > > > > > > > > > public String deserialize(String topic, byte[] data) > > > > > > > > > > What i'm seeing is that kafka-console-consumer only uses the second > > > method > > > > > when a value deserializer is specified. Is there a way to force it > to > > > > > invoke the first method, so i can do processing with headers ? I > tried > > > > > implementing the deprecated 'ExtendedSerializer' but it does not > make a > > > > > difference. > > > > > > > > > > Thanks, > > > > > Jorg > > > > > > > > > > > > > Have you tried providing a separate prop file using consumer.config > > > > argument? Please see the reference here: > > > > > > > > --consumer.config Consumer config properties > file. > > > > Note > > > >that [consumer-property] > takes > > > >precedence over this > config. > > > > > > > > Try that and see how it goes. > > > > > > > > Thanks, > > > > > > > > > >
Re: kafka-console-consumer --value-deserializer with access to headers
Indeed, i corrected the typo but now my deserializer class is not taken into account at all and it goes back to the default deserializer. You can verify this by putting a non-existent class and it still runs fine. value.deserializer=does.not.exist Jorg On 2019/11/11 14:31:49, "M. Manna" wrote: > You have a typo - you mean deserializer > > Please try again. > > Regards, > > On Mon, 11 Nov 2019 at 14:28, Jorg Heymans wrote: > > > Don't think that option is available there, specifying > > 'value.deserializer' in my consumer-config.properties file gives > > > > [2019-11-11 15:16:26,589] WARN The configuration 'value.serializer' was > > supplied but isn't a known config. > > (org.apache.kafka.clients.consumer.ConsumerConfig) > > > > Does there exist a description of what properties the consumer-config > > properties file accepts ? I could find only a few references to it in the > > documentation. > > > > Jorg > > > > On 2019/11/11 13:00:03, "M. Manna" wrote: > > > Hi, > > > > > > > > > On Mon, 11 Nov 2019 at 10:58, Jorg Heymans > > wrote: > > > > > > > Hi, > > > > > > > > I have created a class implementing Deserializer, providing an > > > > implementation for > > > > > > > > public String deserialize(String topic, Headers headers, byte[] data) > > > > > > > > that does some conditional processing based on headers, and then calls > > the > > > > other serde method > > > > > > > > public String deserialize(String topic, byte[] data) > > > > > > > > What i'm seeing is that kafka-console-consumer only uses the second > > method > > > > when a value deserializer is specified. Is there a way to force it to > > > > invoke the first method, so i can do processing with headers ? I tried > > > > implementing the deprecated 'ExtendedSerializer' but it does not make a > > > > difference. > > > > > > > > Thanks, > > > > Jorg > > > > > > > > > > Have you tried providing a separate prop file using consumer.config > > > argument? Please see the reference here: > > > > > > --consumer.config Consumer config properties file. > > > Note > > >that [consumer-property] takes > > >precedence over this config. > > > > > > Try that and see how it goes. > > > > > > Thanks, > > > > > >