10elements opened a new issue, #1720:
URL: https://github.com/apache/camel-kafka-connector/issues/1720
I'm trying to use the CamelAwsddbsinkSinkConnector v3.20.3 in MSK
Connect (v2.7.1) to load Avro messages from a Kafka topic into a DynamoDB
table. The Avro schema is stored in a Confluent Schema Registry. No matter
what I try, the connector seems unable to find the correct fields in the
message after deserialization, and therefore fails to put the message into
the DynamoDB table.
This is my connector config:
```
{
  "connector.class": "org.apache.camel.kafkaconnector.awsddbsink.CamelAwsddbsinkSinkConnector",
  "transforms.tojson.type": "org.apache.camel.kafkaconnector.transforms.SchemaAndStructToJsonTransform",
  "transforms.tojson.converter.type": "value",
  "topics": "test_orders",
  "tasks.max": "1",
  "camel.kamelet.aws-ddb-sink.useDefaultCredentialsProvider": "true",
  "transforms": "tojson",
  "camel.sink.contentLogLevel": "DEBUG",
  "value.converter.schema.registry.url": "https://svcs--schemaregistry.euc1.prvw.ktdev.io",
  "camel.kamelet.aws-ddb-sink.operation": "PutItem",
  "camel.kamelet.aws-ddb-sink.table": "etl--prvw--euc1--test-orders",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "camel.kamelet.aws-ddb-sink.region": "eu-central-1",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter"
}
```
I read in
https://camel.apache.org/camel-kamelets/4.10.x/aws-ddb-sink.html#_expected_data_format_for_sink
that the expected input data should be JSON, so I added the
`SchemaAndStructToJsonTransform` SMT, which I found in this
[issue](https://github.com/apache/camel-kafka-connector/issues/843), to
convert the Kafka Connect struct produced by the Avro converter into JSON
before it is passed to the sink connector. Still, the connector fails with
an error like this:
```
[Worker-0c1825974767d8637]
software.amazon.awssdk.services.dynamodb.model.DynamoDbException: One or more
parameter values were invalid: Missing the key order_id in the item (Service:
DynamoDb, Status Code: 400, Request ID:
V2V0KK8U36SVSQ8EE9GSK75ARJVV4KQNSO5AEMVJF66Q9ASUAAJG)
--
| [Worker-0c1825974767d8637] at
software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handleErrorResponse(CombinedResponseHandler.java:125)
| [Worker-0c1825974767d8637] at
software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handleResponse(CombinedResponseHandler.java:82)
| [Worker-0c1825974767d8637] at
software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handle(CombinedResponseHandler.java:60)
| [Worker-0c1825974767d8637] at
software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handle(CombinedResponseHandler.java:41)
| [Worker-0c1825974767d8637] at
software.amazon.awssdk.core.internal.http.pipeline.stages.HandleResponseStage.execute(HandleResponseStage.java:40)
| [Worker-0c1825974767d8637] at
software.amazon.awssdk.core.internal.http.pipeline.stages.HandleResponseStage.execute(HandleResponseStage.java:30)
| [Worker-0c1825974767d8637] at
software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
| [Worker-0c1825974767d8637] at
software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:73)
| [Worker-0c1825974767d8637] at
software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:42)
| [Worker-0c1825974767d8637] at
software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:78)
| [Worker-0c1825974767d8637] at
software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:40)
| [Worker-0c1825974767d8637] at
software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:50)
| [Worker-0c1825974767d8637] at
software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:36)
| [Worker-0c1825974767d8637] at
software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:81)
| [Worker-0c1825974767d8637] at
software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:36)
| [Worker-0c1825974767d8637] at
software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
| [Worker-0c1825974767d8637] at
software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:56)
| [Worker-0c1825974767d8637] at
software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:36)
| [Worker-0c1825974767d8637] at
software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.executeWithTimer(ApiCallTimeoutTrackingStage.java:80)
| [Worker-0c1825974767d8637] at
software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:60)
| [Worker-0c1825974767d8637] at
software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:42)
| [Worker-0c1825974767d8637] at
software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:48)
| [Worker-0c1825974767d8637] at
software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:31)
| [Worker-0c1825974767d8637] at
software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
| [Worker-0c1825974767d8637] at
software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
| [Worker-0c1825974767d8637] at
software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:37)
| [Worker-0c1825974767d8637] at
software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:26)
| [Worker-0c1825974767d8637] at
software.amazon.awssdk.core.internal.http.AmazonSyncHttpClient$RequestExecutionBuilderImpl.execute(AmazonSyncHttpClient.java:193)
| [Worker-0c1825974767d8637] at
software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.invoke(BaseSyncClientHandler.java:103)
| [Worker-0c1825974767d8637] at
software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.doExecute(BaseSyncClientHandler.java:171)
| [Worker-0c1825974767d8637] at
software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.lambda$execute$1(BaseSyncClientHandler.java:82)
| [Worker-0c1825974767d8637] at
software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.measureApiCallSuccess(BaseSyncClientHandler.java:179)
| [Worker-0c1825974767d8637] at
software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.execute(BaseSyncClientHandler.java:76)
| [Worker-0c1825974767d8637] at
software.amazon.awssdk.core.client.handler.SdkSyncClientHandler.execute(SdkSyncClientHandler.java:45)
| [Worker-0c1825974767d8637] at
software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler.execute(AwsSyncClientHandler.java:56)
| [Worker-0c1825974767d8637] at
software.amazon.awssdk.services.dynamodb.DefaultDynamoDbClient.putItem(DefaultDynamoDbClient.java:4243)
| [Worker-0c1825974767d8637] at
org.apache.camel.component.aws2.ddb.PutItemCommand.execute(PutItemCommand.java:32)
| [Worker-0c1825974767d8637] at
org.apache.camel.component.aws2.ddb.Ddb2Producer.process(Ddb2Producer.java:55)
| [Worker-0c1825974767d8637] at
org.apache.camel.support.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:66)
| [Worker-0c1825974767d8637] at
org.apache.camel.processor.SendProcessor.process(SendProcessor.java:172)
| [Worker-0c1825974767d8637] at
org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.run(RedeliveryErrorHandler.java:477)
| [Worker-0c1825974767d8637] at
org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:181)
| [Worker-0c1825974767d8637] at
org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:59)
| [Worker-0c1825974767d8637] at
org.apache.camel.processor.Pipeline.process(Pipeline.java:165)
| [Worker-0c1825974767d8637] at
org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:392)
| [Worker-0c1825974767d8637] at
org.apache.camel.component.direct.DirectProducer.process(DirectProducer.java:96)
| [Worker-0c1825974767d8637] at
org.apache.camel.impl.engine.SharedCamelInternalProcessor.process(SharedCamelInternalProcessor.java:214)
| [Worker-0c1825974767d8637] at
org.apache.camel.impl.engine.SharedCamelInternalProcessor$1.process(SharedCamelInternalProcessor.java:111)
| [Worker-0c1825974767d8637] at
org.apache.camel.impl.engine.DefaultAsyncProcessorAwaitManager.process(DefaultAsyncProcessorAwaitManager.java:83)
| [Worker-0c1825974767d8637] at
org.apache.camel.impl.engine.SharedCamelInternalProcessor.process(SharedCamelInternalProcessor.java:108)
| [Worker-0c1825974767d8637] at
org.apache.camel.support.cache.DefaultProducerCache.send(DefaultProducerCache.java:199)
| [Worker-0c1825974767d8637] at
org.apache.camel.impl.engine.DefaultProducerTemplate.send(DefaultProducerTemplate.java:176)
| [Worker-0c1825974767d8637] at
org.apache.camel.impl.engine.DefaultProducerTemplate.send(DefaultProducerTemplate.java:148)
| [Worker-0c1825974767d8637] at
org.apache.camel.kafkaconnector.CamelSinkTask.put(CamelSinkTask.java:205)
| [Worker-0c1825974767d8637] at
org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)
| [Worker-0c1825974767d8637] at
org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:330)
| [Worker-0c1825974767d8637] at
org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
| [Worker-0c1825974767d8637] at
org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
| [Worker-0c1825974767d8637] at
org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:191)
| [Worker-0c1825974767d8637] at
org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:240)
| [Worker-0c1825974767d8637] at
java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
| [Worker-0c1825974767d8637] at
java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
| [Worker-0c1825974767d8637] at
java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
| [Worker-0c1825974767d8637] at
java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
| [Worker-0c1825974767d8637] at
java.base/java.lang.Thread.run(Thread.java:829)
```
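The `DynamoDbException` above means the item in the `PutItem` request had no
top-level `order_id` attribute. A minimal sketch of checking a candidate JSON
payload for that condition, assuming the table's key schema is just `order_id`
(inferred from the error message, not confirmed) and using a hypothetical
helper name `missing_key_attributes`:

```python
import json


def missing_key_attributes(payload, key_attributes):
    """Return the key attributes absent from the top level of a JSON payload.

    `key_attributes` is an assumption: the names of the table's key
    attributes, e.g. ["order_id"] as suggested by the error message.
    """
    try:
        item = json.loads(payload)
    except (TypeError, ValueError):
        # The payload is not JSON at all (for example a Struct's string
        # form), so none of the key attributes can be found in it.
        return list(key_attributes)
    if not isinstance(item, dict):
        # Valid JSON, but not an object with named fields.
        return list(key_attributes)
    return [name for name in key_attributes if name not in item]


if __name__ == "__main__":
    sample = json.dumps({"order_id": 256795, "quantity": 9})
    print(missing_key_attributes(sample, ["order_id"]))
```

Running it against whatever body actually reaches the sink would show whether
`order_id` is missing entirely or only nested under another field.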
To me, this suggests that the converted JSON is malformed and lacks a field
that is supposed to exist. I then consumed the topic with a separate
consumer, deserialized the messages, and printed them out. I can confirm
that the Avro messages are correct and have all the required fields, such as
`order_id`:
```
2025-06-10 15:00:46,632 - __main__ - INFO - Connected to Schema Registry at
http://localhost:8081
2025-06-10 15:00:47,154 - httpx - INFO - HTTP Request: GET
http://localhost:8081/subjects/test_orders-value/versions/latest "HTTP/1.1 200
OK"
2025-06-10 15:00:47,156 - __main__ - INFO - Retrieved schema for subject
'test_orders-value', version 1 from registry.
2025-06-10 15:00:47,240 - __main__ - INFO - Subscribed to topic: test_orders
2025-06-10 15:00:47,240 - __main__ - INFO - Starting to consume messages.
Press Ctrl+C to stop.
2025-06-10 15:00:47,285 - botocore.credentials - INFO - Found credentials in
shared credentials file: ~/.aws/credentials
2025-06-10 15:00:55,138 - httpx - INFO - HTTP Request: GET
http://localhost:8081/schemas/ids/399 "HTTP/1.1 200 OK"
2025-06-10 15:00:55,140 - __main__ - INFO - Message 1: Partition 0, Offset 0
{'order_id': 256795, 'ordered_at': '2025-05-22T01:11:12.953333',
'product_id': 3351, 'quantity': 9, 'customer_id': 99025, 'customer_name':
'Metro Shipping Co'}
```
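For context on the `schemas/ids/399` request in the log above: in the
Confluent wire format, each serialized value starts with a zero magic byte
followed by a 4-byte big-endian schema ID, and the deserializer uses that ID
to fetch the writer schema from the registry. A minimal sketch of reading
that header from raw message bytes, where `parse_confluent_header` is a
hypothetical helper name and the Avro decoding step itself is left out:

```python
import struct

MAGIC_BYTE = 0   # first byte of every Confluent-framed message
HEADER_SIZE = 5  # 1 magic byte + 4-byte schema ID


def parse_confluent_header(message: bytes):
    """Split a Confluent-framed message into (schema_id, avro_payload)."""
    if len(message) < HEADER_SIZE:
        raise ValueError("message too short for a Confluent wire-format header")
    # ">BI" = big-endian, unsigned byte, unsigned 32-bit integer.
    magic, schema_id = struct.unpack(">BI", message[:HEADER_SIZE])
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte: {magic}")
    return schema_id, message[HEADER_SIZE:]


if __name__ == "__main__":
    # Hand-built frame for illustration only; the payload bytes are made up.
    framed = bytes([MAGIC_BYTE]) + (399).to_bytes(4, "big") + b"\x02\x04"
    schema_id, payload = parse_confluent_header(framed)
    print(schema_id, len(payload))
```

A check like this on the raw topic bytes can confirm that the messages are
framed the way `io.confluent.connect.avro.AvroConverter` expects before
looking further down the pipeline.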
Can someone confirm whether the CamelAwsddbsinkSinkConnector can work with
Avro data at all? If it can, am I missing anything in my connector config to
get it working correctly?
P.S. I can't use the latest version of CamelAwsddbsinkSinkConnector because
I'm using MSK Connect, which is still on 2.7.1 and uses Java 11.