This is an automated email from the ASF dual-hosted git repository.

jamesnetherton pushed a commit to branch camel-main
in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git

commit 5601d0df7be3ff9f954935abeba0291902528419
Author: JiriOndrusek <ondrusek.j...@gmail.com>
AuthorDate: Fri Jun 28 10:02:28 2024 +0200

    Quick fix of aws2-kinesis
---
 .../kinesis/deployment/Aws2KinesisProcessor.java   |  18 +++
 extensions/aws2-kinesis/runtime/pom.xml            |   5 +
 .../kinesis/graalvm/Aws2KinesisSubstitutions.java  | 165 +++++++++++++++++++++
 .../src/main/resources/application.properties      |   4 +
 .../aws2/kinesis/it/Aws2KinesisFirehoseIT.java     |  24 ---
 .../aws2/kinesis/it/Aws2KinesisFirehoseTest.java   | 127 ----------------
 6 files changed, 192 insertions(+), 151 deletions(-)

diff --git a/extensions/aws2-kinesis/deployment/src/main/java/org/apache/camel/quarkus/component/aws2/kinesis/deployment/Aws2KinesisProcessor.java b/extensions/aws2-kinesis/deployment/src/main/java/org/apache/camel/quarkus/component/aws2/kinesis/deployment/Aws2KinesisProcessor.java
index 7a55071918..1cadb5c61a 100644
--- a/extensions/aws2-kinesis/deployment/src/main/java/org/apache/camel/quarkus/component/aws2/kinesis/deployment/Aws2KinesisProcessor.java
+++ b/extensions/aws2-kinesis/deployment/src/main/java/org/apache/camel/quarkus/component/aws2/kinesis/deployment/Aws2KinesisProcessor.java
@@ -16,8 +16,13 @@
  */
 package org.apache.camel.quarkus.component.aws2.kinesis.deployment;
 
+import java.util.stream.Stream;
+
+import io.quarkus.deployment.annotations.BuildProducer;
 import io.quarkus.deployment.annotations.BuildStep;
 import io.quarkus.deployment.builditem.FeatureBuildItem;
+import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem;
+import 
io.quarkus.deployment.builditem.nativeimage.RuntimeInitializedClassBuildItem;
 import org.jboss.logging.Logger;
 
 class Aws2KinesisProcessor {
@@ -29,4 +34,17 @@ class Aws2KinesisProcessor {
     FeatureBuildItem feature() {
         return new FeatureBuildItem(FEATURE);
     }
+
+    /**
+     * Produces one {@link RuntimeInitializedClassBuildItem} for each of the two class names
+     * listed below (the DynamoDB retry policy and the KCL shutdown task), in that order.
+     *
+     * @param runtimeInitializedClass sink receiving the produced build items
+     */
+    @BuildStep
+    void runtimeInitializedClasses(BuildProducer<RuntimeInitializedClassBuildItem> runtimeInitializedClass) {
+        final Stream<String> classNames = Stream.of(
+                "software.amazon.awssdk.services.dynamodb.DynamoDbRetryPolicy",
+                "software.amazon.kinesis.lifecycle.ShutdownTask");
+        classNames.forEach(
+                className -> runtimeInitializedClass.produce(new RuntimeInitializedClassBuildItem(className)));
+    }
+
+    /**
+     * Produces a {@link ReflectiveClassBuildItem} for {@code javax.xml.bind.DatatypeConverter},
+     * created through the builder with no further options set.
+     * NOTE(review): the deaggregate substitution added by this commit calls the
+     * {@code jakarta.xml.bind} DatatypeConverter - confirm the {@code javax} name is intended here.
+     *
+     * @param reflectiveClass sink receiving the produced build item
+     */
+    @BuildStep
+    void build(BuildProducer<ReflectiveClassBuildItem> reflectiveClass) {
+        final ReflectiveClassBuildItem datatypeConverter = ReflectiveClassBuildItem
+                .builder("javax.xml.bind.DatatypeConverter")
+                .build();
+        reflectiveClass.produce(datatypeConverter);
+    }
 }
diff --git a/extensions/aws2-kinesis/runtime/pom.xml b/extensions/aws2-kinesis/runtime/pom.xml
index dd3411b192..fc96d75e32 100644
--- a/extensions/aws2-kinesis/runtime/pom.xml
+++ b/extensions/aws2-kinesis/runtime/pom.xml
@@ -54,6 +54,11 @@
             <groupId>io.quarkus</groupId>
             <artifactId>quarkus-netty</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.graalvm.sdk</groupId>
+            <artifactId>nativeimage</artifactId>
+            <scope>provided</scope>
+        </dependency>
     </dependencies>
 
     <build>
diff --git a/extensions/aws2-kinesis/runtime/src/main/java/org/apache/camel/quarkus/component/aws2/kinesis/graalvm/Aws2KinesisSubstitutions.java b/extensions/aws2-kinesis/runtime/src/main/java/org/apache/camel/quarkus/component/aws2/kinesis/graalvm/Aws2KinesisSubstitutions.java
new file mode 100644
index 0000000000..4bfacfe4bb
--- /dev/null
+++ b/extensions/aws2-kinesis/runtime/src/main/java/org/apache/camel/quarkus/component/aws2/kinesis/graalvm/Aws2KinesisSubstitutions.java
@@ -0,0 +1,165 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.quarkus.component.aws2.kinesis.graalvm;
+
+import java.io.UnsupportedEncodingException;
+import java.math.BigInteger;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import com.google.protobuf.InvalidProtocolBufferException;
+import com.oracle.svm.core.annotate.Alias;
+import com.oracle.svm.core.annotate.Substitute;
+import com.oracle.svm.core.annotate.TargetClass;
+import software.amazon.kinesis.retrieval.AggregatorUtil;
+import software.amazon.kinesis.retrieval.KinesisClientRecord;
+import software.amazon.kinesis.retrieval.kpl.Messages;
+
+/**
+ * Empty public top-level type of this file; the GraalVM substitution itself is declared in the
+ * package-private class that follows.
+ */
+public class Aws2KinesisSubstitutions {
+}
+
+/**
+ * Quick (ugly) fix of https://github.com/awslabs/amazon-kinesis-client/issues/1355
+ * <p>
+ * Substitutes the three-argument {@code deaggregate} method of {@link AggregatorUtil}. The
+ * replacement encodes the raw message data with {@code jakarta.xml.bind.DatatypeConverter}
+ * when it reports a deaggregation failure.
+ */
+@TargetClass(AggregatorUtil.class)
+final class AggregatorUtilSubstitutions {
+
+    /**
+     * Alias for {@code calculateTailCheck} of the target class; the body is only a placeholder.
+     */
+    @Alias
+    protected byte[] calculateTailCheck(byte[] data) {
+        return null;
+    }
+
+    /**
+     * Alias for {@code effectiveHashKey} of the target class; the body is only a placeholder.
+     */
+    @Alias
+    protected BigInteger effectiveHashKey(String partitionKey, String explicitHashKey) throws UnsupportedEncodingException {
+        return null;
+    }
+
+    /**
+     * Alias for {@code convertRecordToKinesisClientRecord} of the target class; the body is only a
+     * placeholder.
+     */
+    @Alias
+    public KinesisClientRecord convertRecordToKinesisClientRecord(final KinesisClientRecord record,
+            final boolean aggregated,
+            final long subSequenceNumber,
+            final String explicitHashKey) {
+        return null;
+    }
+
+    /**
+     * Expands aggregated records into their sub-records.
+     * <p>
+     * A record is treated as aggregated when its data starts with
+     * {@code AggregatorUtil.AGGREGATED_RECORD_MAGIC}, more than 16 bytes follow the magic, the
+     * trailing 16 bytes equal {@code calculateTailCheck} of the bytes in between, and those bytes
+     * parse as a {@code Messages.AggregatedRecord}. Every other record is rewound and returned
+     * unchanged.
+     * <p>
+     * If any sub-record of an aggregated record has an effective hash key outside
+     * {@code [startingHashKey, endingHashKey]}, the sub-records already emitted for that record are
+     * removed again and the whole aggregated record is dropped from the result.
+     *
+     * @param  records         records to inspect
+     * @param  startingHashKey lower bound (inclusive) of accepted effective hash keys
+     * @param  endingHashKey   upper bound (inclusive) of accepted effective hash keys
+     * @return                 the non-aggregated records plus the accepted sub-records, in input order
+     */
+    @Substitute
+    public List<KinesisClientRecord> deaggregate(List<KinesisClientRecord> records,
+            BigInteger startingHashKey,
+            BigInteger endingHashKey) {
+        List<KinesisClientRecord> result = new ArrayList<>();
+        // Scratch buffers reused for every record.
+        byte[] magic = new byte[AggregatorUtil.AGGREGATED_RECORD_MAGIC.length];
+        byte[] digest = new byte[16];
+
+        for (KinesisClientRecord r : records) {
+            boolean isAggregated = true;
+            long subSeqNum = 0;
+            ByteBuffer bb = r.data();
+
+            if (bb.remaining() >= magic.length) {
+                bb.get(magic);
+            } else {
+                isAggregated = false;
+            }
+
+            if (!Arrays.equals(AggregatorUtil.AGGREGATED_RECORD_MAGIC, magic) || bb.remaining() <= 16) {
+                isAggregated = false;
+            }
+
+            if (isAggregated) {
+                // Split what follows the magic into the message bytes and the trailing 16-byte digest.
+                int oldLimit = bb.limit();
+                bb.limit(oldLimit - 16);
+                byte[] messageData = new byte[bb.remaining()];
+                bb.get(messageData);
+                bb.limit(oldLimit);
+                bb.get(digest);
+                byte[] calculatedDigest = calculateTailCheck(messageData);
+
+                if (!Arrays.equals(digest, calculatedDigest)) {
+                    isAggregated = false;
+                } else {
+                    try {
+                        Messages.AggregatedRecord ar = Messages.AggregatedRecord.parseFrom(messageData);
+                        List<String> pks = ar.getPartitionKeyTableList();
+                        List<String> ehks = ar.getExplicitHashKeyTableList();
+                        try {
+                            int recordsInCurrRecord = 0;
+                            for (Messages.Record mr : ar.getRecordsList()) {
+                                String explicitHashKey = null;
+                                String partitionKey = pks.get((int) mr.getPartitionKeyIndex());
+                                if (mr.hasExplicitHashKeyIndex()) {
+                                    explicitHashKey = ehks.get((int) mr.getExplicitHashKeyIndex());
+                                }
+
+                                BigInteger effectiveHashKey = effectiveHashKey(partitionKey, explicitHashKey);
+
+                                if (effectiveHashKey.compareTo(startingHashKey) < 0
+                                        || effectiveHashKey.compareTo(endingHashKey) > 0) {
+                                    // Out of range: undo the sub-records already added for this record.
+                                    for (int toRemove = 0; toRemove < recordsInCurrRecord; ++toRemove) {
+                                        result.remove(result.size() - 1);
+                                    }
+                                    break;
+                                }
+
+                                ++recordsInCurrRecord;
+
+                                KinesisClientRecord record = r.toBuilder()
+                                        .data(ByteBuffer.wrap(mr.getData().toByteArray()))
+                                        .partitionKey(partitionKey)
+                                        .explicitHashKey(explicitHashKey)
+                                        .build();
+                                result.add(convertRecordToKinesisClientRecord(record, true, subSeqNum++, explicitHashKey));
+                            }
+                        } catch (Exception e) {
+                            StringBuilder sb = new StringBuilder();
+                            sb.append("Unexpected exception during deaggregation, record was:\n");
+                            sb.append("PKS:\n");
+                            for (String s : pks) {
+                                sb.append(s).append("\n");
+                            }
+                            sb.append("EHKS: \n");
+                            for (String s : ehks) {
+                                sb.append(s).append("\n");
+                            }
+                            for (Messages.Record mr : ar.getRecordsList()) {
+                                sb.append("Record: [hasEhk=").append(mr.hasExplicitHashKeyIndex()).append(", ")
+                                        .append("ehkIdx=").append(mr.getExplicitHashKeyIndex()).append(", ")
+                                        .append("pkIdx=").append(mr.getPartitionKeyIndex()).append(", ")
+                                        .append("dataLen=").append(mr.getData().toByteArray().length).append("]\n");
+                            }
+                            sb.append("Sequence number: ").append(r.sequenceNumber()).append("\n")
+                                    .append("Raw data: ")
+                                    .append(jakarta.xml.bind.DatatypeConverter.printBase64Binary(messageData)).append("\n");
+                            // No logger is aliased from the target class, so report through
+                            // java.util.logging rather than dropping the diagnostic and its cause.
+                            java.util.logging.Logger.getLogger(AggregatorUtil.class.getName())
+                                    .log(java.util.logging.Level.SEVERE, sb.toString(), e);
+                        }
+                    } catch (InvalidProtocolBufferException e) {
+                        // Not a parsable aggregated payload: fall through and pass the record on as-is.
+                        isAggregated = false;
+                    }
+                }
+            }
+
+            if (!isAggregated) {
+                // Undo the reads done while probing so the record's data is returned from position 0.
+                bb.rewind();
+                result.add(r);
+            }
+        }
+        return result;
+    }
+}
diff --git a/integration-test-groups/aws2/aws2-kinesis/src/main/resources/application.properties b/integration-test-groups/aws2/aws2-kinesis/src/main/resources/application.properties
index 056ab69848..e3161d4f9b 100644
--- a/integration-test-groups/aws2/aws2-kinesis/src/main/resources/application.properties
+++ b/integration-test-groups/aws2/aws2-kinesis/src/main/resources/application.properties
@@ -24,3 +24,7 @@ camel.component.aws2-kinesis-firehose.access-key=${AWS_ACCESS_KEY}
 camel.component.aws2-kinesis-firehose.secret-key=${AWS_SECRET_KEY}
 camel.component.aws2-kinesis-firehose.useDefaultCredentialsProvider=${AWS_USE_DEFAULT_CREDENTIALS_PROVIDER}
 camel.component.aws2-kinesis-firehose.region=${AWS_REGION:us-east-1}
+
+
+#quarkus.native.additional-build-args=--trace-object-instantiation=java.util.Random, --trace-object-instantiation=software.amazon.awssdk.core.retry.backoff.FullJitterBackoffStrategy, --initialize-at-run-time=software.amazon.kinesis.lifecycle.ShutdownTask, --initialize-at-run-time=software.amazon.awssdk.services.dynamodb.DynamoDbRetryPolicy
+quarkus.native.additional-build-args=--initialize-at-build-time=javax.xml.bind.DatatypeConverter
diff --git a/integration-test-groups/aws2/aws2-kinesis/src/test/java/org/apache/camel/quarkus/component/aws2/kinesis/it/Aws2KinesisFirehoseIT.java b/integration-test-groups/aws2/aws2-kinesis/src/test/java/org/apache/camel/quarkus/component/aws2/kinesis/it/Aws2KinesisFirehoseIT.java
deleted file mode 100644
index 1347c84e3d..0000000000
--- a/integration-test-groups/aws2/aws2-kinesis/src/test/java/org/apache/camel/quarkus/component/aws2/kinesis/it/Aws2KinesisFirehoseIT.java
+++ /dev/null
@@ -1,24 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.quarkus.component.aws2.kinesis.it;
-
-import io.quarkus.test.junit.QuarkusIntegrationTest;
-
-@QuarkusIntegrationTest
-class Aws2KinesisFirehoseIT extends Aws2KinesisFirehoseTest {
-
-}
diff --git a/integration-test-groups/aws2/aws2-kinesis/src/test/java/org/apache/camel/quarkus/component/aws2/kinesis/it/Aws2KinesisFirehoseTest.java b/integration-test-groups/aws2/aws2-kinesis/src/test/java/org/apache/camel/quarkus/component/aws2/kinesis/it/Aws2KinesisFirehoseTest.java
deleted file mode 100644
index b16c505952..0000000000
--- a/integration-test-groups/aws2/aws2-kinesis/src/test/java/org/apache/camel/quarkus/component/aws2/kinesis/it/Aws2KinesisFirehoseTest.java
+++ /dev/null
@@ -1,127 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.camel.quarkus.component.aws2.kinesis.it;
-
-import java.nio.charset.StandardCharsets;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import io.quarkus.test.common.QuarkusTestResource;
-import io.quarkus.test.junit.QuarkusTest;
-import io.restassured.RestAssured;
-import io.restassured.http.ContentType;
-import org.apache.camel.quarkus.test.support.aws2.Aws2Client;
-import org.apache.camel.quarkus.test.support.aws2.Aws2TestResource;
-import org.apache.camel.quarkus.test.support.aws2.BaseAWs2TestSupport;
-import org.apache.commons.lang3.RandomStringUtils;
-import org.awaitility.Awaitility;
-import org.eclipse.microprofile.config.Config;
-import org.eclipse.microprofile.config.ConfigProvider;
-import org.jboss.logging.Logger;
-import org.junit.jupiter.api.Test;
-import org.testcontainers.containers.localstack.LocalStackContainer.Service;
-import software.amazon.awssdk.core.ResponseInputStream;
-import software.amazon.awssdk.services.s3.S3Client;
-import software.amazon.awssdk.services.s3.model.GetObjectRequest;
-import software.amazon.awssdk.services.s3.model.GetObjectResponse;
-import software.amazon.awssdk.services.s3.model.ListObjectsRequest;
-import software.amazon.awssdk.services.s3.model.ListObjectsResponse;
-import software.amazon.awssdk.services.s3.model.S3Object;
-
-@QuarkusTest
-@QuarkusTestResource(Aws2TestResource.class)
-class Aws2KinesisFirehoseTest extends BaseAWs2TestSupport {
-
-    private static final Logger LOG = 
Logger.getLogger(Aws2KinesisFirehoseTest.class);
-
-    @Aws2Client(Service.S3)
-    S3Client client;
-
-    public Aws2KinesisFirehoseTest() {
-        super("/aws2-kinesis-firehose");
-    }
-
-    @Test
-    public void firehose() {
-        final String msg = RandomStringUtils.randomAlphanumeric(32 * 1024);
-        final String msgPrefix = msg.substring(0, 32);
-        final long maxDataBytes = 
Aws2KinesisTestEnvCustomizer.BUFFERING_SIZE_MB * 1024 * 1024;
-        long bytesSent = 0;
-        LOG.info("Sending " + Aws2KinesisTestEnvCustomizer.BUFFERING_SIZE_MB + 
" MB of data to firehose using chunk "
-                + msgPrefix + "...");
-        final long deadline = System.currentTimeMillis() + 
(Aws2KinesisTestEnvCustomizer.BUFFERING_TIME_SEC * 1000);
-        while (bytesSent < maxDataBytes && System.currentTimeMillis() < 
deadline) {
-            /*
-             * Send at least 1MB of data but do not spend more than a minute 
by doing it.
-             * This is to overpass minimum buffering limits we have set via 
BufferingHints in the EnvCustomizer
-             */
-            RestAssured.given() //
-                    .contentType(ContentType.TEXT)
-                    .body(msg)
-                    .post("/aws2-kinesis-firehose/send") //
-                    .then()
-                    .statusCode(201);
-            bytesSent += msg.length();
-            LOG.info("Sent " + bytesSent + "/" + maxDataBytes + " bytes of 
data");
-        }
-        LOG.info("Sent " + Aws2KinesisTestEnvCustomizer.BUFFERING_SIZE_MB + " 
MB of data to firehose");
-
-        final Config config = ConfigProvider.getConfig();
-
-        final String bucketName = 
config.getValue("aws-kinesis.s3-bucket-name", String.class);
-        LOG.infof("Bucket '%s' should contain objects.", bucketName);
-        Awaitility.await().pollInterval(1, TimeUnit.SECONDS).atMost(120, 
TimeUnit.SECONDS).until(
-                () -> {
-                    LOG.infof("Reading objects from bucket '%s'", bucketName);
-                    final ListObjectsResponse objects = client
-                            
.listObjects(ListObjectsRequest.builder().bucket(bucketName).build());
-                    final List<S3Object> objs = objects.contents();
-                    LOG.info("There are  " + objs.size() + " objects in bucket 
" + bucketName);
-                    for (S3Object obj : objs) {
-                        LOG.info("Checking object " + obj.key() + " of size " 
+ obj.size());
-                        try (ResponseInputStream<GetObjectResponse> o = client
-                                
.getObject(GetObjectRequest.builder().bucket(bucketName).key(obj.key()).build()))
 {
-                            final StringBuilder sb = new 
StringBuilder(msg.length());
-                            final byte[] buf = new byte[1024];
-                            int len;
-                            while ((len = o.read(buf)) >= 0 && sb.length() < 
msgPrefix.length()) {
-                                sb.append(new String(buf, 0, len, 
StandardCharsets.UTF_8));
-                            }
-                            final String foundContent = sb.toString();
-                            if (foundContent.startsWith(msgPrefix)) {
-                                /* Yes, this is what we have sent */
-                                LOG.info("Found the expected content in object 
" + obj.key());
-                                return true;
-                            }
-                        }
-                    }
-                    return false;
-                });
-
-    }
-
-    @Override
-    public void testMethodForDefaultCredentialsProvider() {
-        RestAssured.given() //
-                .contentType(ContentType.TEXT)
-                .body("test")
-                .post("/aws2-kinesis-firehose/send") //
-                .then()
-                .statusCode(201);
-        ;
-    }
-}

Reply via email to