[GitHub] [kafka] C0urante commented on a change in pull request #9549: KIP-145: Add SMTs, HeaderFrom, DropHeaders and InsertHeader
C0urante commented on a change in pull request #9549: URL: https://github.com/apache/kafka/pull/9549#discussion_r523161370 ## File path: connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HeaderFromTest.java ## @@ -0,0 +1,357 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.connect.transforms; + +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaAndValue; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.header.ConnectHeaders; +import org.apache.kafka.connect.header.Header; +import org.apache.kafka.connect.header.Headers; +import org.apache.kafka.connect.source.SourceRecord; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static java.util.Arrays.asList; +import static java.util.Collections.singletonList; +import static java.util.Collections.singletonMap; +import static org.apache.kafka.connect.data.Schema.STRING_SCHEMA; +import static org.junit.Assert.assertEquals; + +@RunWith(Parameterized.class) +public class HeaderFromTest { + +private final boolean keyTransform; + +static class RecordBuilder { +private final List<String> fields = new ArrayList<>(2); +private final List<Schema> fieldSchemas = new ArrayList<>(2); +private final List<Object> fieldValues = new ArrayList<>(2); +private final ConnectHeaders headers = new ConnectHeaders(); + +public RecordBuilder() { +} + +public RecordBuilder withField(String name, Schema schema, Object value) { +fields.add(name); +fieldSchemas.add(schema); +fieldValues.add(value); +return this; +} + +public RecordBuilder addHeader(String name, Schema schema, Object value) { +headers.add(name, new SchemaAndValue(schema, value)); +return this; +} + +public SourceRecord schemaless(boolean keyTransform) { +Map<String, Object> map = new HashMap<>(); +for (int i = 0; i < this.fields.size(); i++) { +String fieldName = this.fields.get(i); +map.put(fieldName, this.fieldValues.get(i)); + +} +return sourceRecord(keyTransform, null, map); +} + +private 
Schema schema() { +SchemaBuilder schemaBuilder = new SchemaBuilder(Schema.Type.STRUCT); +for (int i = 0; i < this.fields.size(); i++) { +String fieldName = this.fields.get(i); +schemaBuilder.field(fieldName, this.fieldSchemas.get(i)); + +} +return schemaBuilder.build(); +} + +private Struct struct(Schema schema) { +Struct struct = new Struct(schema); +for (int i = 0; i < this.fields.size(); i++) { +String fieldName = this.fields.get(i); +struct.put(fieldName, this.fieldValues.get(i)); +} +return struct; +} + +public SourceRecord withSchema(boolean keyTransform) { +Schema schema = schema(); +Struct struct = struct(schema); +return sourceRecord(keyTransform, schema, struct); +} + +private SourceRecord sourceRecord(boolean keyTransform, Schema keyOrValueSchema, Object keyOrValue) { +Map sourcePartition = singletonMap("foo", "bar"); +Map sourceOffset = singletonMap("baz", "quxx"); +String topic = "topic"; +Integer partition = 0; +Long timestamp = 0L; + +ConnectHeaders headers = this.headers; +if (keyOrValueSchema == null) { +// When doing a schemaless transformation we don't expect the header to have a schema +headers = new ConnectHeaders(); +for (Header header : this.headers) { +headers.add(header.key(), new SchemaAndValue(null, header.value(
[GitHub] [kafka] C0urante commented on a change in pull request #9549: KIP-145: Add SMTs, HeaderFrom, DropHeaders and InsertHeader
C0urante commented on a change in pull request #9549: URL: https://github.com/apache/kafka/pull/9549#discussion_r523147909 ## File path: connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HeaderFrom.java ## @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.connect.transforms; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.connect.connector.ConnectRecord; +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.header.Header; +import org.apache.kafka.connect.header.Headers; +import org.apache.kafka.connect.transforms.util.Requirements; +import org.apache.kafka.connect.transforms.util.SchemaUtil; +import org.apache.kafka.connect.transforms.util.SimpleConfig; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.kafka.common.config.ConfigDef.NO_DEFAULT_VALUE; + +public abstract class HeaderFrom<R extends ConnectRecord<R>> implements Transformation<R> { + +public static final String FIELDS_FIELD = "fields"; +public static final String HEADERS_FIELD = "headers"; +public static final String OPERATION_FIELD = "operation"; + +public static final String OVERVIEW_DOC = +"Moves or copies fields in the key/value of a record into that record's headers. " + +"Corresponding elements of <code>" + FIELDS_FIELD + "</code> and " + +"<code>" + HEADERS_FIELD + "</code> together identify a field and the header it should be " + +"moved or copied to. 
" + +"Use the concrete transformation type designed for the record " + +"key (" + Key.class.getName() + ") or value (" + Value.class.getName() + ")."; + +public static final ConfigDef CONFIG_DEF = new ConfigDef() +.define(FIELDS_FIELD, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH, +"Field names in the record whose values are to be copied or moved to headers.") +.define(HEADERS_FIELD, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH, +"Header names, in the same order as the field names listed in the fields configuration property.") +.define(OPERATION_FIELD, ConfigDef.Type.STRING, NO_DEFAULT_VALUE, +ConfigDef.ValidString.in("move", "copy"), ConfigDef.Importance.HIGH, +"Either move if the fields are to be moved to the headers (removed from the key/value), " + +"or copy if the fields are to be copied to the headers (retained in the key/value)."); + +enum Operation { +MOVE("move"), +COPY("copy"); + +private final String name; + +Operation(String name) { +this.name = name; +} + +static Operation fromName(String name) { +switch (name) { +case "move": +return MOVE; +case "copy": +return COPY; +default: +throw new IllegalArgumentException(); +} +} + +public String toString() { +return name; +} +} + +private List fields; + +private List headers; + +private Operation operation; + +@Override +public R apply(R record) { +Object operatingValue = operatingValue(record); +Schema operatingSchema = operatingSchema(record); + +if (operatingSchema == null) { +return applySchemaless(record, operatingValue); +} else { +return applyWithSchema(record, operatingValue, operatingSchema); +} +} + +private R applyWithSchema(R record, Object operatingValue, Schema operatingSchema) { +Headers updatedHeaders = record.headers().duplicate(); Review comment: Ah, that's fair. Does seem to be the pattern followed by the other out-of-the-box transformations as well; probably best to continue to follow that pattern. I
[GitHub] [kafka] C0urante commented on a change in pull request #9549: KIP-145: Add SMTs, HeaderFrom, DropHeaders and InsertHeader
C0urante commented on a change in pull request #9549: URL: https://github.com/apache/kafka/pull/9549#discussion_r520056667 ## File path: connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HeaderFrom.java ## @@ -0,0 +1,219 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + *http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.connect.transforms; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigException; +import org.apache.kafka.connect.connector.ConnectRecord; +import org.apache.kafka.connect.data.Field; +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.data.SchemaBuilder; +import org.apache.kafka.connect.data.Struct; +import org.apache.kafka.connect.header.Header; +import org.apache.kafka.connect.header.Headers; +import org.apache.kafka.connect.transforms.util.Requirements; +import org.apache.kafka.connect.transforms.util.SchemaUtil; +import org.apache.kafka.connect.transforms.util.SimpleConfig; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.kafka.common.config.ConfigDef.NO_DEFAULT_VALUE; + +public abstract class HeaderFrom<R extends ConnectRecord<R>> implements Transformation<R> { + +public static final String FIELDS_FIELD = "fields"; +public static final String HEADERS_FIELD = "headers"; +public static final String OPERATION_FIELD = "operation"; + +public static final String OVERVIEW_DOC = +"Moves or copies fields in the key/value of a record into that record's headers. " + +"Corresponding elements of <code>" + FIELDS_FIELD + "</code> and " + +"<code>" + HEADERS_FIELD + "</code> together identify a field and the header it should be " + +"moved or copied to. 
" + +"Use the concrete transformation type designed for the record " + +"key (" + Key.class.getName() + ") or value (" + Value.class.getName() + ")."; + +public static final ConfigDef CONFIG_DEF = new ConfigDef() +.define(FIELDS_FIELD, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH, +"Field names in the record whose values are to be copied or moved to headers.") +.define(HEADERS_FIELD, ConfigDef.Type.LIST, ConfigDef.Importance.HIGH, +"Header names, in the same order as the field names listed in the fields configuration property.") +.define(OPERATION_FIELD, ConfigDef.Type.STRING, NO_DEFAULT_VALUE, +ConfigDef.ValidString.in("move", "copy"), ConfigDef.Importance.HIGH, +"Either move if the fields are to be moved to the headers (removed from the key/value), " + +"or copy if the fields are to be copied to the headers (retained in the key/value)."); + +enum Operation { +MOVE("move"), +COPY("copy"); + +private final String name; + +Operation(String name) { +this.name = name; +} + +static Operation fromName(String name) { +switch (name) { +case "move": +return MOVE; +case "copy": +return COPY; +default: +throw new IllegalArgumentException(); +} +} + +public String toString() { +return name; +} +} + +private List fields; + +private List headers; + +private Operation operation; + +@Override +public R apply(R record) { +Object operatingValue = operatingValue(record); +Schema operatingSchema = operatingSchema(record); + +if (operatingSchema == null) { +return applySchemaless(record, operatingValue); +} else { +return applyWithSchema(record, operatingValue, operatingSchema); +} +} + +private R applyWithSchema(R record, Object operatingValue, Schema operatingSchema) { +Headers updatedHeaders = record.headers().duplicate(); Review comment: Why duplicate headers here? According to the [Header class's Javadocs](https://kafka.apache.org/26/javadoc/org/apache/kafka/connect/header/Headers.html), the collect