AishD3 opened a new issue, #1602:
URL: https://github.com/apache/camel-kafka-connector/issues/1602
I am facing the error below while using the Azure Data Lake Gen2 Sink connector:
```
org.apache.kafka.connect.errors.ConnectException: Exiting WorkerSinkTask due to unrecoverable exception.
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:614)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:336)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
	at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
	at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
	at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)
	at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.kafka.connect.errors.ConnectException: Exchange delivery has failed!
	at org.apache.camel.kafkaconnector.CamelSinkTask.put(CamelSinkTask.java:210)
	at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:583)
	... 11 more
Caused by: com.azure.storage.file.datalake.models.DataLakeStorageException: If you are using a StorageSharedKeyCredential, and the server returned an error message that says 'Signature did not match', you can compare the string to sign with the one generated by the SDK. To log the string to sign, pass in the context key value pair 'Azure-Storage-Log-String-To-Sign': true to the appropriate method call.
If you are using a SAS token, and the server returned an error message that says 'Signature did not match', you can compare the string to sign with the one generated by the SDK. To log the string to sign, pass in the context key value pair 'Azure-Storage-Log-String-To-Sign': true to the appropriate generateSas method call.
Please remember to disable 'Azure-Storage-Log-String-To-Sign' before going to production as this string can potentially contain PII.
Status code 403, "{"error":{"code":"AuthorizationPermissionMismatch","message":"This request is not authorized to perform this operation using this permission.\nRequestId:63c48fdb-d01f-000d-4c22-6e3603000000\nTime:2024-03-04T10:57:07.2520699Z"}}"
	at java.base/java.lang.invoke.MethodHandle.invokeWithArguments(MethodHandle.java:732)
	at com.azure.core.implementation.http.rest.ResponseExceptionConstructorCache.invoke(ResponseExceptionConstructorCache.java:56)
	at com.azure.core.implementation.http.rest.RestProxyBase.instantiateUnexpectedException(RestProxyBase.java:356)
	at com.azure.core.implementation.http.rest.AsyncRestProxy.lambda$ensureExpectedStatus$1(AsyncRestProxy.java:127)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:113)
	at reactor.core.publisher.Operators$ScalarSubscription.request(Operators.java:2400)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.request(FluxMapFuseable.java:171)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.set(Operators.java:2196)
	at reactor.core.publisher.Operators$MultiSubscriptionSubscriber.onSubscribe(Operators.java:2070)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onSubscribe(FluxMapFuseable.java:96)
	at reactor.core.publisher.MonoJust.subscribe(MonoJust.java:55)
	at reactor.core.publisher.InternalMonoOperator.subscribe(InternalMonoOperator.java:64)
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:157)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
	at reactor.core.publisher.FluxHide$SuppressFuseableSubscriber.onNext(FluxHide.java:137)
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1839)
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:151)
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1839)
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:151)
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.complete(MonoIgnoreThen.java:292)
	at reactor.core.publisher.MonoIgnoreThen$ThenIgnoreMain.onNext(MonoIgnoreThen.java:187)
	at reactor.core.publisher.FluxMapFuseable$MapFuseableSubscriber.onNext(FluxMapFuseable.java:129)
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1839)
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:151)
	at reactor.core.publisher.SerializedSubscriber.onNext(SerializedSubscriber.java:99)
	at reactor.core.publisher.FluxRetryWhen$RetryWhenMainSubscriber.onNext(FluxRetryWhen.java:174)
	at reactor.core.publisher.FluxOnErrorResume$ResumeSubscriber.onNext(FluxOnErrorResume.java:79)
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1839)
	at reactor.core.publisher.MonoFlatMap$FlatMapMain.onNext(MonoFlatMap.java:151)
	at reactor.core.publisher.Operators$MonoInnerProducerBase.complete(Operators.java:2666)
	at reactor.core.publisher.MonoSingle$SingleSubscriber.onComplete(MonoSingle.java:180)
	at reactor.core.publisher.MonoFlatMapMany$FlatMapManyInner.onComplete(MonoFlatMapMany.java:260)
	at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onComplete(FluxContextWrite.java:126)
	at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onComplete(FluxMap.java:275)
	at reactor.core.publisher.FluxSwitchIfEmpty$SwitchIfEmptySubscriber.onComplete(FluxSwitchIfEmpty.java:85)
	at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onComplete(FluxDoFinally.java:128)
	at reactor.core.publisher.FluxHandle$HandleSubscriber.onComplete(FluxHandle.java:220)
	at reactor.core.publisher.FluxMap$MapConditionalSubscriber.onComplete(FluxMap.java:275)
	at reactor.core.publisher.FluxDoFinally$DoFinallySubscriber.onComplete(FluxDoFinally.java:128)
	at reactor.core.publisher.FluxHandleFuseable$HandleFuseableSubscriber.onComplete(FluxHandleFuseable.java:236)
	at reactor.core.publisher.FluxContextWrite$ContextWriteSubscriber.onComplete(FluxContextWrite.java:126)
	at reactor.core.publisher.Operators$MonoSubscriber.complete(Operators.java:1840)
	at reactor.core.publisher.MonoCollectList$MonoCollectListSubscriber.onComplete(MonoCollectList.java:129)
	at reactor.core.publisher.FluxPeek$PeekSubscriber.onComplete(FluxPeek.java:260)
	at reactor.core.publisher.FluxMap$MapSubscriber.onComplete(FluxMap.java:144)
	at reactor.netty.channel.FluxReceive.onInboundComplete(FluxReceive.java:413)
	at reactor.netty.channel.ChannelOperations.onInboundComplete(ChannelOperations.java:424)
	at reactor.netty.channel.ChannelOperations.terminate(ChannelOperations.java:478)
	at reactor.netty.http.client.HttpClientOperations.onInboundNext(HttpClientOperations.java:712)
	at reactor.netty.channel.ChannelOperationsHandler.channelRead(ChannelOperationsHandler.java:113)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at com.azure.core.http.netty.implementation.AzureSdkHandler.channelRead(AzureSdkHandler.java:222)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.channel.CombinedChannelDuplexHandler$DelegatingChannelHandlerContext.fireChannelRead(CombinedChannelDuplexHandler.java:436)
	at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:318)
	at io.netty.channel.CombinedChannelDuplexHandler.channelRead(CombinedChannelDuplexHandler.java:251)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.handler.ssl.SslHandler.unwrap(SslHandler.java:1385)
	at io.netty.handler.ssl.SslHandler.decodeNonJdkCompatible(SslHandler.java:1259)
	at io.netty.handler.ssl.SslHandler.decode(SslHandler.java:1299)
	at io.netty.handler.codec.ByteToMessageDecoder.decodeRemovalReentryProtection(ByteToMessageDecoder.java:529)
	at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:468)
	at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412)
	at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440)
	at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420)
	at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919)
	at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:800)
	at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:509)
	at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:407)
	at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997)
	at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74)
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30)
	... 1 more
	Suppressed: java.lang.Exception: #block terminated with an error
		at reactor.core.publisher.BlockingSingleSubscriber.blockingGet(BlockingSingleSubscriber.java:99)
		at reactor.core.publisher.Mono.block(Mono.java:1742)
		at com.azure.storage.common.implementation.StorageImplUtils.blockWithOptionalTimeout(StorageImplUtils.java:146)
		at com.azure.storage.file.datalake.DataLakeFileClient.uploadWithResponse(DataLakeFileClient.java:432)
		at org.apache.camel.component.azure.storage.datalake.client.DataLakeFileClientWrapper.uploadWithResponse(DataLakeFileClientWrapper.java:113)
		at org.apache.camel.component.azure.storage.datalake.operations.DataLakeFileOperations.upload(DataLakeFileOperations.java:221)
		at org.apache.camel.component.azure.storage.datalake.DataLakeProducer.process(DataLakeProducer.java:93)
		at org.apache.camel.support.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:65)
		at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:172)
		at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.run(RedeliveryErrorHandler.java:475)
		at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.executeReactiveWork(DefaultReactiveExecutor.java:196)
		at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:164)
		at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:59)
		at org.apache.camel.processor.Pipeline.process(Pipeline.java:163)
		at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:383)
		at org.apache.camel.component.direct.DirectProducer.process(DirectProducer.java:96)
		at org.apache.camel.impl.engine.SharedCamelInternalProcessor.process(SharedCamelInternalProcessor.java:212)
		at org.apache.camel.impl.engine.SharedCamelInternalProcessor$1.process(SharedCamelInternalProcessor.java:109)
		at org.apache.camel.impl.engine.DefaultAsyncProcessorAwaitManager.process(DefaultAsyncProcessorAwaitManager.java:81)
		at org.apache.camel.impl.engine.SharedCamelInternalProcessor.process(SharedCamelInternalProcessor.java:106)
		at org.apache.camel.support.cache.DefaultProducerCache.send(DefaultProducerCache.java:180)
		at org.apache.camel.impl.engine.DefaultProducerTemplate.send(DefaultProducerTemplate.java:176)
		at org.apache.camel.impl.engine.DefaultProducerTemplate.send(DefaultProducerTemplate.java:148)
		at org.apache.camel.kafkaconnector.CamelSinkTask.put(CamelSinkTask.java:205)
		at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:583)
		at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:336)
		at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:237)
		at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:206)
		at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:202)
		at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:257)
		at org.apache.kafka.connect.runtime.isolation.Plugins.lambda$withClassLoader$1(Plugins.java:177)
		at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:539)
		at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
		at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
		at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
		... 1 more
```
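The root cause in the trace is an HTTP 403 `AuthorizationPermissionMismatch`: the service principal authenticated successfully (otherwise the error would be 401), but it does not hold a data-plane role on the storage account. A sketch of how this could be checked with the Azure CLI, assuming placeholder `<subscription-id>` and `<resource-group>` values and the client id from the connector config:

```shell
SCOPE="/subscriptions/<subscription-id>/resourceGroups/<resource-group>/providers/Microsoft.Storage/storageAccounts/retinaconnectorpoc"

# List the roles currently assigned to the connector's service principal
# on the storage account; a Reader/Contributor ARM role is NOT enough
# for data-plane writes.
az role assignment list \
  --assignee 328xxxxxxxxxxxxxxxxxxxx68792db9 \
  --scope "$SCOPE" \
  --output table

# If no data role is present, grant one that permits writes, e.g.:
az role assignment create \
  --assignee 328xxxxxxxxxxxxxxxxxxxx68792db9 \
  --role "Storage Blob Data Contributor" \
  --scope "$SCOPE"
```

Note that role assignments can take a few minutes to propagate; alternatively the principal may need an ACL entry on the filesystem/directory if POSIX ACLs are being used instead of RBAC.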
Below is the connector configuration:
```yaml
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: adls-sink
  labels:
    strimzi.io/cluster: cdt-connect-shared-emp
spec:
  class: org.apache.camel.kafkaconnector.azurestoragedatalakesink.CamelAzurestoragedatalakesinkSinkConnector
  tasksMax: 2
  config:
    topics: msk.emp.cdtadlsconnectorsink.topic.internal.any.v1
    camel.kamelet.azure-storage-datalake-sink.accountName: retinaconnectorpoc
    camel.kamelet.azure-storage-datalake-sink.clientId: 328xxxxxxxxxxxxxxxxxxxx68792db9
    camel.kamelet.azure-storage-datalake-sink.clientSecret: uqKxxxxxxxxxxxxxxxx6N5eS-lcbw
    camel.kamelet.azure-storage-datalake-sink.tenantId: 05d7xxxxxxxxxxxxx-eb416c396f2d
    camel.kamelet.azure-storage-datalake-sink.fileSystemName: test
    azure.datalake.gen2.sas.key:
    value.converter.schema.registry.ssl.key.password: ${env:SSL_TRUSTSTORE_PASSWORD}
    value.converter.schema.registry.ssl.keystore.location: /opt/kafka/external-configuration/sr-certs/keystore.jks
    value.converter.schema.registry.ssl.keystore.password: ${env:SSL_TRUSTSTORE_PASSWORD}
    value.converter.schema.registry.ssl.truststore.location: /opt/kafka/external-configuration/sr-certs/truststore.jks
    value.converter.schema.registry.ssl.truststore.password: ${env:SSL_TRUSTSTORE_PASSWORD}
    value.converter.schema.registry.url: https://cxxxxxxxxxxxxxdigital.net
    value.converter.schemas.enable: false
    key.converter.schema.registry.ssl.key.password: ${env:SSL_TRUSTSTORE_PASSWORD}
    key.converter.schema.registry.ssl.keystore.location: /opt/kafka/external-configuration/sr-certs/keystore.jks
    key.converter.schema.registry.ssl.keystore.password: ${env:SSL_TRUSTSTORE_PASSWORD}
    key.converter.schema.registry.ssl.truststore.location: /opt/kafka/external-configuration/sr-certs/truststore.jks
    key.converter.schema.registry.ssl.truststore.password: ${env:SSL_TRUSTSTORE_PASSWORD}
    key.converter.schema.registry.url: https://xxxxxxxxxxxxxxxxxxxigital.net
    key.converter.schemas.enable: false
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: org.apache.kafka.connect.storage.StringConverter
```
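To rule out the connector itself, the same upload can be attempted with the Azure SDK classes that appear in the trace (`ClientSecretCredential` + `DataLakeFileClient`). This is only a diagnostic sketch under my assumptions: the placeholder credential values and the `auth-check.txt` file name are mine, and it needs `com.azure:azure-identity` and `com.azure:azure-storage-file-datalake` on the classpath. If this fails with the same 403, the problem is the principal's permissions, not the connector:

```java
import com.azure.identity.ClientSecretCredential;
import com.azure.identity.ClientSecretCredentialBuilder;
import com.azure.storage.file.datalake.DataLakeFileSystemClient;
import com.azure.storage.file.datalake.DataLakeServiceClientBuilder;
import java.io.ByteArrayInputStream;
import java.nio.charset.StandardCharsets;

public class AdlsAuthCheck {
    public static void main(String[] args) {
        // Same credential triple the kamelet is configured with.
        ClientSecretCredential cred = new ClientSecretCredentialBuilder()
                .tenantId("<tenant-id>")
                .clientId("<client-id>")
                .clientSecret("<client-secret>")
                .build();

        // accountName and fileSystemName from the connector config.
        DataLakeFileSystemClient fs = new DataLakeServiceClientBuilder()
                .endpoint("https://retinaconnectorpoc.dfs.core.windows.net")
                .credential(cred)
                .buildClient()
                .getFileSystemClient("test");

        byte[] data = "hello".getBytes(StandardCharsets.UTF_8);
        // Throws DataLakeStorageException with status 403 if the principal
        // lacks a data-plane role (or ACL entry) on the filesystem.
        fs.getFileClient("auth-check.txt")
          .upload(new ByteArrayInputStream(data), data.length, true);
        System.out.println("upload succeeded");
    }
}
```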