This is an automated email from the ASF dual-hosted git repository. acosentino pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit c13dd1f849925da51b0a1a92f30cc2611888cad3 Author: Mathieu <[email protected]> AuthorDate: Fri Jun 11 15:39:49 2021 +0200 Add RecordToJSONTransforms and JSONToRecordTransforms transforms --- connectors/camel-aws2-s3-kafka-connector/pom.xml | 5 ++ .../aws2s3/models/StorageHeader.java | 29 ++++++++ .../aws2s3/models/StorageRecord.java | 30 ++++++++ .../transformers/JSONToRecordTransforms.java | 72 +++++++++++++++++++ .../transformers/RecordToJSONTransforms.java | 82 ++++++++++++++++++++++ 5 files changed, 218 insertions(+) diff --git a/connectors/camel-aws2-s3-kafka-connector/pom.xml b/connectors/camel-aws2-s3-kafka-connector/pom.xml index 6650a70..d7318a8 100644 --- a/connectors/camel-aws2-s3-kafka-connector/pom.xml +++ b/connectors/camel-aws2-s3-kafka-connector/pom.xml @@ -54,6 +54,11 @@ <artifactId>camel-jackson</artifactId> </dependency> <!--END OF GENERATED CODE--> + <dependency> + <groupId>com.google.code.gson</groupId> + <artifactId>gson</artifactId> + <version>2.8.7</version> + </dependency> </dependencies> <build> <plugins> diff --git a/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/models/StorageHeader.java b/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/models/StorageHeader.java new file mode 100644 index 0000000..c3526dc --- /dev/null +++ b/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/models/StorageHeader.java @@ -0,0 +1,29 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.kafkaconnector.aws2s3.models; + + +public class StorageHeader { + public final String key; + public final String value; + + public StorageHeader(String key, String value) { + this.key = key; + this.value = value; + } +} + diff --git a/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/models/StorageRecord.java b/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/models/StorageRecord.java new file mode 100644 index 0000000..3d74597 --- /dev/null +++ b/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/models/StorageRecord.java @@ -0,0 +1,30 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.kafkaconnector.aws2s3.models; + +public class StorageRecord { + public final String key; + public final String body; + public final StorageHeader[] headers; + + public StorageRecord(String key, String body, StorageHeader[] headers) { + this.key = key; + this.body = body; + this.headers = headers; + } +} + diff --git a/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/transformers/JSONToRecordTransforms.java b/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/transformers/JSONToRecordTransforms.java new file mode 100644 index 0000000..2d28425 --- /dev/null +++ b/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/transformers/JSONToRecordTransforms.java @@ -0,0 +1,72 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.kafkaconnector.aws2s3.transformers; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import java.util.Map; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.connector.ConnectRecord; +import org.apache.kafka.connect.header.ConnectHeaders; +import org.apache.kafka.connect.header.Headers; +import org.apache.kafka.connect.transforms.Transformation; + +public class JSONToRecordTransforms<R extends ConnectRecord<R>> implements Transformation<R> { + public static final String FIELD_KEY_CONFIG = "key"; + public static final ConfigDef CONFIG_DEF = + new ConfigDef() + .define( + FIELD_KEY_CONFIG, + ConfigDef.Type.STRING, + null, + ConfigDef.Importance.MEDIUM, + "Add the key and the header to the record value"); + + @Override + public void configure(Map<String, ?> configs) {} + + @Override + public R apply(R record) { + String str = new String((byte[]) record.value()); + GsonBuilder gsonBuilder = new GsonBuilder(); + Gson gson = gsonBuilder.create(); + StorageRecord storageRecord = gson.fromJson(str, StorageRecord.class); + // Header format conversion + Headers headers = new ConnectHeaders(); + for (int i = 0; i < storageRecord.headers.length; i++) { + headers.add(storageRecord.headers[i].key, storageRecord.headers[i].value, null); + } + headers.forEach(h -> record.headers().add(h)); + return record.newRecord( + record.topic(), + record.kafkaPartition(), + record.keySchema(), + storageRecord.key, + record.valueSchema(), + storageRecord.body, + record.timestamp(), + headers); + } + + @Override + public void close() {} + + @Override + public ConfigDef config() { + return CONFIG_DEF; + } +} diff --git a/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/transformers/RecordToJSONTransforms.java b/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/transformers/RecordToJSONTransforms.java new file mode 100644 index 0000000..fdb2e9f --- /dev/null +++ b/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/transformers/RecordToJSONTransforms.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.camel.kafkaconnector.aws2s3.transformers; + +import com.google.gson.Gson; +import com.google.gson.GsonBuilder; +import java.util.ArrayList; +import java.util.Map; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.connector.ConnectRecord; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.header.Header; +import org.apache.kafka.connect.header.Headers; +import org.apache.kafka.connect.transforms.Transformation; +import org.apache.camel.kafkaconnector.aws2s3.models.StorageHeader; +import org.apache.camel.kafkaconnector.aws2s3.models.StorageRecord; +import java.io.ByteArrayInputStream; + +public class RecordToJSONTransforms<R extends ConnectRecord<R>> implements Transformation<R> { + public static final String FIELD_KEY_CONFIG = "key"; + public static final ConfigDef CONFIG_DEF = + new ConfigDef() + .define( + FIELD_KEY_CONFIG, + ConfigDef.Type.STRING, + null, + ConfigDef.Importance.MEDIUM, + "Add the key and the header to the record value"); + + @Override + public void configure(Map<String, ?> configs) {} + + @Override + public R apply(R record) { + // Convert headers to StorageHeader format + Headers headers = record.headers(); + ArrayList<StorageHeader> headerList = new ArrayList<StorageHeader>(headers.size()); + for (Header h : headers) { + headerList.add(new StorageHeader(h.key(), (String) h.value())); + } + StorageHeader[] storageHeaders = new StorageHeader[headers.size()]; + StorageRecord storageRecord = + new StorageRecord( + (String) record.key(), (String) record.value(), headerList.toArray(storageHeaders)); + + // Serialize + GsonBuilder gsonBuilder = new GsonBuilder(); + Gson gson = gsonBuilder.create(); + String storageRecordJSON = gson.toJson(storageRecord, StorageRecord.class); + InputStream storageRecordStream = new ByteArrayInputStream(storageRecordJSON.getBytes()) + return record.newRecord( + record.topic(), + record.kafkaPartition(), + null, + record.key(), + Schema.STRING_SCHEMA, + storageRecordStream, + record.timestamp()); + } + + @Override + public void close() {} + + @Override + public ConfigDef config() { + return CONFIG_DEF; + } +}
