Kafka - DC, DR (Data Recovery) replication with 0 RPO

2021-03-12 Thread sanjay kumar
Hi Team,

Hope you are doing great!!

This is Rakesh, and I am managing Apache Kafka administration in an
organization. I am currently using Kafka version 2.4.
I have a few queries related to Kafka data replication among multiple Kafka
clusters.

1. How can we achieve 0 RPO (zero data loss) while replicating the same Kafka
topics among different Kafka clusters? Is MirrorMaker the only solution?
2. If MirrorMaker 2 helps with this, is it possible to replicate/update only
the new records that have arrived in the source Kafka cluster?

Explanation:
Query 2 is the most important use case that needs to be resolved. I can
explain this scenario in depth with an example:
"I have three Kafka clusters:
1. Source cluster (DC)
2. Data Recovery (DR) cluster
3. Destination cluster
Here I am replicating data from the source cluster (DC) to the data
recovery (DR) cluster and the destination cluster at the same time, using
MirrorMaker 2.
Now, if at any point the destination cluster goes down, it will miss the new
records produced during the downtime, until it comes back up.
When it comes back up, it should recover the lost data from the data recovery
cluster. But instead of replicating all records from the source topic, it
should replicate only the delta records (the new records present in the data
recovery cluster) to the destination cluster, which would achieve zero (0)
RPO across multiple Kafka clusters."
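
For reference, a minimal MirrorMaker 2 configuration sketch for the fan-out
described above might look like the following (cluster aliases and bootstrap
servers are assumptions; this only illustrates the DC -> DR and DC ->
destination flows, it does not by itself solve the delta-only recovery
question):

# mm2.properties (illustrative)
clusters = dc, dr, dest
dc.bootstrap.servers = dc-broker1:9092
dr.bootstrap.servers = dr-broker1:9092
dest.bootstrap.servers = dest-broker1:9092

# replicate everything from the source (DC) to both other clusters
dc->dr.enabled = true
dc->dr.topics = .*
dc->dest.enabled = true
dc->dest.topics = .*

# emit checkpoints so consumer progress can be translated between clusters
emit.checkpoints.enabled = true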

I hope the team understands this scenario; it would be a great help to get a
solution for it. We could also connect remotely or on a phone call to go
through it in depth, or to discuss any further queries/concerns on this use
case.

contact: +91-7065990605, +91-8178237439
email: kmrac...@gmail.com
google meet: we can also connect on Google Meet.



Re: Redis as state store

2021-03-12 Thread Guozhang Wang
Hello Mohan,

I think what you had in mind works with Redis: since it is a remote state
store engine, it does not have the co-partitioning requirements that local
state stores do.

One thing you would need to tune in Kafka Streams, though, is that with remote
stores the processing latency may be larger, and since Kafka Streams processes
all records of a single partition in order, synchronously, you may need to tune
the poll interval configs etc. to make sure the application stays in the
consumer group and does not trigger unnecessary rebalances.
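
As a rough sketch of the kind of tuning meant here (the values are purely
illustrative, not recommendations), the relevant consumer configs can be set
directly in the Streams properties:

# illustrative consumer settings for a slow remote store
max.poll.interval.ms=600000   # allow more time between polls, each record may hit Redis
max.poll.records=100          # smaller batches so one poll loop cannot overrun the interval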

Guozhang

On Thu, Mar 11, 2021 at 6:41 PM Parthasarathy, Mohan 
wrote:

> Hi,
>
> I have a use case where messages come in with some key, get assigned to some
> partition, and the state gets created. Later, the key changes (but the message
> still contains the old key) and the record gets sent to a different
> partition. I want to be able to grab the old state using the old key before
> creating the new state on this instance. Redis as a state store makes it
> easy to implement this, where I can simply do a lookup before creating the
> state. I see an implementation here:
> https://github.com/andreas-schroeder/redisks/tree/master/src/main/java/com/github/andreas_schroeder/redisks
>
> Has anyone tried this? Any caveats?
>
> Thanks
> Mohan
>
>

-- 
-- Guozhang


Re: Error in Kafka property file contains no connector type

2021-03-12 Thread Mich Talebzadeh
OK, this attempt to stream a Kafka topic to BigQuery has moved on, but I am
now getting this error:

[2021-03-12 20:17:41,870] ERROR Stopping due to error
(org.apache.kafka.connect.cli.ConnectStandalone:122)
java.lang.NoSuchMethodError:
org.apache.kafka.common.acl.AclBindingFilter.<init>(Lorg/apache/kafka/common/resource/ResourcePatternFilter;Lorg/apache/kafka/common/acl/AccessControlEntryFilter;)V
at
org.apache.kafka.connect.mirror.MirrorSourceConnector.<init>(MirrorSourceConnector.java:67)
at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native
Method)
at
sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
at
sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
at java.lang.Class.newInstance(Class.java:442)
at
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.getPluginDesc(DelegatingClassLoader.java:302)
at
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanPluginPath(DelegatingClassLoader.java:280)
at
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.scanUrlsAndAddPlugins(DelegatingClassLoader.java:216)
at
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.registerPlugin(DelegatingClassLoader.java:208)
at
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initPluginLoader(DelegatingClassLoader.java:177)
at
org.apache.kafka.connect.runtime.isolation.DelegatingClassLoader.initLoaders(DelegatingClassLoader.java:154)
at
org.apache.kafka.connect.runtime.isolation.Plugins.<init>(Plugins.java:56)
at
org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:77)

I googled it but could not find much.


Any ideas, please?
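
For what it's worth, a NoSuchMethodError on a core Kafka class during plugin
scanning often points at mismatched Kafka jar versions on the worker's
classpath or plugin path. A quick diagnostic sketch (PLUGIN_PATH is a
placeholder for whatever plugin.path is set to):

PLUGIN_PATH=/path/to/share/kafka/plugins
ls -1 "$KAFKA_HOME"/libs/kafka*.jar
find "$PLUGIN_PATH" -name 'kafka-clients-*.jar' -o -name 'connect-*.jar' | sort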


Thanks



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Fri, 12 Mar 2021 at 14:44, Mich Talebzadeh 
wrote:

> Thanks again Liam.
>
> This is the error
>
> [2021-03-12 14:17:54,670] ERROR Stopping after connector error
> (org.apache.kafka.connect.cli.ConnectStandalone:113)
> java.lang.NoClassDefFoundError:
> org/apache/kafka/common/config/ConfigDef$CaseInsensitiveValidString
> at
> com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig.getConfig(BigQuerySinkConfig.java:505)
>at
> com.wepay.kafka.connect.bigquery.BigQuerySinkConnector.config(BigQuerySinkConnector.java:79)
> at
> org.apache.kafka.connect.connector.Connector.validate(Connector.java:132)
> at
> org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:277)
> at
> org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:164)
> at
> org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:107)
> Caused by: java.lang.ClassNotFoundException:
> *org.apache.kafka.common.config.ConfigDef$CaseInsensitiveValidString*
>
> It is indeed under $KAFKA_HOME/libs
>
>
> cd $KAFKA_HOME/libs
>
>  grep -lRi org.apache.kafka.common.config.ConfigDef
>
> kafka-clients-1.1.0.jar
>
> So it is in kafka-clients-1.1.0.jar file
>
>
> I added the $KAFKA_HOME/libs to the CLASSPATH. It did not work. Then I
> copied over the file to
> share/kafka/plugins/wepay-kafka-connect-bigquery-2.1.0/lib
>
>
>
> grep -lRi
> org.apache.kafka.common.config.ConfigDef$CaseInsensitiveValidString
>
>
> share/kafka/plugins/wepay-kafka-connect-bigquery-2.1.0/lib/kafka-clients-1.1.0.jar
>
> Still cannot find it!
>
> In my plugin I have specified
>
> plugin.path=/d4T/hduser/bigquery-kafka-connect-sink/share/kafka/plugins
>
>
> So not sure where it is looking
>
>
>
>
> Regards,
>
>
> Mich
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Fri, 12 Mar 2021 at 13:44, Liam Clarke-Hutchinson <
> liam.cla...@adscale.co.nz> wrote:
>
>> I feel your CLASSPATH env var might need to include $KAFKA_HOME/lib (it
>> should already have the common conf lib in there).
>>
>>
>> On Fri, 12 Mar. 2021, 10:33 pm Mich Talebzadeh, <
>> mich.talebza...@gmail.com>
>> wrote:
>>
>> > Thanks Liam for the suggestion.
>> >
>> > This is the redone sink file (plain text)
>> >
>> > name=bigquery-sink
>> > connector.type=bigquery-connector
>> > connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
>> > defaultDataset=test
>> > 

[VOTE] 2.6.2 RC0

2021-03-12 Thread Sophie Blee-Goldman
Hello Kafka users, developers and client-developers,

This is the first candidate for release of Apache Kafka 2.6.2.

Apache Kafka 2.6.2 is a bugfix release and fixes 30 issues since the 2.6.1
release. Please see the release notes for more information.

Release notes for the 2.6.2 release:
https://home.apache.org/~ableegoldman/kafka-2.6.2-rc0/RELEASE_NOTES.html

*** Please download, test and vote by Friday, March 19th, 9am PST

Kafka's KEYS file containing PGP keys we use to sign the release:
https://kafka.apache.org/KEYS

* Release artifacts to be voted upon (source and binary):
https://home.apache.org/~ableegoldman/kafka-2.6.2-rc0/

* Maven artifacts to be voted upon:
https://repository.apache.org/content/groups/staging/org/apache/kafka/

* Javadoc:
https://home.apache.org/~ableegoldman/kafka-2.6.2-rc0/javadoc/

* Tag to be voted upon (off 2.6 branch) is the 2.6.2 tag:
https://github.com/apache/kafka/releases/tag/2.6.2-rc0

* Documentation:
https://kafka.apache.org/26/documentation.html

* Protocol:
https://kafka.apache.org/26/protocol.html

* Successful Jenkins builds for the 2.6 branch:
Unit/integration tests:
https://ci-builds.apache.org/job/Kafka/job/kafka-2.6-jdk8/105/
System tests:
https://jenkins.confluent.io/job/system-test-kafka-branch-builder/4435/


Thanks,
Sophie


Error upgrading KafkaStreams

2021-03-12 Thread Murilo Tavares
Hi
I have Kafka brokers running on version 2.4.1, with a KafkaStreams app on
2.4.0.
I'm trying to upgrade my KafkaStreams app to v2.7.0, but my instances got
stuck on startup.
In my understanding, I don't need any special procedure to upgrade from
KStreams 2.4.0 to 2.7.0, right?

The following error stands out for me:

2021-03-12 16:23:52.005 [...] WARN
 org.apache.kafka.streams.processor.internals.StreamThread - stream-thread
[...] Detected the states of tasks
{7_17=[my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17]} are
corrupted. Will close the task as dirty and re-create and bootstrap from
scratch.
org.apache.kafka.streams.errors.TaskCorruptedException: Tasks with
changelogs
{7_17=[my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17]} are
corrupted and hence needs to be re-initialized
at
org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:446)
~[app.jar:?]
at
org.apache.kafka.streams.processor.internals.StreamThread.initializeAndRestorePhase(StreamThread.java:744)
~[app.jar:?]
at
org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:625)
~[app.jar:?]
at
org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:553)
[app.jar:?]
at
org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:512)
[app.jar:?]
Caused by: org.apache.kafka.clients.consumer.OffsetOutOfRangeException:
Fetch position FetchPosition{offset=738, offsetEpoch=Optional.empty,
currentLeader=LeaderAndEpoch{leader=Optional[...:9092 (id: 3 rack:
euw1-az1)], epoch=27}} is out of range for partition
my-assembler-4-KTABLE-AGGREGATE-aggregated-topic-changelog-17
at
org.apache.kafka.clients.consumer.internals.Fetcher.handleOffsetOutOfRange(Fetcher.java:1366)
~[app.jar:?]
at
org.apache.kafka.clients.consumer.internals.Fetcher.initializeCompletedFetch(Fetcher.java:1318)
~[app.jar:?]
at
org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:614)
~[app.jar:?]
at
org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1272)
~[app.jar:?]
at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1233)
~[app.jar:?]
at
org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1206)
~[app.jar:?]
at
org.apache.kafka.streams.processor.internals.StoreChangelogReader.restore(StoreChangelogReader.java:433)
~[app.jar:?]
... 4 more

Any suggestions on how to upgrade?
Thanks
Murilo


Re: Error in Kafka property file contains no connector type

2021-03-12 Thread Mich Talebzadeh
Thanks again Liam.

This is the error

[2021-03-12 14:17:54,670] ERROR Stopping after connector error
(org.apache.kafka.connect.cli.ConnectStandalone:113)
java.lang.NoClassDefFoundError:
org/apache/kafka/common/config/ConfigDef$CaseInsensitiveValidString
at
com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig.getConfig(BigQuerySinkConfig.java:505)
   at
com.wepay.kafka.connect.bigquery.BigQuerySinkConnector.config(BigQuerySinkConnector.java:79)
at
org.apache.kafka.connect.connector.Connector.validate(Connector.java:132)
at
org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:277)
at
org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:164)
at
org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:107)
Caused by: java.lang.ClassNotFoundException:
*org.apache.kafka.common.config.ConfigDef$CaseInsensitiveValidString*

It is indeed under $KAFKA_HOME/libs


cd $KAFKA_HOME/libs

 grep -lRi org.apache.kafka.common.config.ConfigDef

kafka-clients-1.1.0.jar

So it is in kafka-clients-1.1.0.jar file


I added $KAFKA_HOME/libs to the CLASSPATH. It did not work. Then I
copied the jar over to
share/kafka/plugins/wepay-kafka-connect-bigquery-2.1.0/lib



grep -lRi
org.apache.kafka.common.config.ConfigDef$CaseInsensitiveValidString

share/kafka/plugins/wepay-kafka-connect-bigquery-2.1.0/lib/kafka-clients-1.1.0.jar

Still cannot find it!

In my plugin I have specified

plugin.path=/d4T/hduser/bigquery-kafka-connect-sink/share/kafka/plugins


So I am not sure where it is looking.
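
One way to confirm whether the missing inner class is really inside those jars
is something like the sketch below (a diagnostic only; note that in an unquoted
grep pattern the shell expands $CaseInsensitiveValidString to nothing, so the
earlier match may only prove the outer ConfigDef class is present):

for jar in $KAFKA_HOME/libs/kafka-clients-*.jar \
           /d4T/hduser/bigquery-kafka-connect-sink/share/kafka/plugins/*/lib/kafka-clients-*.jar; do
  echo "== $jar"
  unzip -l "$jar" | grep 'ConfigDef\$CaseInsensitiveValidString' || echo "   (not present)"
done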




Regards,


Mich



*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Fri, 12 Mar 2021 at 13:44, Liam Clarke-Hutchinson <
liam.cla...@adscale.co.nz> wrote:

> I feel your CLASSPATH env var might need to include $KAFKA_HOME/lib (it
> should already have the common conf lib in there).
>
>
> On Fri, 12 Mar. 2021, 10:33 pm Mich Talebzadeh, <mich.talebza...@gmail.com>
> wrote:
>
> > Thanks Liam for the suggestion.
> >
> > This is the redone sink file (plain text)
> >
> > name=bigquery-sink
> > connector.type=bigquery-connector
> > connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
> > defaultDataset=test
> > project=axial-glow-224522
> > topics=md
> > autoCreateTables=false
> > gcsBucketName=tmp_storage_bucket
> > queueSize=-1
> > bigQueryRetry=0
> > bigQueryRetryWait=1000
> > bigQueryMessageTimePartitioning=false
> > bigQueryPartitionDecorator=true
> > timePartitioningType=DAY
> > keySource=FILE
> > keyfile=/home/hduser/GCPFirstProject-d75f1b3a9817.json
> > sanitizeTopics=false
> >
> >
> schemaRetriever=com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever
> > threadPoolSize=10
> > allBQFieldsNullable=false
> > avroDataCacheSize=100
> > batchLoadIntervalSec=120
> > convertDoubleSpecialValues=false
> > enableBatchLoad=false
> > upsertEnabled=false
> > deleteEnabled=false
> > mergeIntervalMs=60_000L
> > mergeRecordsThreshold=-1
> > autoCreateBucket=true
> > allowNewBigQueryFields=false
> > allowBigQueryRequiredFieldRelaxation=false
> > allowSchemaUnionization=false
> > kafkaDataFieldName=null
> > kafkaKeyFieldName=null
> >
> > Now when I run the command
> >
> > $KAFKA_HOME/bin/connect-standalone.sh \
> >
> /d4T/hduser/bigquery-kafka-connect-sink/etc/connect-standalone.properties \
> > /d4T/hduser/bigquery-kafka-connect-sink/etc/bigquery-sink.properties
> >
> > It comes back with this error:
> >
> > [2021-03-12 09:23:54,523] INFO REST server listening at
> > http://50.140.197.220:8083/, advertising URL http://50.140.197.220:8083/
> > (org.apache.kafka.connect.runtime.rest.RestServer:207)
> > [2021-03-12 09:23:54,523] INFO Kafka Connect started
> > (org.apache.kafka.connect.runtime.Connect:55)
> > [2021-03-12 09:23:54,534] ERROR Stopping after connector error
> > (org.apache.kafka.connect.cli.ConnectStandalone:113)
> > java.lang.NoClassDefFoundError:
> > org/apache/kafka/common/config/ConfigDef$CaseInsensitiveValidString
> > at
> >
> >
> com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig.getConfig(BigQuerySinkConfig.java:505)
> > at
> >
> >
> com.wepay.kafka.connect.bigquery.BigQuerySinkConnector.config(BigQuerySinkConnector.java:79)
> > at
> > org.apache.kafka.connect.connector.Connector.validate(Connector.java:132)
> > at
> >
> >
> org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:277)
> > at
> >
> >
> org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:164)
> > at
> >
> >
> org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:107)
> > 

Re: Error in Kafka property file contains no connector type

2021-03-12 Thread Liam Clarke-Hutchinson
I feel your CLASSPATH env var might need to include $KAFKA_HOME/lib (it
should already have the common conf lib in there).
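
A sketch of that suggestion (assuming the standard $KAFKA_HOME/libs directory
and the properties paths quoted further down):

export CLASSPATH="$CLASSPATH:$KAFKA_HOME/libs/*"
$KAFKA_HOME/bin/connect-standalone.sh \
  /d4T/hduser/bigquery-kafka-connect-sink/etc/connect-standalone.properties \
  /d4T/hduser/bigquery-kafka-connect-sink/etc/bigquery-sink.properties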


On Fri, 12 Mar. 2021, 10:33 pm Mich Talebzadeh, 
wrote:

> Thanks Liam for the suggestion.
>
> This is the redone sink file (plain text)
>
> name=bigquery-sink
> connector.type=bigquery-connector
> connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
> defaultDataset=test
> project=axial-glow-224522
> topics=md
> autoCreateTables=false
> gcsBucketName=tmp_storage_bucket
> queueSize=-1
> bigQueryRetry=0
> bigQueryRetryWait=1000
> bigQueryMessageTimePartitioning=false
> bigQueryPartitionDecorator=true
> timePartitioningType=DAY
> keySource=FILE
> keyfile=/home/hduser/GCPFirstProject-d75f1b3a9817.json
> sanitizeTopics=false
>
> schemaRetriever=com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever
> threadPoolSize=10
> allBQFieldsNullable=false
> avroDataCacheSize=100
> batchLoadIntervalSec=120
> convertDoubleSpecialValues=false
> enableBatchLoad=false
> upsertEnabled=false
> deleteEnabled=false
> mergeIntervalMs=60_000L
> mergeRecordsThreshold=-1
> autoCreateBucket=true
> allowNewBigQueryFields=false
> allowBigQueryRequiredFieldRelaxation=false
> allowSchemaUnionization=false
> kafkaDataFieldName=null
> kafkaKeyFieldName=null
>
> Now when I run the command
>
> $KAFKA_HOME/bin/connect-standalone.sh \
> /d4T/hduser/bigquery-kafka-connect-sink/etc/connect-standalone.properties \
> /d4T/hduser/bigquery-kafka-connect-sink/etc/bigquery-sink.properties
>
> It comes back with this error:
>
> [2021-03-12 09:23:54,523] INFO REST server listening at
> http://50.140.197.220:8083/, advertising URL http://50.140.197.220:8083/
> (org.apache.kafka.connect.runtime.rest.RestServer:207)
> [2021-03-12 09:23:54,523] INFO Kafka Connect started
> (org.apache.kafka.connect.runtime.Connect:55)
> [2021-03-12 09:23:54,534] ERROR Stopping after connector error
> (org.apache.kafka.connect.cli.ConnectStandalone:113)
> java.lang.NoClassDefFoundError:
> org/apache/kafka/common/config/ConfigDef$CaseInsensitiveValidString
> at
>
> com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig.getConfig(BigQuerySinkConfig.java:505)
> at
>
> com.wepay.kafka.connect.bigquery.BigQuerySinkConnector.config(BigQuerySinkConnector.java:79)
> at
> org.apache.kafka.connect.connector.Connector.validate(Connector.java:132)
> at
>
> org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:277)
> at
>
> org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:164)
> at
>
> org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:107)
> Caused by: java.lang.ClassNotFoundException:
> org.apache.kafka.common.config.ConfigDef$CaseInsensitiveValidString
> at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
> at
>
> org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)
> at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>
> I downloaded common-config-6.1.0.jar and added to lib directory in
>
> /wepay-kafka-connect-bigquery-2.1.0/lib
>
> But little joy I am afraid.
>
> Cheers,
>
> Mich
>
>
>
> LinkedIn:
> https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
>
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any
> loss, damage or destruction of data or any other property which may arise
> from relying on this email's technical content is explicitly disclaimed.
> The author will in no case be liable for any monetary damages arising from
> such loss, damage or destruction.
>
>
>
>
> On Fri, 12 Mar 2021 at 02:56, Liam Clarke-Hutchinson <
> liam.cla...@adscale.co.nz> wrote:
>
> > Hi Mich,
> >
> > Your bigquery-sink.properties file is in a JSON format - which won't
> work.
> > It needs to follow the usual format of a Java properties file.
> >
> > Kind regards,
> >
> > Liam Clarke-Hutchinson
> >
> > On Fri, Mar 12, 2021 at 12:13 AM Mich Talebzadeh <
> > mich.talebza...@gmail.com>
> > wrote:
> >
> > > Hi,
> > >
> > >
> > > Trying to stream from Kafka to Google BigQuery.
> > >
> > >
> > >  The connect-standalone.properties is as follows
> > >
> > >
> > > key.converter=org.apache.kafka.connect.storage.StringConverter
> > >
> > > ##value.converter=org.apache.kafka.connect.storage.StringConverter
> > >
> > > value.converter=org.apache.kafka.connect.json.JsonConverter
> > >
> > > #
> > >
> > > # Converter-specific settings can be passed in by prefixing the
> > Converter's
> > >
> > > # setting with the converter we want to apply it to
> > >
> > > key.converter.schemas.enable=true
> > >
> > > value.converter.schemas.enable=false
> > >
> > >
> > > # The internal converter used for 

Re: Kafka Streams - Out of Order Handling

2021-03-12 Thread Marcus Horsley-Rai
Thanks Matthias - that's great to know.

> Increasing the grace period should not really affect throughput, but
> latency.

Yes, a slip of the tongue on my part, you’re right :-)

One last question, if I may? I only see issues with out-of-order data in my
re-partitioned topic as a result of a rebalance happening.
My hypothesis is that when an instance of my streams app dies, the consumption
of data from the partitions it was responsible for falls behind compared to
the others.
I believe all stream threads across all app instances will pause consuming
while the rebalance is worked through... but am I right in thinking that one
streams app (or at least some of its stream threads) will have to wait for
state to be synced from the changelog topic?
In other words, when a rebalance happens, I assume the consumer group doesn't
wait for the slowest member to be ready to consume?

To illustrate with an example:
- If I have 3 partitions of a single topic and three streams app instances
  (1 partition each)
- I have a producer that produces to each partition each minute, on the minute
- Normally the timestamp of the head record is roughly the same across all
  three partitions. This assumes no lag ever builds up on the consumer group,
  and also assumes data volume and size of messages is comparable.

- Now I kill streams app A. The rebalance protocol kicks in and gives instance
  B an extra partition to consume from.
- Could there now be a bigger lag for one or both of the partitions app B is
  consuming from, because it had to sync state store state? (Assume B has
  enough stream processing threads idle and the machine is specced to cope
  with the extra load.)
  ...whereas app C, unhindered by state syncing, has potentially now produced
  to the through topic a record from a newer batch/time window.

If this is the case, do you think increasing standby replicas will lessen the
issue? I obviously don't expect it to be a magic bullet, and the grace period
is still required in general.
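
For reference, the two settings touched on in this thread would end up as
something like the following in the Streams configuration (values are purely
illustrative):

num.standby.replicas=1    # warm replicas of state stores, to shorten restores after a failover
max.task.idle.ms=2000     # let a task wait for data on all its input partitions before choosing the next record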


Best Regards,

Marcus




On Thu, Mar 11, 2021 at 1:40 AM Matthias J. Sax <mj...@apache.org> wrote:
> will it consider a timestamp in the body of the message, if we have 
> implemented a custom TimeExtractor?

Yes.


> Or, which I feel is more likely - does TimeExtractor stream time only apply 
> later on once deserialisation has happened?

Well, the extractor does apply after deserialization, but we deserialize
each partition head-record to be able to apply the timestamp extractor:
ie, deserialization happens when a record becomes the "head record".

Cf
https://github.com/apache/kafka/blob/trunk/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java
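
As an illustration of the extractor being applied to each deserialized head
record, a minimal custom TimestampExtractor that pulls the timestamp out of a
JSON payload might look like the sketch below (the "eventTime" field name and
the assumption that the value deserializes to a Jackson JsonNode are mine, not
from this thread):

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;
import com.fasterxml.jackson.databind.JsonNode;

public class JsonBodyTimestampExtractor implements TimestampExtractor {

    @Override
    public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
        Object value = record.value();   // already deserialized at this point
        if (value instanceof JsonNode) {
            JsonNode ts = ((JsonNode) value).get("eventTime");
            if (ts != null && ts.canConvertToLong()) {
                return ts.asLong();      // epoch-millis timestamp taken from the message body
            }
        }
        // Fall back to the record's own timestamp, or the partition time if unset.
        return record.timestamp() >= 0 ? record.timestamp() : partitionTime;
    }
}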
 



> the accuracy of the aggregates may have to come second to the throughput.

Increasing the grace period should not really affect throughput, but
latency.



-Matthias


On 3/10/21 3:37 PM, Marcus Horsley-Rai wrote:
> 
> Thanks for your reply Matthias, and really great talks :-)
> 
> You’re right that I only have one input topic - though it does have 20 
> partitions.
> The pointer to max.task.idle.ms  cleared something 
> up for me; I read the following line from Kafka docs but couldn’t find what 
> configuration they were referring to.
> 
>>  Within a stream task that may be processing multiple topic-partitions, 
>> if users configure the application to not wait for all partitions to contain 
>> some buffered data and pick from the partition with the smallest timestamp 
>> to process the next record, then later on when some records are fetched for 
>> other topic-partitions, their timestamps may be smaller than those processed 
>> records fetched from another topic-partition.
>>
>   
> When streams is checking the head record of each partition to pick the lowest 
> timestamp - will it consider a timestamp in the body of the message, if we 
> have implemented a custom TimeExtractor?
> Or, which I feel is more likely - does TimeExtractor stream time only apply 
> later on once deserialisation has happened?
> The reason I ask is because our producer code doesn’t manually set the 
> timestamp in ProducerRecord, only in the JSON body. That may be something we 
> can look to change.
> 
> As you say, I fear adjusting grace time may be my only solution; however 
> because this is a real-time monitoring application…the accuracy of the 
> aggregates may have to come second to the throughput.
> 
> Many thanks,
> 
> Marcus
> 
> 
> On 2021/03/09 08:21:22, "Matthias J. Sax" <mj...@apache.org> wrote:
>> In general, Kafka Streams tries to process messages in timestamp order,
>> ie, oldest message first. However, Kafka Streams always need to process
>> messages in offset order per partition, and thus, the timestamp
>> synchronization applied to records from 

Re: Error in Kafka property file contains no connector type

2021-03-12 Thread Mich Talebzadeh
Thanks Liam for the suggestion.

This is the redone sink file (plain text)

name=bigquery-sink
connector.type=bigquery-connector
connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
defaultDataset=test
project=axial-glow-224522
topics=md
autoCreateTables=false
gcsBucketName=tmp_storage_bucket
queueSize=-1
bigQueryRetry=0
bigQueryRetryWait=1000
bigQueryMessageTimePartitioning=false
bigQueryPartitionDecorator=true
timePartitioningType=DAY
keySource=FILE
keyfile=/home/hduser/GCPFirstProject-d75f1b3a9817.json
sanitizeTopics=false
schemaRetriever=com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever
threadPoolSize=10
allBQFieldsNullable=false
avroDataCacheSize=100
batchLoadIntervalSec=120
convertDoubleSpecialValues=false
enableBatchLoad=false
upsertEnabled=false
deleteEnabled=false
mergeIntervalMs=60_000L
mergeRecordsThreshold=-1
autoCreateBucket=true
allowNewBigQueryFields=false
allowBigQueryRequiredFieldRelaxation=false
allowSchemaUnionization=false
kafkaDataFieldName=null
kafkaKeyFieldName=null

Now when I run the command

$KAFKA_HOME/bin/connect-standalone.sh \
/d4T/hduser/bigquery-kafka-connect-sink/etc/connect-standalone.properties \
/d4T/hduser/bigquery-kafka-connect-sink/etc/bigquery-sink.properties

It comes back with this error:

[2021-03-12 09:23:54,523] INFO REST server listening at
http://50.140.197.220:8083/, advertising URL http://50.140.197.220:8083/
(org.apache.kafka.connect.runtime.rest.RestServer:207)
[2021-03-12 09:23:54,523] INFO Kafka Connect started
(org.apache.kafka.connect.runtime.Connect:55)
[2021-03-12 09:23:54,534] ERROR Stopping after connector error
(org.apache.kafka.connect.cli.ConnectStandalone:113)
java.lang.NoClassDefFoundError:
org/apache/kafka/common/config/ConfigDef$CaseInsensitiveValidString
at
com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig.getConfig(BigQuerySinkConfig.java:505)
at
com.wepay.kafka.connect.bigquery.BigQuerySinkConnector.config(BigQuerySinkConnector.java:79)
at
org.apache.kafka.connect.connector.Connector.validate(Connector.java:132)
at
org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:277)
at
org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:164)
at
org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:107)
Caused by: java.lang.ClassNotFoundException:
org.apache.kafka.common.config.ConfigDef$CaseInsensitiveValidString
at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
at
org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)
at java.lang.ClassLoader.loadClass(ClassLoader.java:357)

I downloaded common-config-6.1.0.jar and added it to the lib directory in

/wepay-kafka-connect-bigquery-2.1.0/lib

But little joy I am afraid.

Cheers,

Mich



LinkedIn:
https://www.linkedin.com/profile/view?id=AAEWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw





*Disclaimer:* Use it at your own risk. Any and all responsibility for any
loss, damage or destruction of data or any other property which may arise
from relying on this email's technical content is explicitly disclaimed.
The author will in no case be liable for any monetary damages arising from
such loss, damage or destruction.




On Fri, 12 Mar 2021 at 02:56, Liam Clarke-Hutchinson <
liam.cla...@adscale.co.nz> wrote:

> Hi Mich,
>
> Your bigquery-sink.properties file is in a JSON format - which won't work.
> It needs to follow the usual format of a Java properties file.
>
> Kind regards,
>
> Liam Clarke-Hutchinson
>
> On Fri, Mar 12, 2021 at 12:13 AM Mich Talebzadeh <
> mich.talebza...@gmail.com>
> wrote:
>
> > Hi,
> >
> >
> > Trying to stream from Kafka to Google BigQuery.
> >
> >
> >  The connect-standalone.properties is as follows
> >
> >
> > key.converter=org.apache.kafka.connect.storage.StringConverter
> >
> > ##value.converter=org.apache.kafka.connect.storage.StringConverter
> >
> > value.converter=org.apache.kafka.connect.json.JsonConverter
> >
> > #
> >
> > # Converter-specific settings can be passed in by prefixing the
> Converter's
> >
> > # setting with the converter we want to apply it to
> >
> > key.converter.schemas.enable=true
> >
> > value.converter.schemas.enable=false
> >
> >
> > # The internal converter used for offsets and config data is configurable
> > and
> >
> > # must be specified, but most users will always want to use the built-in
> >
> > # default. Offset and config data is never visible outside of Kafka
> Connect
> > in
> >
> > # this format.
> >
> > ##internal.key.converter=org.apache.kafka.connect.json.JsonConverter
> >
> > internal.value.converter=org.apache.kafka.connect.json.JsonConverter
> >
> > internal.key.converter=org.apache.kafka.connect.storage.StringConverter