This is an automated email from the ASF dual-hosted git repository.

acosentino pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel-kafka-connector.git
commit 67a3f42b08c617f3a0e0e20abc86f933d88ba254
Author: Andrea Cosentino <[email protected]>
AuthorDate: Thu Jul 22 08:47:29 2021 +0200

    Camel-AWS2-S3: Fixed codestyle
---
 .../aws2s3/models/StorageHeader.java          |  13 ++-
 .../aws2s3/models/StorageRecord.java          |  16 ++--
 .../transformers/JSONToRecordTransforms.java  |  82 +++++++++--------
 .../transformers/RecordToJSONTransforms.java  | 101 +++++++++++----------
 4 files changed, 108 insertions(+), 104 deletions(-)

diff --git a/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/models/StorageHeader.java b/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/models/StorageHeader.java
index c3526dc..3d22841 100644
--- a/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/models/StorageHeader.java
+++ b/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/models/StorageHeader.java
@@ -16,14 +16,13 @@
  */
 package org.apache.camel.kafkaconnector.aws2s3.models;
 
-
 public class StorageHeader {
-  public final String key;
-  public final String value;
+    public final String key;
+    public final String value;
 
-  public StorageHeader(String key, String value) {
-    this.key = key;
-    this.value = value;
-  }
+    public StorageHeader(String key, String value) {
+        this.key = key;
+        this.value = value;
+    }
 }
diff --git a/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/models/StorageRecord.java b/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/models/StorageRecord.java
index 3d74597..9af3482 100644
--- a/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/models/StorageRecord.java
+++ b/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/models/StorageRecord.java
@@ -17,14 +17,14 @@
 package org.apache.camel.kafkaconnector.aws2s3.models;
 
 public class StorageRecord {
-  public final String key;
-  public final String body;
-  public final StorageHeader[] headers;
+    public final String key;
+    public final String body;
+    public final StorageHeader[] headers;
 
-  public StorageRecord(String key, String body, StorageHeader[] headers) {
-    this.key = key;
-    this.body = body;
-    this.headers = headers;
-  }
+    public StorageRecord(String key, String body, StorageHeader[] headers) {
+        this.key = key;
+        this.body = body;
+        this.headers = headers;
+    }
 }
diff --git a/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/transformers/JSONToRecordTransforms.java b/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/transformers/JSONToRecordTransforms.java
index 8ef4c53..882f224 100644
--- a/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/transformers/JSONToRecordTransforms.java
+++ b/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/transformers/JSONToRecordTransforms.java
@@ -16,10 +16,10 @@
  */
 package org.apache.camel.kafkaconnector.aws2s3.transformers;
 
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
 import java.util.Map;
 
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
 import org.apache.camel.kafkaconnector.aws2s3.models.StorageRecord;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.connect.connector.ConnectRecord;
@@ -28,47 +28,49 @@ import org.apache.kafka.connect.header.Headers;
 import org.apache.kafka.connect.transforms.Transformation;
 
 public class JSONToRecordTransforms<R extends ConnectRecord<R>> implements Transformation<R> {
-  public static final String FIELD_KEY_CONFIG = "key";
-  public static final ConfigDef CONFIG_DEF =
-      new ConfigDef()
-          .define(
-              FIELD_KEY_CONFIG,
-              ConfigDef.Type.STRING,
-              null,
-              ConfigDef.Importance.MEDIUM,
-              "Add the key and the header to the record value");
+    public static final String FIELD_KEY_CONFIG = "key";
+    public static final ConfigDef CONFIG_DEF =
+        new ConfigDef()
+            .define(
+                FIELD_KEY_CONFIG,
+                ConfigDef.Type.STRING,
+                null,
+                ConfigDef.Importance.MEDIUM,
+                "Add the key and the header to the record value");
 
-  @Override
-  public void configure(Map<String, ?> configs) {}
+    @Override
+    public void configure(Map<String, ?> configs) {
+    }
 
-  @Override
-  public R apply(R record) {
-    String str = new String((byte[]) record.value());
-    GsonBuilder gsonBuilder = new GsonBuilder();
-    Gson gson = gsonBuilder.create();
-    StorageRecord storageRecord = gson.fromJson(str, StorageRecord.class);
-    // Header format conversion
-    Headers headers = new ConnectHeaders();
-    for (int i = 0; i < storageRecord.headers.length; i++) {
-      headers.add(storageRecord.headers[i].key, storageRecord.headers[i].value, null);
+    @Override
+    public R apply(R record) {
+        String str = new String((byte[]) record.value());
+        GsonBuilder gsonBuilder = new GsonBuilder();
+        Gson gson = gsonBuilder.create();
+        StorageRecord storageRecord = gson.fromJson(str, StorageRecord.class);
+        // Header format conversion
+        Headers headers = new ConnectHeaders();
+        for (int i = 0; i < storageRecord.headers.length; i++) {
+            headers.add(storageRecord.headers[i].key, storageRecord.headers[i].value, null);
+        }
+        headers.forEach(h -> record.headers().add(h));
+        return record.newRecord(
+            record.topic(),
+            record.kafkaPartition(),
+            record.keySchema(),
+            storageRecord.key,
+            record.valueSchema(),
+            storageRecord.body,
+            record.timestamp(),
+            headers);
     }
-    headers.forEach(h -> record.headers().add(h));
-    return record.newRecord(
-        record.topic(),
-        record.kafkaPartition(),
-        record.keySchema(),
-        storageRecord.key,
-        record.valueSchema(),
-        storageRecord.body,
-        record.timestamp(),
-        headers);
-  }
 
-  @Override
-  public void close() {}
+    @Override
+    public void close() {
+    }
 
-  @Override
-  public ConfigDef config() {
-    return CONFIG_DEF;
-  }
+    @Override
+    public ConfigDef config() {
+        return CONFIG_DEF;
+    }
 }
diff --git a/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/transformers/RecordToJSONTransforms.java b/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/transformers/RecordToJSONTransforms.java
index 203ab14..f97e15b 100644
--- a/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/transformers/RecordToJSONTransforms.java
+++ b/connectors/camel-aws2-s3-kafka-connector/src/main/java/org/apache/camel/kafkaconnector/aws2s3/transformers/RecordToJSONTransforms.java
@@ -16,68 +16,71 @@
  */
 package org.apache.camel.kafkaconnector.aws2s3.transformers;
 
-import com.google.gson.Gson;
-import com.google.gson.GsonBuilder;
+import java.io.ByteArrayInputStream;
+import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Map;
+
+import com.google.gson.Gson;
+import com.google.gson.GsonBuilder;
+import org.apache.camel.kafkaconnector.aws2s3.models.StorageHeader;
+import org.apache.camel.kafkaconnector.aws2s3.models.StorageRecord;
 import org.apache.kafka.common.config.ConfigDef;
 import org.apache.kafka.connect.connector.ConnectRecord;
 import org.apache.kafka.connect.data.Schema;
 import org.apache.kafka.connect.header.Header;
 import org.apache.kafka.connect.header.Headers;
 import org.apache.kafka.connect.transforms.Transformation;
-import org.apache.camel.kafkaconnector.aws2s3.models.StorageHeader;
-import org.apache.camel.kafkaconnector.aws2s3.models.StorageRecord;
-import java.io.ByteArrayInputStream;
-import java.io.InputStream;
 
 public class RecordToJSONTransforms<R extends ConnectRecord<R>> implements Transformation<R> {
-  public static final String FIELD_KEY_CONFIG = "key";
-  public static final ConfigDef CONFIG_DEF =
-      new ConfigDef()
-          .define(
-              FIELD_KEY_CONFIG,
-              ConfigDef.Type.STRING,
-              null,
-              ConfigDef.Importance.MEDIUM,
-              "Add the key and the header to the record value");
-
-  @Override
-  public void configure(Map<String, ?> configs) {}
+    public static final String FIELD_KEY_CONFIG = "key";
+    public static final ConfigDef CONFIG_DEF =
+        new ConfigDef()
+            .define(
+                FIELD_KEY_CONFIG,
+                ConfigDef.Type.STRING,
+                null,
+                ConfigDef.Importance.MEDIUM,
+                "Add the key and the header to the record value");
 
-  @Override
-  public R apply(R record) {
-    // Convert headers to StorageHeader format
-    Headers headers = record.headers();
-    ArrayList<StorageHeader> headerList = new ArrayList<StorageHeader>(headers.size());
-    for (Header h : headers) {
-      headerList.add(new StorageHeader(h.key(), (String) h.value()));
+    @Override
+    public void configure(Map<String, ?> configs) {
     }
-    StorageHeader[] storageHeaders = new StorageHeader[headers.size()];
-    StorageRecord storageRecord =
-        new StorageRecord(
-            (String) record.key(), (String) record.value(), headerList.toArray(storageHeaders));
-    // Serialize
-    GsonBuilder gsonBuilder = new GsonBuilder();
-    Gson gson = gsonBuilder.create();
-    String storageRecordJSON = gson.toJson(storageRecord, StorageRecord.class);
-    InputStream storageRecordStream = new ByteArrayInputStream(storageRecordJSON.getBytes());
-    return record.newRecord(
-        record.topic(),
-        record.kafkaPartition(),
-        null,
-        record.key(),
-        Schema.STRING_SCHEMA,
-        storageRecordStream,
-        record.timestamp());
-  }
+    @Override
+    public R apply(R record) {
+        // Convert headers to StorageHeader format
+        Headers headers = record.headers();
+        ArrayList<StorageHeader> headerList = new ArrayList<StorageHeader>(headers.size());
+        for (Header h : headers) {
+            headerList.add(new StorageHeader(h.key(), (String) h.value()));
+        }
+        StorageHeader[] storageHeaders = new StorageHeader[headers.size()];
+        StorageRecord storageRecord =
+            new StorageRecord(
+                (String) record.key(), (String) record.value(), headerList.toArray(storageHeaders));
 
-  @Override
-  public void close() {}
+        // Serialize
+        GsonBuilder gsonBuilder = new GsonBuilder();
+        Gson gson = gsonBuilder.create();
+        String storageRecordJSON = gson.toJson(storageRecord, StorageRecord.class);
+        InputStream storageRecordStream = new ByteArrayInputStream(storageRecordJSON.getBytes());
+        return record.newRecord(
+            record.topic(),
+            record.kafkaPartition(),
+            null,
+            record.key(),
+            Schema.STRING_SCHEMA,
+            storageRecordStream,
+            record.timestamp());
+    }
+
+    @Override
+    public void close() {
+    }
 
-  @Override
-  public ConfigDef config() {
-    return CONFIG_DEF;
-  }
+    @Override
+    public ConfigDef config() {
+        return CONFIG_DEF;
+    }
 }
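For readers following along, a minimal sketch of driving the reformatted JSONToRecordTransforms SMT by hand; in a real deployment the Kafka Connect runtime instantiates and configures the transform itself. The standalone driver class, the "s3-topic" name, the null partition/schemas and the sample JSON payload below are illustrative assumptions, not part of this commit.

import java.util.Collections;

import org.apache.camel.kafkaconnector.aws2s3.transformers.JSONToRecordTransforms;
import org.apache.kafka.connect.source.SourceRecord;

public class JSONToRecordTransformsSketch {
    public static void main(String[] args) {
        // The Connect runtime normally instantiates and configures SMTs; done by hand here.
        JSONToRecordTransforms<SourceRecord> transform = new JSONToRecordTransforms<>();
        transform.configure(Collections.emptyMap());

        // A StorageRecord serialized as JSON, matching the fields Gson reads in apply().
        byte[] value = ("{\"key\":\"k1\",\"body\":\"hello\","
                + "\"headers\":[{\"key\":\"h1\",\"value\":\"v1\"}]}").getBytes();

        // "s3-topic" and the null partition/schemas are placeholder values.
        SourceRecord in = new SourceRecord(null, null, "s3-topic", null, null, null, null, value);
        SourceRecord out = transform.apply(in);

        // The JSON "key" and "body" become the record key and value; headers are copied over.
        System.out.println(out.key() + " -> " + out.value());
        transform.close();
    }
}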
