Re: [ANNOUNCE] New committer: John Roesler

2019-11-12 Thread Patrik Kleindl
Congratulations John!
Well deserved and thanks for all your help
Best regards 
Patrik

> On 13 Nov 2019, at 06:10, Kamal Chandraprakash wrote:
> 
> Congrats John!
> 
>> On Wed, Nov 13, 2019 at 7:57 AM Dong Lin  wrote:
>> 
>> Congratulations John!
>> 
>>> On Tue, Nov 12, 2019 at 1:56 PM Guozhang Wang  wrote:
>>> 
>>> Hi Everyone,
>>> 
>>> The PMC of Apache Kafka is pleased to announce a new Kafka committer,
>> John
>>> Roesler.
>>> 
>>> John has been contributing to Apache Kafka since early 2018. His main
>>> contributions are primarily around Kafka Streams, but have also included
>>> improving our test coverage beyond Streams as well. Besides his own code
>>> contributions, John has also actively participated on community
>> discussions
>>> and reviews including several other contributors' big proposals like
>>> foreign-key join in Streams (KIP-213). He has also been writing,
>> presenting
>>> and evangelizing Apache Kafka in many venues.
>>> 
>>> Congratulations, John! And look forward to more collaborations with you
>> on
>>> Apache Kafka.
>>> 
>>> 
>>> Guozhang, on behalf of the Apache Kafka PMC
>>> 
>> 


best config for kafka 10.0.0.1 consumer.assign.

2019-11-12 Thread Upendra Yadav
Hi,

I'm using the consumer assign() method with a 15000 ms poll timeout to
consume single-partition data from another DC.

Below are my consumer configs:
enable.auto.commit=false
max.poll.records=4000
max.partition.fetch.bytes=4096000
key.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer
value.deserializer=org.apache.kafka.common.serialization.ByteArrayDeserializer

With this, my consumer works fine, but when I change
max.partition.fetch.bytes to 16384000, my consumer does not receive any
messages. There is no exception. Since I'm using consumer assign(), do I need
to tune the properties below:
fetch.max.bytes
session.timeout.ms
heartbeat.interval.ms
Please let me know if I'm missing something.
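
For completeness, here is a minimal sketch of the assign()-based consumer
described above, using the configs listed (the broker address, topic name,
partition number, and group id are placeholders; the rest are the settings
from this mail):

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class AssignConsumerSketch {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "remote-dc-broker:9092"); // placeholder address
        props.put("group.id", "my-consumer-group");              // placeholder; needed for manual commits
        props.put("enable.auto.commit", "false");
        props.put("max.poll.records", "4000");
        props.put("max.partition.fetch.bytes", "4096000");
        props.put("key.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");
        props.put("value.deserializer",
                "org.apache.kafka.common.serialization.ByteArrayDeserializer");

        try (KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props)) {
            // assign() skips group management, so session.timeout.ms and
            // heartbeat.interval.ms generally matter only for subscribe()-based groups.
            consumer.assign(Collections.singletonList(new TopicPartition("my-topic", 0)));
            ConsumerRecords<byte[], byte[]> records = consumer.poll(15000); // 15 s poll timeout
            for (ConsumerRecord<byte[], byte[]> record : records) {
                // process the record here
            }
            consumer.commitSync(); // manual commit since enable.auto.commit=false
        }
    }
}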


Re: [ANNOUNCE] New committer: John Roesler

2019-11-12 Thread Kamal Chandraprakash
Congrats John!

On Wed, Nov 13, 2019 at 7:57 AM Dong Lin  wrote:

> Congratulations John!
>
> On Tue, Nov 12, 2019 at 1:56 PM Guozhang Wang  wrote:
>
> > Hi Everyone,
> >
> > The PMC of Apache Kafka is pleased to announce a new Kafka committer,
> John
> > Roesler.
> >
> > John has been contributing to Apache Kafka since early 2018. His main
> > contributions are primarily around Kafka Streams, but have also included
> > improving our test coverage beyond Streams as well. Besides his own code
> > contributions, John has also actively participated on community
> discussions
> > and reviews including several other contributors' big proposals like
> > foreign-key join in Streams (KIP-213). He has also been writing,
> presenting
> > and evangelizing Apache Kafka in many venues.
> >
> > Congratulations, John! And look forward to more collaborations with you
> on
> > Apache Kafka.
> >
> >
> > Guozhang, on behalf of the Apache Kafka PMC
> >
>


Re: [ANNOUNCE] New committer: John Roesler

2019-11-12 Thread Dong Lin
Congratulations John!

On Tue, Nov 12, 2019 at 1:56 PM Guozhang Wang  wrote:

> Hi Everyone,
>
> The PMC of Apache Kafka is pleased to announce a new Kafka committer, John
> Roesler.
>
> John has been contributing to Apache Kafka since early 2018. His main
> contributions are primarily around Kafka Streams, but have also included
> improving our test coverage beyond Streams as well. Besides his own code
> contributions, John has also actively participated on community discussions
> and reviews including several other contributors' big proposals like
> foreign-key join in Streams (KIP-213). He has also been writing, presenting
> and evangelizing Apache Kafka in many venues.
>
> Congratulations, John! And look forward to more collaborations with you on
> Apache Kafka.
>
>
> Guozhang, on behalf of the Apache Kafka PMC
>


Re: [ANNOUNCE] New committer: John Roesler

2019-11-12 Thread Mayuresh Gharat
Congratulations John!

Thanks,

Mayuresh

On Tue, Nov 12, 2019 at 4:54 PM Vahid Hashemian 
wrote:

> Congratulations John!
>
> --Vahid
>
> On Tue, Nov 12, 2019 at 4:38 PM Adam Bellemare 
> wrote:
>
> > Congratulations John, and thanks for all your help on KIP-213!
> >
> > > On Nov 12, 2019, at 6:24 PM, Bill Bejeck  wrote:
> > >
> > > Congratulations John!
> > >
> > > On Tue, Nov 12, 2019 at 6:20 PM Matthias J. Sax  >
> > > wrote:
> > >
> > >> Congrats John!
> > >>
> > >>
> > >>> On 11/12/19 2:52 PM, Boyang Chen wrote:
> > >>> Great work John! Well deserved
> > >>>
> > >>> On Tue, Nov 12, 2019 at 1:56 PM Guozhang Wang 
> > >> wrote:
> > >>>
> >  Hi Everyone,
> > 
> >  The PMC of Apache Kafka is pleased to announce a new Kafka
> committer,
> > >> John
> >  Roesler.
> > 
> >  John has been contributing to Apache Kafka since early 2018. His
> main
> >  contributions are primarily around Kafka Streams, but have also
> > included
> >  improving our test coverage beyond Streams as well. Besides his own
> > code
> >  contributions, John has also actively participated on community
> > >> discussions
> >  and reviews including several other contributors' big proposals like
> >  foreign-key join in Streams (KIP-213). He has also been writing,
> > >> presenting
> >  and evangelizing Apache Kafka in many venues.
> > 
> >  Congratulations, John! And look forward to more collaborations with
> > you
> > >> on
> >  Apache Kafka.
> > 
> > 
> >  Guozhang, on behalf of the Apache Kafka PMC
> > 
> > >>>
> > >>
> > >>
> >
>
>
> --
>
> Thanks!
> --Vahid
>


-- 
-Regards,
Mayuresh R. Gharat
(862) 250-7125


Re: [ANNOUNCE] New committer: John Roesler

2019-11-12 Thread Vahid Hashemian
Congratulations John!

--Vahid

On Tue, Nov 12, 2019 at 4:38 PM Adam Bellemare 
wrote:

> Congratulations John, and thanks for all your help on KIP-213!
>
> > On Nov 12, 2019, at 6:24 PM, Bill Bejeck  wrote:
> >
> > Congratulations John!
> >
> > On Tue, Nov 12, 2019 at 6:20 PM Matthias J. Sax 
> > wrote:
> >
> >> Congrats John!
> >>
> >>
> >>> On 11/12/19 2:52 PM, Boyang Chen wrote:
> >>> Great work John! Well deserved
> >>>
> >>> On Tue, Nov 12, 2019 at 1:56 PM Guozhang Wang 
> >> wrote:
> >>>
>  Hi Everyone,
> 
>  The PMC of Apache Kafka is pleased to announce a new Kafka committer,
> >> John
>  Roesler.
> 
>  John has been contributing to Apache Kafka since early 2018. His main
>  contributions are primarily around Kafka Streams, but have also
> included
>  improving our test coverage beyond Streams as well. Besides his own
> code
>  contributions, John has also actively participated on community
> >> discussions
>  and reviews including several other contributors' big proposals like
>  foreign-key join in Streams (KIP-213). He has also been writing,
> >> presenting
>  and evangelizing Apache Kafka in many venues.
> 
>  Congratulations, John! And look forward to more collaborations with
> you
> >> on
>  Apache Kafka.
> 
> 
>  Guozhang, on behalf of the Apache Kafka PMC
> 
> >>>
> >>
> >>
>


-- 

Thanks!
--Vahid


Re: [ANNOUNCE] New committer: John Roesler

2019-11-12 Thread Adam Bellemare
Congratulations John, and thanks for all your help on KIP-213!

> On Nov 12, 2019, at 6:24 PM, Bill Bejeck  wrote:
> 
> Congratulations John!
> 
> On Tue, Nov 12, 2019 at 6:20 PM Matthias J. Sax 
> wrote:
> 
>> Congrats John!
>> 
>> 
>>> On 11/12/19 2:52 PM, Boyang Chen wrote:
>>> Great work John! Well deserved
>>> 
>>> On Tue, Nov 12, 2019 at 1:56 PM Guozhang Wang 
>> wrote:
>>> 
 Hi Everyone,
 
 The PMC of Apache Kafka is pleased to announce a new Kafka committer,
>> John
 Roesler.
 
 John has been contributing to Apache Kafka since early 2018. His main
 contributions are primarily around Kafka Streams, but have also included
 improving our test coverage beyond Streams as well. Besides his own code
 contributions, John has also actively participated on community
>> discussions
 and reviews including several other contributors' big proposals like
 foreign-key join in Streams (KIP-213). He has also been writing,
>> presenting
 and evangelizing Apache Kafka in many venues.
 
 Congratulations, John! And look forward to more collaborations with you
>> on
 Apache Kafka.
 
 
 Guozhang, on behalf of the Apache Kafka PMC
 
>>> 
>> 
>> 


Re: [ANNOUNCE] New committer: John Roesler

2019-11-12 Thread Bill Bejeck
Congratulations John!

On Tue, Nov 12, 2019 at 6:20 PM Matthias J. Sax 
wrote:

> Congrats John!
>
>
> On 11/12/19 2:52 PM, Boyang Chen wrote:
> > Great work John! Well deserved
> >
> > On Tue, Nov 12, 2019 at 1:56 PM Guozhang Wang 
> wrote:
> >
> >> Hi Everyone,
> >>
> >> The PMC of Apache Kafka is pleased to announce a new Kafka committer,
> John
> >> Roesler.
> >>
> >> John has been contributing to Apache Kafka since early 2018. His main
> >> contributions are primarily around Kafka Streams, but have also included
> >> improving our test coverage beyond Streams as well. Besides his own code
> >> contributions, John has also actively participated on community
> discussions
> >> and reviews including several other contributors' big proposals like
> >> foreign-key join in Streams (KIP-213). He has also been writing,
> presenting
> >> and evangelizing Apache Kafka in many venues.
> >>
> >> Congratulations, John! And look forward to more collaborations with you
> on
> >> Apache Kafka.
> >>
> >>
> >> Guozhang, on behalf of the Apache Kafka PMC
> >>
> >
>
>


Re: [ANNOUNCE] New committer: John Roesler

2019-11-12 Thread Matthias J. Sax
Congrats John!


On 11/12/19 2:52 PM, Boyang Chen wrote:
> Great work John! Well deserved
> 
> On Tue, Nov 12, 2019 at 1:56 PM Guozhang Wang  wrote:
> 
>> Hi Everyone,
>>
>> The PMC of Apache Kafka is pleased to announce a new Kafka committer, John
>> Roesler.
>>
>> John has been contributing to Apache Kafka since early 2018. His main
>> contributions are primarily around Kafka Streams, but have also included
>> improving our test coverage beyond Streams as well. Besides his own code
>> contributions, John has also actively participated on community discussions
>> and reviews including several other contributors' big proposals like
>> foreign-key join in Streams (KIP-213). He has also been writing, presenting
>> and evangelizing Apache Kafka in many venues.
>>
>> Congratulations, John! And look forward to more collaborations with you on
>> Apache Kafka.
>>
>>
>> Guozhang, on behalf of the Apache Kafka PMC
>>
> 





Re: [ANNOUNCE] New committer: John Roesler

2019-11-12 Thread Boyang Chen
Great work John! Well deserved

On Tue, Nov 12, 2019 at 1:56 PM Guozhang Wang  wrote:

> Hi Everyone,
>
> The PMC of Apache Kafka is pleased to announce a new Kafka committer, John
> Roesler.
>
> John has been contributing to Apache Kafka since early 2018. His main
> contributions are primarily around Kafka Streams, but have also included
> improving our test coverage beyond Streams as well. Besides his own code
> contributions, John has also actively participated on community discussions
> and reviews including several other contributors' big proposals like
> foreign-key join in Streams (KIP-213). He has also been writing, presenting
> and evangelizing Apache Kafka in many venues.
>
> Congratulations, John! And look forward to more collaborations with you on
> Apache Kafka.
>
>
> Guozhang, on behalf of the Apache Kafka PMC
>


[ANNOUNCE] New committer: John Roesler

2019-11-12 Thread Guozhang Wang
Hi Everyone,

The PMC of Apache Kafka is pleased to announce a new Kafka committer, John
Roesler.

John has been contributing to Apache Kafka since early 2018. His main
contributions are primarily around Kafka Streams, but have also included
improving our test coverage beyond Streams as well. Besides his own code
contributions, John has also actively participated on community discussions
and reviews including several other contributors' big proposals like
foreign-key join in Streams (KIP-213). He has also been writing, presenting
and evangelizing Apache Kafka in many venues.

Congratulations, John! And look forward to more collaborations with you on
Apache Kafka.


Guozhang, on behalf of the Apache Kafka PMC


Leveraging DLQ for errors coming from a sink connector plugin

2019-11-12 Thread Javier Holguera
Hi,

Looking at the Kafka Connect code, it seems that the built-in DLQ support
only works for errors related to transformations and converters
(headers, key, and value).
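
For reference, that built-in error-handling/DLQ mechanism (KIP-298) is
configured per sink connector with properties roughly like the following; this
is a sketch, and the DLQ topic name is made up:

# existing Connect error-handling / DLQ configs (sink connectors only);
# today these only apply to converter and transformation failures
errors.tolerance=all
errors.deadletterqueue.topic.name=my-connector-dlq
errors.deadletterqueue.topic.replication.factor=3
errors.deadletterqueue.context.headers.enable=true
errors.retry.timeout=60000
errors.log.enable=true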

I wonder whether it has been considered (and perhaps rejected) to use the same
mechanism for the connector plugin's put() call. That way, connector plugins
could leverage the same DLQ semantics that Connect already implements without
"reinventing the wheel" themselves.

We have found ourselves in that situation with an in-house connector plugin
that we are building (an HTTP connector with extra specific bits). When the
connector plugin has tried X times to make its HTTP call, we want to "offload"
the record into a DLQ.

Any chance that this will be implemented?

Thanks.

Regards,
Javier.


Re: Install kafka-connect-storage-cloud

2019-11-12 Thread Robin Moffatt
Hi Miguel!

If you're using Kafka Connect in standalone mode, then you need to pass it a
.properties (key=value) file, not JSON.
JSON is for Kafka Connect in distributed mode (which I personally advocate,
even on a single node); in that mode you pass the JSON to the REST API after
starting the worker.

See https://rmoff.dev/berlin19-kafka-connect for examples and discussion
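
As an illustration, here is a rough, untested sketch of the same connector
expressed as a standalone .properties file, using only property names taken
from the JSON config quoted below (garbled values such as path.format and
rotate.interval.ms are omitted):

name=meetups-to-s3
connector.class=io.confluent.connect.s3.S3SinkConnector
tasks.max=1
topics=test
s3.region=eu-west-1
s3.bucket.name=test-connector
s3.part.size=5242880
flush.size=10
storage.class=io.confluent.connect.s3.storage.S3Storage
format.class=io.confluent.connect.s3.format.json.JsonFormat
partitioner.class=io.confluent.connect.storage.partitioner.TimeBasedPartitioner
timestamp.extractor=Record
locale=en
timezone=UTC
schema.compatibility=NONE
key.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter=org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable=false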


-- 

Robin Moffatt | Senior Developer Advocate | ro...@confluent.io | @rmoff


On Tue, 12 Nov 2019 at 16:40, Miguel Silvestre  wrote:

> Hi,
>
> I'm new to Kafka (a real newbie) and I'm trying to set up this connector on my
> local machine, which runs macOS Mojave 10.14.6.
>
> I've downloaded the connector and put the contents on folder:
> /usr/local/share/kafka/plugins
> and update the plugin.path on file
> /usr/local/etc/kafka/connect-standalone.properties to:
> /usr/local/share/kafka/plugins
>
> I'm launching the connector like this:
> /usr/local/Cellar/kafka/2.3.1/bin/connect-standalone
> /usr/local/etc/kafka/connect-standalone.properties
> /Users/miguel.silvestre/meetups-to-s3.json
>
> However, I'm always getting the error below.
> Any idea what I'm doing wrong?
>
> Thank you
> Miguel Silvestre
>
> PS: I need a sink connector that reads JSON from Kafka topics and writes
> Parquet files to S3. I need to read several topics, and the files go to the
> same bucket under different paths. Do you know of anything that can do the
> task? It seems that Secor is having build issues right now.
>
>
>
> [2019-11-12 16:24:19,322] INFO Kafka Connect started
> (org.apache.kafka.connect.runtime.Connect:56)
> [2019-11-12 16:24:19,325] ERROR Failed to create job for
> /Users/miguel.silvestre/meetups-to-s3.json
> (org.apache.kafka.connect.cli.ConnectStandalone:110)
> [2019-11-12 16:24:19,326] ERROR Stopping after connector error
> (org.apache.kafka.connect.cli.ConnectStandalone:121)
> java.util.concurrent.ExecutionException:
> org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector
> config {"s3.part.size"="5242880",, "partition.duration.ms"="360",, "
> s3.bucket.name"="test-connector",,
> "value.converter.schemas.enable"="false",, "timezone"="UTC",, },=,
>
> "partitioner.class"="io.confluent.connect.storage.partitioner.TimeBasedPartitioner",,
> "path.format"="'date'=-MM-dd/'hour'=HH",, "rotate.interval.ms
> "="6",,
> "name"="meetups-to-s3",, "flush.size"="10",,
> "key.converter.schemas.enable"="false",,
> "value.converter"="org.apache.kafka.connect.json.JsonConverter",,
> "topics"="test",, "tasks"=[], "config"={,
> "connector.class"="io.confluent.connect.s3.S3SinkConnector",,
> "format.class"="io.confluent.connect.s3.format.json.JsonFormat",,
> "tasks.max"="1",, "s3.region"="eu-west-1",,
> "key.converter"="org.apache.kafka.connect.json.JsonConverter",,
> "timestamp.extractor"="Record", "locale"="en",,
> "schema.compatibility"="NONE",, {=,
> "storage.class"="io.confluent.connect.s3.storage.S3Storage",, }=} contains
> no connector type
> at
>
> org.apache.kafka.connect.util.ConvertingFutureCallback.result(ConvertingFutureCallback.java:79)
> at
>
> org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:66)
> at
>
> org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:118)
> Caused by:
> org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector
> config {"s3.part.size"="5242880",, "partition.duration.ms"="360",, "
> s3.bucket.name"="test-connector",,
> "value.converter.schemas.enable"="false",, "timezone"="UTC",, },=,
>
> "partitioner.class"="io.confluent.connect.storage.partitioner.TimeBasedPartitioner",,
> "path.format"="'date'=-MM-dd/'hour'=HH",, "rotate.interval.ms
> "="6",,
> "name"="meetups-to-s3",, "flush.size"="10",,
> "key.converter.schemas.enable"="false",,
> "value.converter"="org.apache.kafka.connect.json.JsonConverter",,
> "topics"="test",, "tasks"=[], "config"={,
> "connector.class"="io.confluent.connect.s3.S3SinkConnector",,
> "format.class"="io.confluent.connect.s3.format.json.JsonFormat",,
> "tasks.max"="1",, "s3.region"="eu-west-1",,
> "key.converter"="org.apache.kafka.connect.json.JsonConverter",,
> "timestamp.extractor"="Record", "locale"="en",,
> "schema.compatibility"="NONE",, {=,
> "storage.class"="io.confluent.connect.s3.storage.S3Storage",, }=} contains
> no connector type
> at
>
> org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:287)
> at
>
> org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:192)
> at
>
> org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:115)
> [2019-11-12 16:24:19,329] INFO Kafka Connect stopping
> (org.apache.kafka.connect.runtime.Connect:66)
> --
> Miguel Silvestre
>


Install kafka-connect-storage-cloud

2019-11-12 Thread Miguel Silvestre
Hi,

I'm new to Kafka (a real newbie) and I'm trying to set up this connector on my
local machine, which runs macOS Mojave 10.14.6.

I've downloaded the connector and put its contents in the folder
/usr/local/share/kafka/plugins
and updated plugin.path in the file
/usr/local/etc/kafka/connect-standalone.properties to:
/usr/local/share/kafka/plugins

I'm launching the connector like this:
/usr/local/Cellar/kafka/2.3.1/bin/connect-standalone
/usr/local/etc/kafka/connect-standalone.properties
/Users/miguel.silvestre/meetups-to-s3.json

However, I'm always getting the error below.
Any idea what I'm doing wrong?

Thank you
Miguel Silvestre

PS: I need a sink connector that reads JSON from Kafka topics and writes
Parquet files to S3. I need to read several topics, and the files go to the
same bucket under different paths. Do you know of anything that can do the
task? It seems that Secor is having build issues right now.



[2019-11-12 16:24:19,322] INFO Kafka Connect started
(org.apache.kafka.connect.runtime.Connect:56)
[2019-11-12 16:24:19,325] ERROR Failed to create job for
/Users/miguel.silvestre/meetups-to-s3.json
(org.apache.kafka.connect.cli.ConnectStandalone:110)
[2019-11-12 16:24:19,326] ERROR Stopping after connector error
(org.apache.kafka.connect.cli.ConnectStandalone:121)
java.util.concurrent.ExecutionException:
org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector
config {"s3.part.size"="5242880",, "partition.duration.ms"="360",, "
s3.bucket.name"="test-connector",,
"value.converter.schemas.enable"="false",, "timezone"="UTC",, },=,
"partitioner.class"="io.confluent.connect.storage.partitioner.TimeBasedPartitioner",,
"path.format"="'date'=-MM-dd/'hour'=HH",, "rotate.interval.ms"="6",,
"name"="meetups-to-s3",, "flush.size"="10",,
"key.converter.schemas.enable"="false",,
"value.converter"="org.apache.kafka.connect.json.JsonConverter",,
"topics"="test",, "tasks"=[], "config"={,
"connector.class"="io.confluent.connect.s3.S3SinkConnector",,
"format.class"="io.confluent.connect.s3.format.json.JsonFormat",,
"tasks.max"="1",, "s3.region"="eu-west-1",,
"key.converter"="org.apache.kafka.connect.json.JsonConverter",,
"timestamp.extractor"="Record", "locale"="en",,
"schema.compatibility"="NONE",, {=,
"storage.class"="io.confluent.connect.s3.storage.S3Storage",, }=} contains
no connector type
at
org.apache.kafka.connect.util.ConvertingFutureCallback.result(ConvertingFutureCallback.java:79)
at
org.apache.kafka.connect.util.ConvertingFutureCallback.get(ConvertingFutureCallback.java:66)
at
org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:118)
Caused by:
org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector
config {"s3.part.size"="5242880",, "partition.duration.ms"="360",, "
s3.bucket.name"="test-connector",,
"value.converter.schemas.enable"="false",, "timezone"="UTC",, },=,
"partitioner.class"="io.confluent.connect.storage.partitioner.TimeBasedPartitioner",,
"path.format"="'date'=-MM-dd/'hour'=HH",, "rotate.interval.ms"="6",,
"name"="meetups-to-s3",, "flush.size"="10",,
"key.converter.schemas.enable"="false",,
"value.converter"="org.apache.kafka.connect.json.JsonConverter",,
"topics"="test",, "tasks"=[], "config"={,
"connector.class"="io.confluent.connect.s3.S3SinkConnector",,
"format.class"="io.confluent.connect.s3.format.json.JsonFormat",,
"tasks.max"="1",, "s3.region"="eu-west-1",,
"key.converter"="org.apache.kafka.connect.json.JsonConverter",,
"timestamp.extractor"="Record", "locale"="en",,
"schema.compatibility"="NONE",, {=,
"storage.class"="io.confluent.connect.s3.storage.S3Storage",, }=} contains
no connector type
at
org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:287)
at
org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:192)
at
org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:115)
[2019-11-12 16:24:19,329] INFO Kafka Connect stopping
(org.apache.kafka.connect.runtime.Connect:66)
--
Miguel Silvestre


Re: MirrorMaker 2 Plugin class loader Error

2019-11-12 Thread Vishal Santoshi
+1

On Mon, Nov 11, 2019 at 2:07 PM Ryanne Dolan  wrote:

> Rajeev, the config errors are unavoidable at present and can be ignored or
> silenced. The Plugin error is concerning, and was previously described by
> Vishal. I suppose it's possible there is a dependency conflict in these
> builds. Can you send me the hash that you're building from? I'll try to
> reproduce.
>
> Ryanne
>
> On Fri, Nov 8, 2019, 7:31 PM Rajeev Chakrabarti
>  wrote:
>
>> Hi Folks,
>> I'm trying to run MM 2 with the current trunk. Also, tried with the 2.4
>> branch and I'm getting "ERROR Plugin class loader for connector:
>> 'org.apache.kafka.connect.mirror.MirrorSourceConnector'" errors for all the
>> connectors. It does not seem to be creating topics in the destination
>> cluster but has created the internal topics at both the source and
>> destination and has populated the heartbeats topic. But none of the source
>> topics created or replicated. I'm also getting a bunch of "not known
>> configs" like 'consumer.group.id' was supplied but isn't a known config.
>> What am I doing wrong?
>> Regards,Rajeev
>>
>


Re: kafka-console-consumer --value-deserializer with access to headers

2019-11-12 Thread M. Manna
Hi,

On Tue, 12 Nov 2019 at 14:37, Jorg Heymans  wrote:

> Thanks for helping debugging this. You can reproduce the issue using below
> deserializer, and invoking kafka-console-consumer with
> --value-deserializer=my.BasicDeserializer . As you will see, when the
> consumer starts receiving messages only "SERDE WITHOUT HEADERS" is printed
> to the console.
>
> Thanks,
> Jorg
>
In the above, what command exactly have you run from the command prompt? Can
you share it with us?

Thanks,

>
> public class BasicDeserializer implements Deserializer<String> {
>
> @Override
> public void configure(Map<String, ?> configs, boolean isKey) {
> System.out.println("CONFIGURE");
> }
>
> @Override
> public String deserialize(String topic, byte[] data) {
> System.out.println("SERDE WITHOUT HEADERS");
> return new String(data);
> }
>
> @Override
> public String deserialize(String topic, Headers headers, byte[] data) {
> System.out.println("SERDE WITH HEADERS");
> return new String(data);
> }
>
> @Override
> public void close() {
> System.out.println("CLOSE");
> }
> }
>
>
>
>
> On 2019/11/12 12:57:21, "M. Manna"  wrote:
> > HI again,
> >
> > On Tue, 12 Nov 2019 at 12:31, Jorg Heymans 
> wrote:
> >
> > > Hi,
> > >
> > > The issue is not that i cannot get a custom deserializer working, it's
> > > that the custom deserializer i provide implements the default method
> from
> > > the Deserializer interface
> > >
> https://github.com/apache/kafka/blob/6f0008643db6e7299658442784f1bcc6c96395ed/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java#L59
> > > that gives access to record Headers.
> > >
> > > The kafka console consumer never calls this method, it will only call
> the
> > > variant without Headers
> > >
> https://github.com/apache/kafka/blob/6f0008643db6e7299658442784f1bcc6c96395ed/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java#L50
> > >
> > > I'm using kafka 2.3.0 btw.
> > >
> > > Jorg
> > >
> >
> > Record fetching (deserialization call) happens using Fetcher. And Fetcher
> > is calling default implementation of Deserializer.deserialize() with
> > header. The default implementation returns the implementation of
> > deserialize() with header. If you provide overridden version of
> > deserializer (for both header/non-header) it will be called.
> >
> >
> https://github.com/apache/kafka/blob/4e5b86419982050217d06b3c30ba6236e1fd9090/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L1265
> >
> >
> https://github.com/apache/kafka/blob/4e5b86419982050217d06b3c30ba6236e1fd9090/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L1268
> >
> > Console consumer simply puts a consumer wrapper around KafkaConsumer.
> There
> > is no change in behaviour otherwise. I take it that you've debugged and
> > confirmed that it's not calling your overridden deserialize() with
> headers?
> > If so, can you link it here for everyone's benefit?
> >
> > Thanks,
> >
> >
> >
> >
> >
> > > On 2019/11/12 11:58:26, "M. Manna"  wrote:
> > > >
> > > > I think you can try the following to get your implementation working
> > > >
> > > > 1) Provide the SerDe classes into classpath
> > > > 2) Provide your consumer config file
> > > > 3) Provide key/value Deserializer props via --consumer-property arg.
> > > >
> > >
> > >
> >
>


Re: kafka-console-consumer --value-deserializer with access to headers

2019-11-12 Thread Jorg Heymans
Thanks for helping debug this. You can reproduce the issue using the
deserializer below and invoking kafka-console-consumer with
--value-deserializer=my.BasicDeserializer. As you will see, when the consumer
starts receiving messages, only "SERDE WITHOUT HEADERS" is printed to the
console.

Thanks,
Jorg

import java.util.Map;

import org.apache.kafka.common.header.Headers;
import org.apache.kafka.common.serialization.Deserializer;

public class BasicDeserializer implements Deserializer<String> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        System.out.println("CONFIGURE");
    }

    @Override
    public String deserialize(String topic, byte[] data) {
        System.out.println("SERDE WITHOUT HEADERS");
        return new String(data);
    }

    // header-aware override; this is the variant that never seems to be called
    @Override
    public String deserialize(String topic, Headers headers, byte[] data) {
        System.out.println("SERDE WITH HEADERS");
        return new String(data);
    }

    @Override
    public void close() {
        System.out.println("CLOSE");
    }
}




On 2019/11/12 12:57:21, "M. Manna"  wrote: 
> HI again,
> 
> On Tue, 12 Nov 2019 at 12:31, Jorg Heymans  wrote:
> 
> > Hi,
> >
> > The issue is not that i cannot get a custom deserializer working, it's
> > that the custom deserializer i provide implements the default method from
> > the Deserializer interface
> > https://github.com/apache/kafka/blob/6f0008643db6e7299658442784f1bcc6c96395ed/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java#L59
> > that gives access to record Headers.
> >
> > The kafka console consumer never calls this method, it will only call the
> > variant without Headers
> > https://github.com/apache/kafka/blob/6f0008643db6e7299658442784f1bcc6c96395ed/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java#L50
> >
> > I'm using kafka 2.3.0 btw.
> >
> > Jorg
> >
> 
> Record fetching (deserialization call) happens using Fetcher. And Fetcher
> is calling default implementation of Deserializer.deserialize() with
> header. The default implementation returns the implementation of
> deserialize() with header. If you provide overridden version of
> deserializer (for both header/non-header) it will be called.
> 
> https://github.com/apache/kafka/blob/4e5b86419982050217d06b3c30ba6236e1fd9090/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L1265
> 
> https://github.com/apache/kafka/blob/4e5b86419982050217d06b3c30ba6236e1fd9090/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L1268
> 
> Console consumer simply puts a consumer wrapper around KafkaConsumer. There
> is no change in behaviour otherwise. I take it that you've debugged and
> confirmed that it's not calling your overridden deserialize() with headers?
> If so, can you link it here for everyone's benefit?
> 
> Thanks,
> 
> 
> 
> 
> 
> > On 2019/11/12 11:58:26, "M. Manna"  wrote:
> > >
> > > I think you can try the following to get your implementation working
> > >
> > > 1) Provide the SerDe classes into classpath
> > > 2) Provide your consumer config file
> > > 3) Provide key/value Deserializer props via --consumer-property arg.
> > >
> >
> >
> 


Re: kafka-console-consumer --value-deserializer with access to headers

2019-11-12 Thread M. Manna
Hi again,

On Tue, 12 Nov 2019 at 12:31, Jorg Heymans  wrote:

> Hi,
>
> The issue is not that i cannot get a custom deserializer working, it's
> that the custom deserializer i provide implements the default method from
> the Deserializer interface
> https://github.com/apache/kafka/blob/6f0008643db6e7299658442784f1bcc6c96395ed/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java#L59
> that gives access to record Headers.
>
> The kafka console consumer never calls this method, it will only call the
> variant without Headers
> https://github.com/apache/kafka/blob/6f0008643db6e7299658442784f1bcc6c96395ed/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java#L50
>
> I'm using kafka 2.3.0 btw.
>
> Jorg
>

Record fetching (the deserialization call) happens in Fetcher, and Fetcher
calls the Deserializer.deserialize() variant that takes headers. The default
implementation of that variant simply delegates to deserialize() without
headers. If you provide an overridden version of the deserializer (for both
the header and non-header variants), your override will be called.

https://github.com/apache/kafka/blob/4e5b86419982050217d06b3c30ba6236e1fd9090/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L1265

https://github.com/apache/kafka/blob/4e5b86419982050217d06b3c30ba6236e1fd9090/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L1268
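
For reference, the default header-aware method in the linked Deserializer
interface delegates to the two-argument variant along these lines (a
paraphrased sketch, not the verbatim source):

default T deserialize(String topic, Headers headers, byte[] data) {
    // the default header-aware variant ignores the headers and falls back
    // to the two-argument deserialize(topic, data) overload
    return deserialize(topic, data);
}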

Console consumer simply puts a consumer wrapper around KafkaConsumer. There
is no change in behaviour otherwise. I take it that you've debugged and
confirmed that it's not calling your overridden deserialize() with headers?
If so, can you link it here for everyone's benefit?

Thanks,





> On 2019/11/12 11:58:26, "M. Manna"  wrote:
> >
> > I think you can try the following to get your implementation working
> >
> > 1) Provide the SerDe classes into classpath
> > 2) Provide your consumer config file
> > 3) Provide key/value Deserializer props via --consumer-property arg.
> >
>
>


Re: kafka-console-consumer --value-deserializer with access to headers

2019-11-12 Thread Jorg Heymans
Hi,

The issue is not that I cannot get a custom deserializer working; it's that the
custom deserializer I provide implements the default method from the
Deserializer interface
https://github.com/apache/kafka/blob/6f0008643db6e7299658442784f1bcc6c96395ed/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java#L59
that gives access to record Headers.

The Kafka console consumer never calls this method; it only calls the
variant without Headers:
https://github.com/apache/kafka/blob/6f0008643db6e7299658442784f1bcc6c96395ed/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java#L50

I'm using Kafka 2.3.0, btw.

Jorg

On 2019/11/12 11:58:26, "M. Manna"  wrote: 
> 
> I think you can try the following to get your implementation working
> 
> 1) Provide the SerDe classes into classpath
> 2) Provide your consumer config file
> 3) Provide key/value Deserializer props via --consumer-property arg.
> 



Re: kafka-console-consumer --value-deserializer with access to headers

2019-11-12 Thread M. Manna
Hi

On Tue, 12 Nov 2019 at 09:53, Jorg Heymans  wrote:

> Indeed, i corrected the typo but now my deserializer class is not taken
> into account at all and it goes back to the default deserializer. You can
> verify this by putting a non-existent class and it still runs fine.
>
> value.deserializer=does.not.exist
>
In ConsoleConsumer, the bootstrap.server and key/value deserializer settings
are applied via the --consumer-property arg. It aggregates all properties from
--consumer-property and --consumer.config, and prioritises key/value pairs
supplied via --consumer-property over the properties file.

I think you can try the following to get your implementation working

1) Provide the SerDe classes into classpath
2) Provide your consumer config file
3) Provide key/value Deserializer props via --consumer-property arg.

See how that works for you.
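
For example, something along these lines (a sketch only; the jar path, broker
address, and topic name are placeholders):

# 1) make the jar with the custom SerDe visible to kafka-run-class.sh
export CLASSPATH=/path/to/my-serde.jar

# 2) + 3) pass the consumer config file plus the deserializer as a --consumer-property
bin/kafka-console-consumer.sh \
  --bootstrap-server localhost:9092 \
  --topic my-topic \
  --consumer.config consumer.properties \
  --consumer-property value.deserializer=my.BasicDeserializer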

Thanks,

> Jorg
>
> On 2019/11/11 14:31:49, "M. Manna"  wrote:
> > You have a typo - you mean deserializer
> >
> > Please try again.
> >
> > Regards,
> >
> > On Mon, 11 Nov 2019 at 14:28, Jorg Heymans 
> wrote:
> >
> > > Don't think that option is available there, specifying
> > > 'value.deserializer' in my consumer-config.properties file gives
> > >
> > > [2019-11-11 15:16:26,589] WARN The configuration 'value.serializer' was
> > > supplied but isn't a known config.
> > > (org.apache.kafka.clients.consumer.ConsumerConfig)
> > >
> > > Does there exist a description of what properties the consumer-config
> > > properties file accepts ? I could find only a few references to it in
> the
> > > documentation.
> > >
> > > Jorg
> > >
> > > On 2019/11/11 13:00:03, "M. Manna"  wrote:
> > > > Hi,
> > > >
> > > >
> > > > On Mon, 11 Nov 2019 at 10:58, Jorg Heymans 
> > > wrote:
> > > >
> > > > > Hi,
> > > > >
> > > > > I have created a class implementing Deserializer, providing an
> > > > > implementation for
> > > > >
> > > > > public String deserialize(String topic, Headers headers, byte[]
> data)
> > > > >
> > > > > that does some conditional processing based on headers, and then
> calls
> > > the
> > > > > other serde method
> > > > >
> > > > > public String deserialize(String topic, byte[] data)
> > > > >
> > > > > What i'm seeing is that kafka-console-consumer only uses the second
> > > method
> > > > > when a value deserializer is specified. Is there a way to force it
> to
> > > > > invoke the first method, so i can do processing with headers ? I
> tried
> > > > > implementing the deprecated 'ExtendedSerializer' but it does not
> make a
> > > > > difference.
> > > > >
> > > > > Thanks,
> > > > > Jorg
> > > > >
> > > >
> > > > Have you tried providing a separate prop file using consumer.config
> > > > argument? Please see the reference here:
> > > >
> > > > --consumer.config   Consumer config properties
> file.
> > > > Note
> > > >that [consumer-property]
> takes
> > > >precedence over this
> config.
> > > >
> > > > Try that and see how it goes.
> > > >
> > > > Thanks,
> > > >
> > >
> >
>


Re: kafka-console-consumer --value-deserializer with access to headers

2019-11-12 Thread Jorg Heymans
Indeed, I corrected the typo, but now my deserializer class is not taken into
account at all and it goes back to the default deserializer. You can verify
this by putting in a non-existent class; it still runs fine.

value.deserializer=does.not.exist

Jorg

On 2019/11/11 14:31:49, "M. Manna"  wrote: 
> You have a typo - you mean deserializer
> 
> Please try again.
> 
> Regards,
> 
> On Mon, 11 Nov 2019 at 14:28, Jorg Heymans  wrote:
> 
> > Don't think that option is available there, specifying
> > 'value.deserializer' in my consumer-config.properties file gives
> >
> > [2019-11-11 15:16:26,589] WARN The configuration 'value.serializer' was
> > supplied but isn't a known config.
> > (org.apache.kafka.clients.consumer.ConsumerConfig)
> >
> > Does there exist a description of what properties the consumer-config
> > properties file accepts ? I could find only a few references to it in the
> > documentation.
> >
> > Jorg
> >
> > On 2019/11/11 13:00:03, "M. Manna"  wrote:
> > > Hi,
> > >
> > >
> > > On Mon, 11 Nov 2019 at 10:58, Jorg Heymans 
> > wrote:
> > >
> > > > Hi,
> > > >
> > > > I have created a class implementing Deserializer, providing an
> > > > implementation for
> > > >
> > > > public String deserialize(String topic, Headers headers, byte[] data)
> > > >
> > > > that does some conditional processing based on headers, and then calls
> > the
> > > > other serde method
> > > >
> > > > public String deserialize(String topic, byte[] data)
> > > >
> > > > What i'm seeing is that kafka-console-consumer only uses the second
> > method
> > > > when a value deserializer is specified. Is there a way to force it to
> > > > invoke the first method, so i can do processing with headers ? I tried
> > > > implementing the deprecated 'ExtendedSerializer' but it does not make a
> > > > difference.
> > > >
> > > > Thanks,
> > > > Jorg
> > > >
> > >
> > > Have you tried providing a separate prop file using consumer.config
> > > argument? Please see the reference here:
> > >
> > > --consumer.config   Consumer config properties file.
> > > Note
> > >that [consumer-property] takes
> > >precedence over this config.
> > >
> > > Try that and see how it goes.
> > >
> > > Thanks,
> > >
> >
>