I suspect your CLASSPATH env var needs to include $KAFKA_HOME/lib (the
common config lib should already be in there).
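
For example (a sketch, not verified on your box; recent Apache Kafka
tarballs keep the runtime jars under $KAFKA_HOME/libs, while some
packagings use lib):

export CLASSPATH="$CLASSPATH:$KAFKA_HOME/libs/*"
$KAFKA_HOME/bin/connect-standalone.sh \
  /d4T/hduser/bigquery-kafka-connect-sink/etc/connect-standalone.properties \
  /d4T/hduser/bigquery-kafka-connect-sink/etc/bigquery-sink.properties

connect-standalone.sh runs via kafka-run-class.sh, which, as far as I
recall, appends anything already in CLASSPATH to the classpath it builds.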


On Fri, 12 Mar. 2021, 10:33 pm Mich Talebzadeh, <mich.talebza...@gmail.com>
wrote:

> Thanks Liam for the suggestion.
>
> This is the redone sink file (plain text):
>
> name=bigquery-sink
> connector.type=bigquery-connector
> connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
> defaultDataset=test
> project=axial-glow-224522
> topics=md
> autoCreateTables=false
> gcsBucketName=tmp_storage_bucket
> queueSize=-1
> bigQueryRetry=0
> bigQueryRetryWait=1000
> bigQueryMessageTimePartitioning=false
> bigQueryPartitionDecorator=true
> timePartitioningType=DAY
> keySource=FILE
> keyfile=/home/hduser/GCPFirstProject-d75f1b3a9817.json
> sanitizeTopics=false
> schemaRetriever=com.wepay.kafka.connect.bigquery.retrieve.IdentitySchemaRetriever
> threadPoolSize=10
> allBQFieldsNullable=false
> avroDataCacheSize=100
> batchLoadIntervalSec=120
> convertDoubleSpecialValues=false
> enableBatchLoad=false
> upsertEnabled=false
> deleteEnabled=false
> mergeIntervalMs=60000
> mergeRecordsThreshold=-1
> autoCreateBucket=true
> allowNewBigQueryFields=false
> allowBigQueryRequiredFieldRelaxation=false
> allowSchemaUnionization=false
> #kafkaDataFieldName=
> #kafkaKeyFieldName=
>
> Now when I run the command
>
> $KAFKA_HOME/bin/connect-standalone.sh \
> /d4T/hduser/bigquery-kafka-connect-sink/etc/connect-standalone.properties \
> /d4T/hduser/bigquery-kafka-connect-sink/etc/bigquery-sink.properties
>
> It comes back with this error:
>
> [2021-03-12 09:23:54,523] INFO REST server listening at http://50.140.197.220:8083/, advertising URL http://50.140.197.220:8083/ (org.apache.kafka.connect.runtime.rest.RestServer:207)
> [2021-03-12 09:23:54,523] INFO Kafka Connect started (org.apache.kafka.connect.runtime.Connect:55)
> [2021-03-12 09:23:54,534] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:113)
> java.lang.NoClassDefFoundError: org/apache/kafka/common/config/ConfigDef$CaseInsensitiveValidString
>         at com.wepay.kafka.connect.bigquery.config.BigQuerySinkConfig.getConfig(BigQuerySinkConfig.java:505)
>         at com.wepay.kafka.connect.bigquery.BigQuerySinkConnector.config(BigQuerySinkConnector.java:79)
>         at org.apache.kafka.connect.connector.Connector.validate(Connector.java:132)
>         at org.apache.kafka.connect.runtime.AbstractHerder.validateConnectorConfig(AbstractHerder.java:277)
>         at org.apache.kafka.connect.runtime.standalone.StandaloneHerder.putConnectorConfig(StandaloneHerder.java:164)
>         at org.apache.kafka.connect.cli.ConnectStandalone.main(ConnectStandalone.java:107)
> Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.config.ConfigDef$CaseInsensitiveValidString
>         at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
>         at org.apache.kafka.connect.runtime.isolation.PluginClassLoader.loadClass(PluginClassLoader.java:104)
>         at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
>
> I downloaded common-config-6.1.0.jar and added it to the lib directory in
>
> ..../wepay-kafka-connect-bigquery-2.1.0/lib
>
> but still no joy, I am afraid.
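>
> For reference, one way to check whether any jar on the plugin path
> actually contains the missing class (a sketch; the exact connector
> directory under plugin.path is assumed):
>
> for j in /d4T/hduser/bigquery-kafka-connect-sink/share/kafka/plugins/*/lib/*.jar; do
>   # grep -F treats the $ in the inner class name literally
>   unzip -l "$j" | grep -F 'ConfigDef$CaseInsensitiveValidString' && echo "$j"
> done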
>
> Cheers,
>
> Mich
>
> On Fri, 12 Mar 2021 at 02:56, Liam Clarke-Hutchinson <
> liam.cla...@adscale.co.nz> wrote:
>
> > Hi Mich,
> >
> > Your bigquery-sink.properties file is in JSON format, which won't work.
> > It needs to follow the usual key=value format of a Java properties file.
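> >
> > For example (just an illustration, reusing values from your file),
> > plain key=value pairs, one per line:
> >
> > name=bigquery-sink
> > connector.class=com.wepay.kafka.connect.bigquery.BigQuerySinkConnector
> > topics=md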
> >
> > Kind regards,
> >
> > Liam Clarke-Hutchinson
> >
> > On Fri, Mar 12, 2021 at 12:13 AM Mich Talebzadeh <
> > mich.talebza...@gmail.com>
> > wrote:
> >
> > > Hi,
> > >
> > >
> > > I am trying to stream data from Kafka to Google BigQuery.
> > >
> > >
> > > The connect-standalone.properties is as follows:
> > >
> > > key.converter=org.apache.kafka.connect.storage.StringConverter
> > > ##value.converter=org.apache.kafka.connect.storage.StringConverter
> > > value.converter=org.apache.kafka.connect.json.JsonConverter
> > > #
> > > # Converter-specific settings can be passed in by prefixing the Converter's
> > > # setting with the converter we want to apply it to
> > > key.converter.schemas.enable=true
> > > value.converter.schemas.enable=false
> > >
> > > # The internal converter used for offsets and config data is configurable and
> > > # must be specified, but most users will always want to use the built-in
> > > # default. Offset and config data is never visible outside of Kafka Connect in
> > > # this format.
> > > ##internal.key.converter=org.apache.kafka.connect.json.JsonConverter
> > > internal.value.converter=org.apache.kafka.connect.json.JsonConverter
> > > internal.key.converter=org.apache.kafka.connect.storage.StringConverter
> > > ##internal.value.converter=org.apache.kafka.connect.storage.StringConverter
> > > internal.key.converter.schemas.enable=false
> > > internal.value.converter.schemas.enable=false
> > >
> > > offset.storage.file.filename=/tmp/connect_bq.offsets
> > > # Flush much faster than normal, which is useful for testing/debugging
> > > offset.flush.interval.ms=10000
> > >
> > > # Set to a list of filesystem paths separated by commas (,) to enable class
> > > # loading isolation for plugins (connectors, converters, transformations). The
> > > # list should consist of top level directories that include any combination of:
> > > # a) directories immediately containing jars with plugins and their dependencies
> > > # b) uber-jars with plugins and their dependencies
> > > # c) directories immediately containing the package directory structure of
> > > # classes of plugins and their dependencies. Note: symlinks will be followed to
> > > # discover dependencies or plugins.
> > > # Examples:
> > > plugin.path=/d4T/hduser/bigquery-kafka-connect-sink/share/kafka/plugins
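> > >
> > > # For example, a hypothetical layout for the plugin directory above
> > > # (the wepay connector zip ships its jars under a lib/ subdirectory):
> > > #   /d4T/hduser/bigquery-kafka-connect-sink/share/kafka/plugins/
> > > #     wepay-kafka-connect-bigquery-2.1.0/
> > > #       lib/   <- connector jar plus its dependency jars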
> > >
> > >
> > > And the bigquery-sink.properties file has this:
> > >
> > >
> > > {
> > >      "name": "bigquery-sink",
> > >      "connector.type": "bigquery-connector",
> > >      "connector.class": "com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",
> > >      "defaultDataset": "test",
> > >      "project": "xyz",
> > >      "topics": "md",
> > >      "autoCreateTables": "false",
> > >      "gcsBucketName": "tmp_storage_bucket",
> > >      "queueSize": "-1",
> > >      "bigQueryRetry": "0",
> > >      "bigQueryRetryWait": "1000",
> > >      "bigQueryMessageTimePartitioning": "false",
> > >      "bigQueryPartitionDecorator": "true",
> > >      "timePartitioningType": "DAY",
> > >      "keySource": "FILE",
> > >      "keyfile": "/home/hduser/xyz.json",
> > >      "sanitizeTopics": "false",
> > >      "schemaRetriever": "com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever",
> > >      "threadPoolSize": "10",
> > >      "allBQFieldsNullable": "false",
> > >      "avroDataCacheSize": "100",
> > >      "batchLoadIntervalSec": "120",
> > >      "convertDoubleSpecialValues": "false",
> > >      "enableBatchLoad": "false",
> > >      "upsertEnabled": "false",
> > >      "deleteEnabled": "false",
> > >      "mergeIntervalMs": "60_000L",
> > >      "mergeRecordsThreshold": "-1",
> > >      "autoCreateBucket": "true",
> > >      "allowNewBigQueryFields": "false",
> > >      "allowBigQueryRequiredFieldRelaxation": "false",
> > >      "allowSchemaUnionization": "false",
> > >      "kafkaDataFieldName": "null",
> > >      "kafkaKeyFieldName": "null"
> > > }
> > >
> > > Run as below:
> > >
> > > $KAFKA_HOME/bin/connect-standalone.sh \
> > > /d4T/hduser/bigquery-kafka-connect-sink/etc/connect-standalone.properties \
> > > /d4T/hduser/bigquery-kafka-connect-sink/etc/bigquery-sink.properties
> > >
> > > I get this error:
> > >
> > > [2021-03-11 11:07:58,826] ERROR Failed to create job for /d4T/hduser/bigquery-kafka-connect-sink/etc/bigquery-sink.properties (org.apache.kafka.connect.cli.ConnectStandalone:102)
> > > [2021-03-11 11:07:58,826] ERROR Stopping after connector error (org.apache.kafka.connect.cli.ConnectStandalone:113)
> > > java.util.concurrent.ExecutionException: org.apache.kafka.connect.runtime.rest.errors.BadRequestException: Connector config {"defaultDataset"="test",, "schemaRetriever"="com.wepay.kafka.connect.bigquery.schemaregistry.schemaretriever.SchemaRegistrySchemaRetriever",, "project"="axial-glow-224522",, "autoCreateTables"="false",, "deleteEnabled"="false",, "bigQueryPartitionDecorator"="true",, "bigQueryMessageTimePartitioning"="false",, "connector.type"="bigquery-connector",, "gcsBucketName"="tmp_storage_bucket",, "name"="bigquery-sink",, "mergeIntervalMs"="60_000L",, "convertDoubleSpecialValues"="false",, "kafkaKeyFieldName"="null", "sanitizeTopics"="false",, "keyfile"="/home/hduser/GCPFirstProject-d75f1b3a9817.json",, "topics"="md",, "bigQueryRetry"="0",, "allBQFieldsNullable"="false",, "keySource"="FILE",, "allowNewBigQueryFields"="false",, "bigQueryRetryWait"="1000",, "allowSchemaUnionization"="false",, "threadPoolSize"="10",, "timePartitioningType"="DAY",, "enableBatchLoad"="false",, "connector.class"="com.wepay.kafka.connect.bigquery.BigQuerySinkConnector",, "mergeRecordsThreshold"="-1",, "queueSize"="-1",, "batchLoadIntervalSec"="120",, "autoCreateBucket"="true",, {=, "avroDataCacheSize"="100",, "upsertEnabled"="false",, "kafkaDataFieldName"="null",, }=, "allowBigQueryRequiredFieldRelaxation"="false",} contains no connector type
> > >
> > > I think the problem is a wrong entry in the bigquery-sink.properties
> > > file above, but I cannot see what it is.
> > >
> > >
> > > Any ideas appreciated.
> > >
> > >
> > > Thanks
> > >
> > >
> >
>
