I suspect your CLASSPATH env var needs to include $KAFKA_HOME/lib (the common config classes should already be in a jar there).
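Before restarting the worker, it can be worth confirming which jar, if any, actually provides the missing class. A quick sketch, not from the thread itself; the helper name and the /opt/kafka fallback path are illustrative assumptions:

```python
# Quick check: which jars under $KAFKA_HOME (or any directory) contain the
# class the stack trace says is missing. Helper name and default path are
# illustrative only -- adjust to your install.
import glob
import os
import zipfile

def jars_containing(directory, class_name):
    """Return the jars under `directory` (searched recursively) that
    contain `class_name`, given in dotted form, e.g. a.b.C$Inner."""
    entry = class_name.replace(".", "/") + ".class"
    hits = []
    for jar in glob.glob(os.path.join(directory, "**", "*.jar"), recursive=True):
        try:
            with zipfile.ZipFile(jar) as zf:
                if entry in zf.namelist():
                    hits.append(jar)
        except zipfile.BadZipFile:
            pass  # skip anything that is not a readable zip/jar
    return hits

if __name__ == "__main__":
    missing = "org.apache.kafka.common.config.ConfigDef$CaseInsensitiveValidString"
    kafka_home = os.environ.get("KAFKA_HOME", "/opt/kafka")
    for jar in jars_containing(kafka_home, missing):
        print(jar)
```

If nothing is printed, no jar on the worker's classpath carries that class at all; CaseInsensitiveValidString was only added to kafka-clients in a fairly recent release, so an older Kafka installation is the likely culprit rather than a missing common-config jar.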
On Fri, 12 Mar. 2021, 10:33 pm Mich Talebzadeh, <mich.talebza...@gmail.com> wrote:

> Thanks Liam for the suggestion.
>
> This is the redone sink file (plain text)
>
> name=bigquery-sink
> connector.type=bigquery-connector
> connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
> defaultDataset=test
> project=axial-glow-224522
> topics=md
> autoCreateTables=false
> gcsBucketName=tmp_storage_bucket
> queueSize=-1
> bigQueryRetry=0
> bigQueryRetryWait=1000
> bigQueryMessageTimePartitioning=false
> bigQueryPartitionDecorator=true
> timePartitioningType=DAY
> keySource=FILE
> keyfile=/home/hduser/GCPFirstProject-d75f1b3a9817.json
> sanitizeTopics=false
>
> schemaRetriever=com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever
> threadPoolSize=10
> allBQFieldsNullable=false
> avroDataCacheSize=100
> batchLoadIntervalSec=120
> convertDoubleSpecialValues=false
> enableBatchLoad=false
> upsertEnabled=false
> deleteEnabled=false
> mergeIntervalMs=60_000L
> mergeRecordsThreshold=-1
> autoCreateBucket=true
> allowNewBigQueryFields=false
> allowBigQueryRequiredFieldRelaxation=false
> allowSchemaUnionization=false
> kafkaDataFieldName=null
> kafkaKeyFieldName=null
>
> Now when I run the command
>
> $KAFKA_HOME/bin/connect-standalone.sh \
>   /d4T/hduser/bigquery-kafka-connect-sink/etc/connect-standalone.properties \
>   /d4T/hduser/bigquery-kafka-connect-sink/etc/bigquery-sink.properties
>
> it comes back with this error:
>
> [2021-03-12 09:23:54,523] INFO REST server listening at http://50.140.197.220:8083/, advertising URL http://50.140.197.220:8083/ (org.apache.kafka.connect.runtime.rest.RestServer:207)
> [2021-03-12 09:23:54,523] INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect:55)
> [2021-03-12 09:23:54,534] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:113)
> java.lang.NoClassDefFoundError: org/apache/kafka/common/config/ConfigDef$CaseInsensitiveValidString
>     at com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig.getConfig(BigQuerySinkConfig.java:505)
>     at com.wepay.kafka.connect.bigquery.BigQuerySinkConnector.config(BigQuerySinkConnector.java:79)
>     at org.apache.kafka.connect.connector.Connector.validate(Connector.java:132)
>     at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:277)
>     at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:164)
>     at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:107)
> Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.config.ConfigDef$CaseInsensitiveValidString
>     at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>     at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)
>     at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>
> I downloaded common-config-6.1.0.jar and added it to the lib directory in
>
> ..../wepay-kafka-connect-bigquery-2.1.0/lib
>
> but little joy, I am afraid.
>
> Cheers,
>
> Mich
>
>
> LinkedIn: https://www.linkedin.com/profile/view?id=AAEAAAAWh2gBxianrbJd6zP6AcPCCdOABUrV8Pw
>
>
> *Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.
>
>
> On Fri, 12 Mar 2021 at 02:56, Liam Clarke-Hutchinson <liam.cla...@adscale.co.nz> wrote:
>
> > Hi Mich,
> >
> > Your bigquery-sink.properties file is in a JSON format - which won't work.
> > It needs to follow the usual format of a Java properties file.
> >
> > Kind regards,
> >
> > Liam Clarke-Hutchinson
> >
> > On Fri, Mar 12, 2021 at 12:13 AM Mich Talebzadeh <mich.talebza...@gmail.com> wrote:
> >
> > > Hi,
> > >
> > > Trying to stream from Kafka to Google BigQuery.
> > >
> > > The connect-standalone.properties is as follows:
> > >
> > > key.converter=org.apache.kafka.connect.storage.StringConverter
> > > ##value.converter=org.apache.kafka.connect.storage.StringConverter
> > > value.converter=org.apache.kafka.connect.json.JsonConverter
> > > #
> > > # Converter-specific settings can be passed in by prefixing the Converter's
> > > # setting with the converter we want to apply it to
> > > key.converter.schemas.enable=true
> > > value.converter.schemas.enable=false
> > >
> > > # The internal converter used for offsets and config data is configurable and
> > > # must be specified, but most users will always want to use the built-in
> > > # default. Offset and config data is never visible outside of Kafka Connect in
> > > # this format.
> > > ##internal.key.converter=org.apache.kafka.connect.json.JsonConverter
> > > internal.value.converter=org.apache.kafka.connect.json.JsonConverter
> > > internal.key.converter=org.apache.kafka.connect.storage.StringConverter
> > > ##internal.value.converter=org.apache.kafka.connect.storage.StringConverter
> > > internal.key.converter.schemas.enable=false
> > > internal.value.converter.schemas.enable=false
> > >
> > > offset.storage.file.filename=/tmp/connect_bq.offsets
> > > # Flush much faster than normal, which is useful for testing/debugging
> > > offset.flush.interval.ms=10000
> > >
> > > # Set to a list of filesystem paths separated by commas (,) to enable class
> > > # loading isolation for plugins (connectors, converters, transformations). The
> > > # list should consist of top level directories that include any combination of:
> > > # a) directories immediately containing jars with plugins and their dependencies
> > > # b) uber-jars with plugins and their dependencies
> > > # c) directories immediately containing the package directory structure of
> > > #    classes of plugins and their dependencies
> > > # Note: symlinks will be followed to discover dependencies or plugins.
> > > # Examples:
> > > plugin.path=/d4T/hduser/bigquery-kafka-connect-sink/share/kafka/plugins
> > >
> > > And bigquery-sink.properties has this:
> > >
> > > {
> > >   "name": "bigquery-sink",
> > >   "connector.type": "bigquery-connector",
> > >   "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
> > >   "defaultDataset": "test",
> > >   "project": "xyz",
> > >   "topics": "md",
> > >   "autoCreateTables": "false",
> > >   "gcsBucketName": "tmp_storage_bucket",
> > >   "queueSize": "-1",
> > >   "bigQueryRetry": "0",
> > >   "bigQueryRetryWait": "1000",
> > >   "bigQueryMessageTimePartitioning": "false",
> > >   "bigQueryPartitionDecorator": "true",
> > >   "timePartitioningType": "DAY",
> > >   "keySource": "FILE",
> > >   "keyfile": "/home/hduser/xyz.json",
> > >   "sanitizeTopics": "false",
> > >   "schemaRetriever": "com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever",
> > >   "threadPoolSize": "10",
> > >   "allBQFieldsNullable": "false",
> > >   "avroDataCacheSize": "100",
> > >   "batchLoadIntervalSec": "120",
> > >   "convertDoubleSpecialValues": "false",
> > >   "enableBatchLoad": "false",
> > >   "upsertEnabled": "false",
> > >   "deleteEnabled": "false",
> > >   "mergeIntervalMs": "60_000L",
> > >   "mergeRecordsThreshold": "-1",
> > >   "autoCreateBucket": "true",
> > >   "allowNewBigQueryFields": "false",
> > >   "allowBigQueryRequiredFieldRelaxation": "false",
> > >   "allowSchemaUnionization": "false",
> > >   "kafkaDataFieldName": "null",
> > >   "kafkaKeyFieldName": "null"
> > > }
> > >
> > > Run as below:
> > >
> > > $KAFKA_HOME/bin/connect-standalone.sh \
> > >   /d4T/hduser/bigquery-kafka-connect-sink/etc/connect-standalone.properties \
> > >   /d4T/hduser/bigquery-kafka-connect-sink/etc/bigquery-sink.properties
> > >
> > > I get this error:
> > >
> > > [2021-03-11 11:07:58,826] ERROR Failed to create job for /d4T/hduser/bigquery-kafka-connect-sink/etc/bigquery-sink.properties (org.apache.kafka.connect.cli.ConnectStandalone:102)
> > > [2021-03-11 11:07:58,826] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:113)
> > > java.util.concurrent.ExecutionException: org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector config {"defaultDataset"="test",, "schemaRetriever"="com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever",, "project"="axial-glow-224522",, "autoCreateTables"="false",, "deleteEnabled"="false",, "bigQueryPartitionDecorator"="true",, "bigQueryMessageTimePartitioning"="false",, "connector.type"="bigquery-connector",, "gcsBucketName"="tmp_storage_bucket",, "name"="bigquery-sink",, "mergeIntervalMs"="60_000L",, "convertDoubleSpecialValues"="false",, "kafkaKeyFieldName"="null", "sanitizeTopics"="false",, "keyfile"="/home/hduser/GCPFirstProject-d75f1b3a9817.json",, "topics"="md",, "bigQueryRetry"="0",, "allBQFieldsNullable"="false",, "keySource"="FILE",, "allowNewBigQueryFields"="false",, "bigQueryRetryWait"="1000",, "allowSchemaUnionization"="false",, "threadPoolSize"="10",, "timePartitioningType"="DAY",, "enableBatchLoad"="false",, "connector.class"="com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",, "mergeRecordsThreshold"="-1",, "queueSize"="-1",, "batchLoadIntervalSec"="120",, "autoCreateBucket"="true",, {=, "avroDataCacheSize"="100",, "upsertEnabled"="false",, "kafkaDataFieldName"="null",, }=, "allowBigQueryRequiredFieldRelaxation"="false",} contains no connector type
> > >
> > > I think the problem is a wrong entry in the bigquery-sink.properties file above, but I cannot see what it is.
> > >
> > > Any ideas appreciated.
> > >
> > > Thanks
> > >
> > > *Disclaimer:* Use it at your own risk. Any and all responsibility for any loss, damage or destruction of data or any other property which may arise from relying on this email's technical content is explicitly disclaimed. The author will in no case be liable for any monetary damages arising from such loss, damage or destruction.