This is an automated email from the ASF dual-hosted git repository. jamesnetherton pushed a commit to branch camel-main in repository https://gitbox.apache.org/repos/asf/camel-quarkus.git
commit 5601d0df7be3ff9f954935abeba0291902528419 Author: JiriOndrusek <ondrusek.j...@gmail.com> AuthorDate: Fri Jun 28 10:02:28 2024 +0200 Quick fix of aws2-kinesis --- .../kinesis/deployment/Aws2KinesisProcessor.java | 18 +++ extensions/aws2-kinesis/runtime/pom.xml | 5 + .../kinesis/graalvm/Aws2KinesisSubstitutions.java | 165 +++++++++++++++++++++ .../src/main/resources/application.properties | 4 + .../aws2/kinesis/it/Aws2KinesisFirehoseIT.java | 24 --- .../aws2/kinesis/it/Aws2KinesisFirehoseTest.java | 127 ---------------- 6 files changed, 192 insertions(+), 151 deletions(-) diff --git a/extensions/aws2-kinesis/deployment/src/main/java/org/apache/camel/quarkus/component/aws2/kinesis/deployment/Aws2KinesisProcessor.java b/extensions/aws2-kinesis/deployment/src/main/java/org/apache/camel/quarkus/component/aws2/kinesis/deployment/Aws2KinesisProcessor.java index 7a55071918..1cadb5c61a 100644 --- a/extensions/aws2-kinesis/deployment/src/main/java/org/apache/camel/quarkus/component/aws2/kinesis/deployment/Aws2KinesisProcessor.java +++ b/extensions/aws2-kinesis/deployment/src/main/java/org/apache/camel/quarkus/component/aws2/kinesis/deployment/Aws2KinesisProcessor.java @@ -16,8 +16,13 @@ */ package org.apache.camel.quarkus.component.aws2.kinesis.deployment; +import java.util.stream.Stream; + +import io.quarkus.deployment.annotations.BuildProducer; import io.quarkus.deployment.annotations.BuildStep; import io.quarkus.deployment.builditem.FeatureBuildItem; +import io.quarkus.deployment.builditem.nativeimage.ReflectiveClassBuildItem; +import io.quarkus.deployment.builditem.nativeimage.RuntimeInitializedClassBuildItem; import org.jboss.logging.Logger; class Aws2KinesisProcessor { @@ -29,4 +34,17 @@ class Aws2KinesisProcessor { FeatureBuildItem feature() { return new FeatureBuildItem(FEATURE); } + + @BuildStep + void runtimeInitializedClasses(BuildProducer<RuntimeInitializedClassBuildItem> runtimeInitializedClass) { + Stream.of("software.amazon.awssdk.services.dynamodb.DynamoDbRetryPolicy", + "software.amazon.kinesis.lifecycle.ShutdownTask") + .map(RuntimeInitializedClassBuildItem::new) + .forEach(runtimeInitializedClass::produce); + } + + @BuildStep + void build(BuildProducer<ReflectiveClassBuildItem> reflectiveClass) { + reflectiveClass.produce(ReflectiveClassBuildItem.builder("javax.xml.bind.DatatypeConverter").build()); + } } diff --git a/extensions/aws2-kinesis/runtime/pom.xml b/extensions/aws2-kinesis/runtime/pom.xml index dd3411b192..fc96d75e32 100644 --- a/extensions/aws2-kinesis/runtime/pom.xml +++ b/extensions/aws2-kinesis/runtime/pom.xml @@ -54,6 +54,11 @@ <groupId>io.quarkus</groupId> <artifactId>quarkus-netty</artifactId> </dependency> + <dependency> + <groupId>org.graalvm.sdk</groupId> + <artifactId>nativeimage</artifactId> + <scope>provided</scope> + </dependency> </dependencies> <build> diff --git a/extensions/aws2-kinesis/runtime/src/main/java/org/apache/camel/quarkus/component/aws2/kinesis/graalvm/Aws2KinesisSubstitutions.java b/extensions/aws2-kinesis/runtime/src/main/java/org/apache/camel/quarkus/component/aws2/kinesis/graalvm/Aws2KinesisSubstitutions.java new file mode 100644 index 0000000000..4bfacfe4bb --- /dev/null +++ b/extensions/aws2-kinesis/runtime/src/main/java/org/apache/camel/quarkus/component/aws2/kinesis/graalvm/Aws2KinesisSubstitutions.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.quarkus.component.aws2.kinesis.graalvm; + +import java.io.UnsupportedEncodingException; +import java.math.BigInteger; +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.List; + +import com.google.protobuf.InvalidProtocolBufferException; +import com.oracle.svm.core.annotate.Alias; +import com.oracle.svm.core.annotate.Substitute; +import com.oracle.svm.core.annotate.TargetClass; +import software.amazon.kinesis.retrieval.AggregatorUtil; +import software.amazon.kinesis.retrieval.KinesisClientRecord; +import software.amazon.kinesis.retrieval.kpl.Messages; + +public class Aws2KinesisSubstitutions { +} + +/** + * Quick (ugly) fix of https://github.com/awslabs/amazon-kinesis-client/issues/1355 + */ +@TargetClass(AggregatorUtil.class) +final class AggregatorUtilSubstitutions { + + @Alias + protected byte[] calculateTailCheck(byte[] data) { + return null; + } + + @Alias + protected BigInteger effectiveHashKey(String partitionKey, String explicitHashKey) throws UnsupportedEncodingException { + return null; + } + + @Alias + public KinesisClientRecord convertRecordToKinesisClientRecord(final KinesisClientRecord record, + final boolean aggregated, + final long subSequenceNumber, + final String explicitHashKey) { + return null; + } + + @Substitute + public List<KinesisClientRecord> deaggregate(List<KinesisClientRecord> records, + BigInteger startingHashKey, + BigInteger endingHashKey) { + List<KinesisClientRecord> result = new ArrayList<>(); + byte[] magic = new byte[AggregatorUtil.AGGREGATED_RECORD_MAGIC.length]; + byte[] digest = new byte[16]; + + for (KinesisClientRecord r : records) { + boolean isAggregated = true; + long subSeqNum = 0; + ByteBuffer bb = r.data(); + + if (bb.remaining() >= magic.length) { + bb.get(magic); + } else { + isAggregated = false; + } + + if (!Arrays.equals(AggregatorUtil.AGGREGATED_RECORD_MAGIC, magic) || bb.remaining() <= 16) { + isAggregated = false; + } + + if (isAggregated) { + int oldLimit = bb.limit(); + bb.limit(oldLimit - 16); + byte[] messageData = new byte[bb.remaining()]; + bb.get(messageData); + bb.limit(oldLimit); + bb.get(digest); + byte[] calculatedDigest = calculateTailCheck(messageData); + + if (!Arrays.equals(digest, calculatedDigest)) { + isAggregated = false; + } else { + try { + Messages.AggregatedRecord ar = Messages.AggregatedRecord.parseFrom(messageData); + List<String> pks = ar.getPartitionKeyTableList(); + List<String> ehks = ar.getExplicitHashKeyTableList(); + long aat = r.approximateArrivalTimestamp() == null + ? -1 : r.approximateArrivalTimestamp().toEpochMilli(); + try { + int recordsInCurrRecord = 0; + for (Messages.Record mr : ar.getRecordsList()) { + String explicitHashKey = null; + String partitionKey = pks.get((int) mr.getPartitionKeyIndex()); + if (mr.hasExplicitHashKeyIndex()) { + explicitHashKey = ehks.get((int) mr.getExplicitHashKeyIndex()); + } + + BigInteger effectiveHashKey = effectiveHashKey(partitionKey, explicitHashKey); + + if (effectiveHashKey.compareTo(startingHashKey) < 0 + || effectiveHashKey.compareTo(endingHashKey) > 0) { + for (int toRemove = 0; toRemove < recordsInCurrRecord; ++toRemove) { + result.remove(result.size() - 1); + } + break; + } + + ++recordsInCurrRecord; + + KinesisClientRecord record = r.toBuilder() + .data(ByteBuffer.wrap(mr.getData().toByteArray())) + .partitionKey(partitionKey) + .explicitHashKey(explicitHashKey) + .build(); + result.add(convertRecordToKinesisClientRecord(record, true, subSeqNum++, explicitHashKey)); + } + } catch (Exception e) { + StringBuilder sb = new StringBuilder(); + sb.append("Unexpected exception during deaggregation, record was:\n"); + sb.append("PKS:\n"); + for (String s : pks) { + sb.append(s).append("\n"); + } + sb.append("EHKS: \n"); + for (String s : ehks) { + sb.append(s).append("\n"); + } + for (Messages.Record mr : ar.getRecordsList()) { + sb.append("Record: [hasEhk=").append(mr.hasExplicitHashKeyIndex()).append(", ") + .append("ehkIdx=").append(mr.getExplicitHashKeyIndex()).append(", ") + .append("pkIdx=").append(mr.getPartitionKeyIndex()).append(", ") + .append("dataLen=").append(mr.getData().toByteArray().length).append("]\n"); + } + sb.append("Sequence number: ").append(r.sequenceNumber()).append("\n") + .append("Raw data: ") + .append(jakarta.xml.bind.DatatypeConverter.printBase64Binary(messageData)).append("\n"); + // todo log.error(sb.toString(), e); + } + } catch (InvalidProtocolBufferException e) { + isAggregated = false; + } + } + } + + if (!isAggregated) { + bb.rewind(); + result.add(r); + } + } + return result; + } +} diff --git a/integration-test-groups/aws2/aws2-kinesis/src/main/resources/application.properties b/integration-test-groups/aws2/aws2-kinesis/src/main/resources/application.properties index 056ab69848..e3161d4f9b 100644 --- a/integration-test-groups/aws2/aws2-kinesis/src/main/resources/application.properties +++ b/integration-test-groups/aws2/aws2-kinesis/src/main/resources/application.properties @@ -24,3 +24,7 @@ camel.component.aws2-kinesis-firehose.access-key=${AWS_ACCESS_KEY} camel.component.aws2-kinesis-firehose.secret-key=${AWS_SECRET_KEY} camel.component.aws2-kinesis-firehose.useDefaultCredentialsProvider=${AWS_USE_DEFAULT_CREDENTIALS_PROVIDER} camel.component.aws2-kinesis-firehose.region=${AWS_REGION:us-east-1} + + +#quarkus.native.additional-build-args=--trace-object-instantiation=java.util.Random, --trace-object-instantiation=software.amazon.awssdk.core.retry.backoff.FullJitterBackoffStrategy, --initialize-at-run-time=software.amazon.kinesis.lifecycle.ShutdownTask, --initialize-at-run-time=software.amazon.awssdk.services.dynamodb.DynamoDbRetryPolicy +quarkus.native.additional-build-args=--initialize-at-build-time=javax.xml.bind.DatatypeConverter diff --git a/integration-test-groups/aws2/aws2-kinesis/src/test/java/org/apache/camel/quarkus/component/aws2/kinesis/it/Aws2KinesisFirehoseIT.java b/integration-test-groups/aws2/aws2-kinesis/src/test/java/org/apache/camel/quarkus/component/aws2/kinesis/it/Aws2KinesisFirehoseIT.java deleted file mode 100644 index 1347c84e3d..0000000000 --- a/integration-test-groups/aws2/aws2-kinesis/src/test/java/org/apache/camel/quarkus/component/aws2/kinesis/it/Aws2KinesisFirehoseIT.java +++ /dev/null @@ -1,24 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.quarkus.component.aws2.kinesis.it; - -import io.quarkus.test.junit.QuarkusIntegrationTest; - -@QuarkusIntegrationTest -class Aws2KinesisFirehoseIT extends Aws2KinesisFirehoseTest { - -} diff --git a/integration-test-groups/aws2/aws2-kinesis/src/test/java/org/apache/camel/quarkus/component/aws2/kinesis/it/Aws2KinesisFirehoseTest.java b/integration-test-groups/aws2/aws2-kinesis/src/test/java/org/apache/camel/quarkus/component/aws2/kinesis/it/Aws2KinesisFirehoseTest.java deleted file mode 100644 index b16c505952..0000000000 --- a/integration-test-groups/aws2/aws2-kinesis/src/test/java/org/apache/camel/quarkus/component/aws2/kinesis/it/Aws2KinesisFirehoseTest.java +++ /dev/null @@ -1,127 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.camel.quarkus.component.aws2.kinesis.it; - -import java.nio.charset.StandardCharsets; -import java.util.List; -import java.util.concurrent.TimeUnit; - -import io.quarkus.test.common.QuarkusTestResource; -import io.quarkus.test.junit.QuarkusTest; -import io.restassured.RestAssured; -import io.restassured.http.ContentType; -import org.apache.camel.quarkus.test.support.aws2.Aws2Client; -import org.apache.camel.quarkus.test.support.aws2.Aws2TestResource; -import org.apache.camel.quarkus.test.support.aws2.BaseAWs2TestSupport; -import org.apache.commons.lang3.RandomStringUtils; -import org.awaitility.Awaitility; -import org.eclipse.microprofile.config.Config; -import org.eclipse.microprofile.config.ConfigProvider; -import org.jboss.logging.Logger; -import org.junit.jupiter.api.Test; -import org.testcontainers.containers.localstack.LocalStackContainer.Service; -import software.amazon.awssdk.core.ResponseInputStream; -import software.amazon.awssdk.services.s3.S3Client; -import software.amazon.awssdk.services.s3.model.GetObjectRequest; -import software.amazon.awssdk.services.s3.model.GetObjectResponse; -import software.amazon.awssdk.services.s3.model.ListObjectsRequest; -import software.amazon.awssdk.services.s3.model.ListObjectsResponse; -import software.amazon.awssdk.services.s3.model.S3Object; - -@QuarkusTest -@QuarkusTestResource(Aws2TestResource.class) -class Aws2KinesisFirehoseTest extends BaseAWs2TestSupport { - - private static final Logger LOG = Logger.getLogger(Aws2KinesisFirehoseTest.class); - - @Aws2Client(Service.S3) - S3Client client; - - public Aws2KinesisFirehoseTest() { - super("/aws2-kinesis-firehose"); - } - - @Test - public void firehose() { - final String msg = RandomStringUtils.randomAlphanumeric(32 * 1024); - final String msgPrefix = msg.substring(0, 32); - final long maxDataBytes = Aws2KinesisTestEnvCustomizer.BUFFERING_SIZE_MB * 1024 * 1024; - long bytesSent = 0; - LOG.info("Sending " + Aws2KinesisTestEnvCustomizer.BUFFERING_SIZE_MB + " MB of data to firehose using chunk " - + msgPrefix + "..."); - final long deadline = System.currentTimeMillis() + (Aws2KinesisTestEnvCustomizer.BUFFERING_TIME_SEC * 1000); - while (bytesSent < maxDataBytes && System.currentTimeMillis() < deadline) { - /* - * Send at least 1MB of data but do not spend more than a minute by doing it. - * This is to overpass minimum buffering limits we have set via BufferingHints in the EnvCustomizer - */ - RestAssured.given() // - .contentType(ContentType.TEXT) - .body(msg) - .post("/aws2-kinesis-firehose/send") // - .then() - .statusCode(201); - bytesSent += msg.length(); - LOG.info("Sent " + bytesSent + "/" + maxDataBytes + " bytes of data"); - } - LOG.info("Sent " + Aws2KinesisTestEnvCustomizer.BUFFERING_SIZE_MB + " MB of data to firehose"); - - final Config config = ConfigProvider.getConfig(); - - final String bucketName = config.getValue("aws-kinesis.s3-bucket-name", String.class); - LOG.infof("Bucket '%s' should contain objects.", bucketName); - Awaitility.await().pollInterval(1, TimeUnit.SECONDS).atMost(120, TimeUnit.SECONDS).until( - () -> { - LOG.infof("Reading objects from bucket '%s'", bucketName); - final ListObjectsResponse objects = client - .listObjects(ListObjectsRequest.builder().bucket(bucketName).build()); - final List<S3Object> objs = objects.contents(); - LOG.info("There are " + objs.size() + " objects in bucket " + bucketName); - for (S3Object obj : objs) { - LOG.info("Checking object " + obj.key() + " of size " + obj.size()); - try (ResponseInputStream<GetObjectResponse> o = client - .getObject(GetObjectRequest.builder().bucket(bucketName).key(obj.key()).build())) { - final StringBuilder sb = new StringBuilder(msg.length()); - final byte[] buf = new byte[1024]; - int len; - while ((len = o.read(buf)) >= 0 && sb.length() < msgPrefix.length()) { - sb.append(new String(buf, 0, len, StandardCharsets.UTF_8)); - } - final String foundContent = sb.toString(); - if (foundContent.startsWith(msgPrefix)) { - /* Yes, this is what we have sent */ - LOG.info("Found the expected content in object " + obj.key()); - return true; - } - } - } - return false; - }); - - } - - @Override - public void testMethodForDefaultCredentialsProvider() { - RestAssured.given() // - .contentType(ContentType.TEXT) - .body("test") - .post("/aws2-kinesis-firehose/send") // - .then() - .statusCode(201); - ; - } -}