[GitHub] [kafka] C0urante commented on a change in pull request #9549: KIP-145: Add SMTs, HeaderFrom, DropHeaders and InsertHeader

2020-11-13 Thread GitBox


C0urante commented on a change in pull request #9549:
URL: https://github.com/apache/kafka/pull/9549#discussion_r523161370



##
File path: 
connect/transforms/src/test/java/org/apache/kafka/connect/transforms/HeaderFromTest.java
##
@@ -0,0 +1,357 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms;
+
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaAndValue;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.header.ConnectHeaders;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.source.SourceRecord;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static java.util.Arrays.asList;
+import static java.util.Collections.singletonList;
+import static java.util.Collections.singletonMap;
+import static org.apache.kafka.connect.data.Schema.STRING_SCHEMA;
+import static org.junit.Assert.assertEquals;
+
+@RunWith(Parameterized.class)
+public class HeaderFromTest {
+
+private final boolean keyTransform;
+
+static class RecordBuilder {
+private final List<String> fields = new ArrayList<>(2);
+private final List<Schema> fieldSchemas = new ArrayList<>(2);
+private final List<Object> fieldValues = new ArrayList<>(2);
+private final ConnectHeaders headers = new ConnectHeaders();
+
+public RecordBuilder() {
+}
+
+public RecordBuilder withField(String name, Schema schema, Object 
value) {
+fields.add(name);
+fieldSchemas.add(schema);
+fieldValues.add(value);
+return this;
+}
+
+public RecordBuilder addHeader(String name, Schema schema, Object 
value) {
+headers.add(name, new SchemaAndValue(schema, value));
+return this;
+}
+
+public SourceRecord schemaless(boolean keyTransform) {
+Map<String, Object> map = new HashMap<>();
+for (int i = 0; i < this.fields.size(); i++) {
+String fieldName = this.fields.get(i);
+map.put(fieldName, this.fieldValues.get(i));
+
+}
+return sourceRecord(keyTransform, null, map);
+}
+
+private Schema schema() {
+SchemaBuilder schemaBuilder = new 
SchemaBuilder(Schema.Type.STRUCT);
+for (int i = 0; i < this.fields.size(); i++) {
+String fieldName = this.fields.get(i);
+schemaBuilder.field(fieldName, this.fieldSchemas.get(i));
+
+}
+return schemaBuilder.build();
+}
+
+private Struct struct(Schema schema) {
+Struct struct = new Struct(schema);
+for (int i = 0; i < this.fields.size(); i++) {
+String fieldName = this.fields.get(i);
+struct.put(fieldName, this.fieldValues.get(i));
+}
+return struct;
+}
+
+public SourceRecord withSchema(boolean keyTransform) {
+Schema schema = schema();
+Struct struct = struct(schema);
+return sourceRecord(keyTransform, schema, struct);
+}
+
+private SourceRecord sourceRecord(boolean keyTransform, Schema 
keyOrValueSchema, Object keyOrValue) {
+Map<String, ?> sourcePartition = singletonMap("foo", "bar");
+Map<String, ?> sourceOffset = singletonMap("baz", "quxx");
+String topic = "topic";
+Integer partition = 0;
+Long timestamp = 0L;
+
+ConnectHeaders headers = this.headers;
+if (keyOrValueSchema == null) {
+// When doing a schemaless transformation we don't expect the 
header to have a schema
+headers = new ConnectHeaders();
+for (Header header : this.headers) {
+headers.add(header.key(), new SchemaAndValue(null, 
header.value()));

[GitHub] [kafka] C0urante commented on a change in pull request #9549: KIP-145: Add SMTs, HeaderFrom, DropHeaders and InsertHeader

2020-11-13 Thread GitBox


C0urante commented on a change in pull request #9549:
URL: https://github.com/apache/kafka/pull/9549#discussion_r523147909



##
File path: 
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HeaderFrom.java
##
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.transforms.util.Requirements;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.kafka.common.config.ConfigDef.NO_DEFAULT_VALUE;
+
+public abstract class HeaderFrom<R extends ConnectRecord<R>> implements 
Transformation<R> {
+
+public static final String FIELDS_FIELD = "fields";
+public static final String HEADERS_FIELD = "headers";
+public static final String OPERATION_FIELD = "operation";
+
+public static final String OVERVIEW_DOC =
+"Moves or copies fields in the key/value of a record into that 
record's headers. " +
+"Corresponding elements of " + FIELDS_FIELD + 
" and " +
+"" + HEADERS_FIELD + " together identify a 
field and the header it should be " +
+"moved or copied to. " +
+"Use the concrete transformation type designed for the 
record " +
+"key (" + Key.class.getName() + ") or value 
(" + Value.class.getName() + ").";
+
+public static final ConfigDef CONFIG_DEF = new ConfigDef()
+.define(FIELDS_FIELD, ConfigDef.Type.LIST, 
ConfigDef.Importance.HIGH,
+"Field names in the record whose values are to be copied 
or moved to headers.")
+.define(HEADERS_FIELD, ConfigDef.Type.LIST, 
ConfigDef.Importance.HIGH,
+"Header names, in the same order as the field names listed 
in the fields configuration property.")
+.define(OPERATION_FIELD, ConfigDef.Type.STRING, NO_DEFAULT_VALUE,
+ConfigDef.ValidString.in("move", "copy"), 
ConfigDef.Importance.HIGH,
+"Either move if the fields are to be moved to 
the headers (removed from the key/value), " +
+"or copy if the fields are to be 
copied to the headers (retained in the key/value).");
+
+enum Operation {
+MOVE("move"),
+COPY("copy");
+
+private final String name;
+
+Operation(String name) {
+this.name = name;
+}
+
+static Operation fromName(String name) {
+switch (name) {
+case "move":
+return MOVE;
+case "copy":
+return COPY;
+default:
+throw new IllegalArgumentException();
+}
+}
+
+public String toString() {
+return name;
+}
+}
+
+private List<String> fields;
+
+private List<String> headers;
+
+private Operation operation;
+
+@Override
+public R apply(R record) {
+Object operatingValue = operatingValue(record);
+Schema operatingSchema = operatingSchema(record);
+
+if (operatingSchema == null) {
+return applySchemaless(record, operatingValue);
+} else {
+return applyWithSchema(record, operatingValue, operatingSchema);
+}
+}
+
+private R applyWithSchema(R record, Object operatingValue, Schema 
operatingSchema) {
+Headers updatedHeaders = record.headers().duplicate();

Review comment:
   Ah, that's fair. Does seem to be the pattern followed by the other 
out-of-the-box transformations as well; probably best to continue to follow 
that pattern.
   
   I

[GitHub] [kafka] C0urante commented on a change in pull request #9549: KIP-145: Add SMTs, HeaderFrom, DropHeaders and InsertHeader

2020-11-09 Thread GitBox


C0urante commented on a change in pull request #9549:
URL: https://github.com/apache/kafka/pull/9549#discussion_r520056667



##
File path: 
connect/transforms/src/main/java/org/apache/kafka/connect/transforms/HeaderFrom.java
##
@@ -0,0 +1,219 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.connect.transforms;
+
+import org.apache.kafka.common.config.ConfigDef;
+import org.apache.kafka.common.config.ConfigException;
+import org.apache.kafka.connect.connector.ConnectRecord;
+import org.apache.kafka.connect.data.Field;
+import org.apache.kafka.connect.data.Schema;
+import org.apache.kafka.connect.data.SchemaBuilder;
+import org.apache.kafka.connect.data.Struct;
+import org.apache.kafka.connect.header.Header;
+import org.apache.kafka.connect.header.Headers;
+import org.apache.kafka.connect.transforms.util.Requirements;
+import org.apache.kafka.connect.transforms.util.SchemaUtil;
+import org.apache.kafka.connect.transforms.util.SimpleConfig;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.apache.kafka.common.config.ConfigDef.NO_DEFAULT_VALUE;
+
+public abstract class HeaderFrom<R extends ConnectRecord<R>> implements 
Transformation<R> {
+
+public static final String FIELDS_FIELD = "fields";
+public static final String HEADERS_FIELD = "headers";
+public static final String OPERATION_FIELD = "operation";
+
+public static final String OVERVIEW_DOC =
+"Moves or copies fields in the key/value of a record into that 
record's headers. " +
+"Corresponding elements of " + FIELDS_FIELD + 
" and " +
+"" + HEADERS_FIELD + " together identify a 
field and the header it should be " +
+"moved or copied to. " +
+"Use the concrete transformation type designed for the 
record " +
+"key (" + Key.class.getName() + ") or value 
(" + Value.class.getName() + ").";
+
+public static final ConfigDef CONFIG_DEF = new ConfigDef()
+.define(FIELDS_FIELD, ConfigDef.Type.LIST, 
ConfigDef.Importance.HIGH,
+"Field names in the record whose values are to be copied 
or moved to headers.")
+.define(HEADERS_FIELD, ConfigDef.Type.LIST, 
ConfigDef.Importance.HIGH,
+"Header names, in the same order as the field names listed 
in the fields configuration property.")
+.define(OPERATION_FIELD, ConfigDef.Type.STRING, NO_DEFAULT_VALUE,
+ConfigDef.ValidString.in("move", "copy"), 
ConfigDef.Importance.HIGH,
+"Either move if the fields are to be moved to 
the headers (removed from the key/value), " +
+"or copy if the fields are to be 
copied to the headers (retained in the key/value).");
+
+enum Operation {
+MOVE("move"),
+COPY("copy");
+
+private final String name;
+
+Operation(String name) {
+this.name = name;
+}
+
+static Operation fromName(String name) {
+switch (name) {
+case "move":
+return MOVE;
+case "copy":
+return COPY;
+default:
+throw new IllegalArgumentException();
+}
+}
+
+public String toString() {
+return name;
+}
+}
+
+private List<String> fields;
+
+private List<String> headers;
+
+private Operation operation;
+
+@Override
+public R apply(R record) {
+Object operatingValue = operatingValue(record);
+Schema operatingSchema = operatingSchema(record);
+
+if (operatingSchema == null) {
+return applySchemaless(record, operatingValue);
+} else {
+return applyWithSchema(record, operatingValue, operatingSchema);
+}
+}
+
+private R applyWithSchema(R record, Object operatingValue, Schema 
operatingSchema) {
+Headers updatedHeaders = record.headers().duplicate();

Review comment:
   Why duplicate headers here? According to the [Header class's 
Javadocs](https://kafka.apache.org/26/javadoc/org/apache/kafka/connect/header/Headers.html),
 the collect