This is an automated email from the ASF dual-hosted git repository. johncasey pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push: new 70fd788e2b0 Add schema conversion support from Kafka Connect Record schemas to Beam schemas (#24605) 70fd788e2b0 is described below commit 70fd788e2b0aae8b6dff56f61bc054ad7f0e5005 Author: Pablo Estrada <pabl...@users.noreply.github.com> AuthorDate: Fri Dec 9 10:30:28 2022 -0800 Add schema conversion support from Kafka Connect Record schemas to Beam schemas (#24605) --- sdks/java/io/debezium/build.gradle | 1 + .../apache/beam/io/debezium/KafkaConnectUtils.java | 77 ++++++++++++++++++++++ .../beam/io/debezium/KafkaConnectSchemaTest.java | 58 ++++++++++++++++ .../beam/io/debezium/SourceRecordJsonTest.java | 65 ++++++++++++------ 4 files changed, 180 insertions(+), 21 deletions(-) diff --git a/sdks/java/io/debezium/build.gradle b/sdks/java/io/debezium/build.gradle index a6a623511a5..bf5d0938287 100644 --- a/sdks/java/io/debezium/build.gradle +++ b/sdks/java/io/debezium/build.gradle @@ -46,6 +46,7 @@ dependencies { testRuntimeOnly library.java.slf4j_jdk14 testRuntimeOnly project(path: ":runners:direct-java", configuration: "shadow") testImplementation project(":runners:google-cloud-dataflow-java") + testImplementation library.java.hamcrest testImplementation library.java.testcontainers_base testImplementation library.java.testcontainers_mysql testImplementation library.java.testcontainers_postgresql diff --git a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaConnectUtils.java b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaConnectUtils.java new file mode 100644 index 00000000000..abf5703bf20 --- /dev/null +++ b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaConnectUtils.java @@ -0,0 +1,77 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.io.debezium; + +import org.apache.beam.sdk.schemas.Schema; + +public class KafkaConnectUtils { + public static Schema beamSchemaFromKafkaConnectSchema( + org.apache.kafka.connect.data.Schema kafkaSchema) { + assert kafkaSchema.type().equals(org.apache.kafka.connect.data.Schema.Type.STRUCT) + : "Beam Rows are encoded from Kafka Struct schemas."; + Schema.Builder beamSchemaBuilder = Schema.builder(); + + for (org.apache.kafka.connect.data.Field field : kafkaSchema.fields()) { + Schema.Field beamField = + field.schema().isOptional() + ? Schema.Field.nullable(field.name(), beamSchemaTypeFromKafkaType(field.schema())) + : Schema.Field.of(field.name(), beamSchemaTypeFromKafkaType(field.schema())); + if (field.schema().doc() != null) { + beamField = beamField.withDescription(field.schema().doc()); + } + beamSchemaBuilder.addField(beamField); + } + return beamSchemaBuilder.build(); + } + + public static Schema.FieldType beamSchemaTypeFromKafkaType( + org.apache.kafka.connect.data.Schema kafkaFieldSchema) { + switch (kafkaFieldSchema.type()) { + case STRUCT: + return Schema.FieldType.row(beamSchemaFromKafkaConnectSchema(kafkaFieldSchema)); + case INT8: + return Schema.FieldType.BYTE; + case INT16: + return Schema.FieldType.INT16; + case INT32: + return Schema.FieldType.INT32; + case INT64: + return Schema.FieldType.INT64; + case FLOAT32: + return Schema.FieldType.FLOAT; + case FLOAT64: + return Schema.FieldType.DOUBLE; + case BOOLEAN: + return Schema.FieldType.BOOLEAN; + case STRING: + return Schema.FieldType.STRING; + case BYTES: + return Schema.FieldType.BYTES; + case ARRAY: + return Schema.FieldType.array(beamSchemaTypeFromKafkaType(kafkaFieldSchema.valueSchema())); + case MAP: + return Schema.FieldType.map( + beamSchemaTypeFromKafkaType(kafkaFieldSchema.keySchema()), + beamSchemaTypeFromKafkaType(kafkaFieldSchema.valueSchema())); + default: + throw new IllegalArgumentException( + String.format( + "Unable to convert Kafka field schema %s to Beam Schema", kafkaFieldSchema)); + } + } +} diff --git a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/KafkaConnectSchemaTest.java b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/KafkaConnectSchemaTest.java new file mode 100644 index 00000000000..63b99ee2489 --- /dev/null +++ b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/KafkaConnectSchemaTest.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.io.debezium; + +import static org.hamcrest.MatcherAssert.assertThat; + +import org.apache.beam.sdk.schemas.Schema; +import org.hamcrest.Matchers; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public class KafkaConnectSchemaTest { + + @Test + public void testSimpleSourceRecordSchemaConversion() { + org.apache.kafka.connect.data.Schema valueSchema = SourceRecordJsonTest.buildTableSchema(); + + Schema beamValueSchema = KafkaConnectUtils.beamSchemaFromKafkaConnectSchema(valueSchema); + assertThat( + beamValueSchema.getFields(), + Matchers.containsInAnyOrder( + Schema.Field.of("name", Schema.FieldType.STRING), + Schema.Field.of("age", Schema.FieldType.BYTE).withDescription("age of the person"), + Schema.Field.of("temperature", Schema.FieldType.FLOAT), + Schema.Field.of("distance", Schema.FieldType.DOUBLE), + Schema.Field.nullable("birthYear", Schema.FieldType.INT64), + Schema.Field.nullable( + "country", + Schema.FieldType.row( + Schema.of( + Schema.Field.of("name", Schema.FieldType.STRING), + Schema.Field.nullable("population", Schema.FieldType.INT64), + Schema.Field.nullable( + "latitude", Schema.FieldType.array(Schema.FieldType.FLOAT)), + Schema.Field.nullable( + "longitude", Schema.FieldType.array(Schema.FieldType.FLOAT))))), + Schema.Field.nullable( + "childrenAndAge", + Schema.FieldType.map(Schema.FieldType.STRING, Schema.FieldType.INT32)))); + } +} diff --git a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/SourceRecordJsonTest.java b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/SourceRecordJsonTest.java index badd01eee29..add8844f48f 100644 --- a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/SourceRecordJsonTest.java +++ b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/SourceRecordJsonTest.java @@ -33,21 +33,22 @@ import org.junit.runners.JUnit4; public class SourceRecordJsonTest implements Serializable { @Test public void testSourceRecordJson() { - SourceRecord record = this.buildSourceRecord(); + SourceRecord record = buildSourceRecord(); SourceRecordJson json = new SourceRecordJson(record); String jsonString = json.toJson(); String expectedJson = "{\"metadata\":" - + "{\"connector\":\"test-connector\"," - + "\"version\":\"version-connector\"," + + "{\"connector\":\"test-connector\",\"version\":\"version-connector\"," + "\"name\":\"test-connector-sql\"," - + "\"database\":\"test-db\"," - + "\"schema\":\"test-schema\"," - + "\"table\":\"test-table\"}," - + "\"before\":{\"fields\":{\"column1\":\"before-name\"}}," - + "\"after\":{\"fields\":{\"column1\":\"after-name\"}}}"; + + "\"database\":\"test-db\",\"schema\":\"test-schema\",\"table\":\"test-table\"}," + + "\"before\":{\"fields\":{\"country\":null,\"distance\":123.423,\"birthYear\":null," + + "\"name\":\"before-name\"," + + "\"temperature\":104.4,\"childrenAndAge\":null,\"age\":16}}," + + "\"after\":{\"fields\":{\"country\":null,\"distance\":123.423,\"birthYear\":null," + + "\"name\":\"after-name\"," + + "\"temperature\":104.4,\"childrenAndAge\":null,\"age\":16}}}"; assertEquals(expectedJson, jsonString); } @@ -57,7 +58,7 @@ public class SourceRecordJsonTest implements Serializable { assertThrows(IllegalArgumentException.class, () -> new SourceRecordJson(null)); } - private Schema buildSourceSchema() { + private static Schema buildSourceSchema() { return SchemaBuilder.struct() .field("connector", Schema.STRING_SCHEMA) .field("version", Schema.STRING_SCHEMA) @@ -68,18 +69,32 @@ public class SourceRecordJsonTest implements Serializable { .build(); } - private Schema buildBeforeSchema() { - return SchemaBuilder.struct().field("column1", Schema.STRING_SCHEMA).build(); - } - - private Schema buildAfterSchema() { - return SchemaBuilder.struct().field("column1", Schema.STRING_SCHEMA).build(); + public static Schema buildTableSchema() { + return SchemaBuilder.struct() + .field("name", Schema.STRING_SCHEMA) + .field("age", SchemaBuilder.int8().doc("age of the person").build()) + .field("temperature", Schema.FLOAT32_SCHEMA) + .field("distance", Schema.FLOAT64_SCHEMA) + .field("birthYear", Schema.OPTIONAL_INT64_SCHEMA) + .field( + "country", + SchemaBuilder.struct() + .optional() + .field("name", Schema.STRING_SCHEMA) + .field("population", Schema.OPTIONAL_INT64_SCHEMA) + .field("latitude", SchemaBuilder.array(Schema.FLOAT32_SCHEMA).optional()) + .field("longitude", SchemaBuilder.array(Schema.FLOAT32_SCHEMA).optional()) + .build()) + .field( + "childrenAndAge", + SchemaBuilder.map(Schema.STRING_SCHEMA, Schema.INT32_SCHEMA).optional()) + .build(); } - private SourceRecord buildSourceRecord() { - final Schema sourceSchema = this.buildSourceSchema(); - final Schema beforeSchema = this.buildBeforeSchema(); - final Schema afterSchema = this.buildAfterSchema(); + static SourceRecord buildSourceRecord() { + final Schema sourceSchema = buildSourceSchema(); + final Schema beforeSchema = buildTableSchema(); + final Schema afterSchema = buildTableSchema(); final Schema schema = SchemaBuilder.struct() @@ -101,8 +116,16 @@ public class SourceRecordJsonTest implements Serializable { source.put("schema", "test-schema"); source.put("table", "test-table"); - before.put("column1", "before-name"); - after.put("column1", "after-name"); + before + .put("name", "before-name") + .put("age", (byte) 16) + .put("temperature", (float) 104.4) + .put("distance", 123.423); + after + .put("name", "after-name") + .put("age", (byte) 16) + .put("temperature", (float) 104.4) + .put("distance", 123.423); value.put("source", source); value.put("before", before);