This is an automated email from the ASF dual-hosted git repository.

johncasey pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 70fd788e2b0 Add schema conversion support from Kafka Connect Record schemas to Beam schemas (#24605)
70fd788e2b0 is described below

commit 70fd788e2b0aae8b6dff56f61bc054ad7f0e5005
Author: Pablo Estrada <pabl...@users.noreply.github.com>
AuthorDate: Fri Dec 9 10:30:28 2022 -0800

    Add schema conversion support from Kafka Connect Record schemas to Beam schemas (#24605)
---
 sdks/java/io/debezium/build.gradle                 |  1 +
 .../apache/beam/io/debezium/KafkaConnectUtils.java | 77 ++++++++++++++++++++++
 .../beam/io/debezium/KafkaConnectSchemaTest.java   | 58 ++++++++++++++++
 .../beam/io/debezium/SourceRecordJsonTest.java     | 65 ++++++++++++------
 4 files changed, 180 insertions(+), 21 deletions(-)

diff --git a/sdks/java/io/debezium/build.gradle b/sdks/java/io/debezium/build.gradle
index a6a623511a5..bf5d0938287 100644
--- a/sdks/java/io/debezium/build.gradle
+++ b/sdks/java/io/debezium/build.gradle
@@ -46,6 +46,7 @@ dependencies {
     testRuntimeOnly library.java.slf4j_jdk14
     testRuntimeOnly project(path: ":runners:direct-java", configuration: 
"shadow")
     testImplementation project(":runners:google-cloud-dataflow-java")
+    testImplementation library.java.hamcrest
     testImplementation library.java.testcontainers_base
     testImplementation library.java.testcontainers_mysql
     testImplementation library.java.testcontainers_postgresql
diff --git a/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaConnectUtils.java b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaConnectUtils.java
new file mode 100644
index 00000000000..abf5703bf20
--- /dev/null
+++ b/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaConnectUtils.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.debezium;
+
+import org.apache.beam.sdk.schemas.Schema;
+
+/** Utilities for translating Kafka Connect record schemas into Beam schemas. */
+public class KafkaConnectUtils {
+  /**
+   * Converts a Kafka Connect {@code STRUCT} schema into a Beam {@link Schema}.
+   *
+   * <p>Every Kafka field becomes a Beam field with the same name. A field whose Kafka schema is
+   * optional becomes a nullable Beam field, and a non-null Kafka schema doc string is carried over
+   * as the Beam field description.
+   *
+   * @param kafkaSchema the Kafka Connect schema to convert; its type must be {@code STRUCT}
+   * @return a Beam schema with one field per field of {@code kafkaSchema}
+   * @throws IllegalArgumentException if {@code kafkaSchema} is not a {@code STRUCT}, or if one of
+   *     its fields has a type rejected by {@link #beamSchemaTypeFromKafkaType}
+   */
+  public static Schema beamSchemaFromKafkaConnectSchema(
+      org.apache.kafka.connect.data.Schema kafkaSchema) {
+    // Use an explicit check instead of `assert`: assertions are disabled by default on the JVM,
+    // so an `assert` precondition would be silently skipped outside of tests.
+    if (kafkaSchema.type() != org.apache.kafka.connect.data.Schema.Type.STRUCT) {
+      throw new IllegalArgumentException(
+          String.format(
+              "Beam Rows are encoded from Kafka Struct schemas. Received schema of type %s",
+              kafkaSchema.type()));
+    }
+    Schema.Builder beamSchemaBuilder = Schema.builder();
+
+    for (org.apache.kafka.connect.data.Field field : kafkaSchema.fields()) {
+      org.apache.kafka.connect.data.Schema fieldSchema = field.schema();
+      // Convert the type once; both the nullable and the required variant use the same type.
+      Schema.FieldType beamFieldType = beamSchemaTypeFromKafkaType(fieldSchema);
+      Schema.Field beamField =
+          fieldSchema.isOptional()
+              ? Schema.Field.nullable(field.name(), beamFieldType)
+              : Schema.Field.of(field.name(), beamFieldType);
+      if (fieldSchema.doc() != null) {
+        beamField = beamField.withDescription(fieldSchema.doc());
+      }
+      beamSchemaBuilder.addField(beamField);
+    }
+    return beamSchemaBuilder.build();
+  }
+
+  /**
+   * Maps the type of a Kafka Connect schema to the corresponding Beam {@link Schema.FieldType}.
+   *
+   * <p>{@code STRUCT} schemas are converted recursively into Beam row types, and {@code ARRAY} and
+   * {@code MAP} schemas are converted recursively through their key/value schemas. This method
+   * does not inspect whether {@code kafkaFieldSchema} (or an element, key or value schema) is
+   * optional; nullability of struct fields is applied by {@link
+   * #beamSchemaFromKafkaConnectSchema}.
+   *
+   * @param kafkaFieldSchema the Kafka Connect schema whose type should be converted
+   * @return the Beam field type matching the Kafka schema type
+   * @throws IllegalArgumentException if the Kafka schema type is not one of the handled types
+   */
+  public static Schema.FieldType beamSchemaTypeFromKafkaType(
+      org.apache.kafka.connect.data.Schema kafkaFieldSchema) {
+    switch (kafkaFieldSchema.type()) {
+      case STRUCT:
+        return Schema.FieldType.row(beamSchemaFromKafkaConnectSchema(kafkaFieldSchema));
+      case INT8:
+        return Schema.FieldType.BYTE;
+      case INT16:
+        return Schema.FieldType.INT16;
+      case INT32:
+        return Schema.FieldType.INT32;
+      case INT64:
+        return Schema.FieldType.INT64;
+      case FLOAT32:
+        return Schema.FieldType.FLOAT;
+      case FLOAT64:
+        return Schema.FieldType.DOUBLE;
+      case BOOLEAN:
+        return Schema.FieldType.BOOLEAN;
+      case STRING:
+        return Schema.FieldType.STRING;
+      case BYTES:
+        return Schema.FieldType.BYTES;
+      case ARRAY:
+        return Schema.FieldType.array(beamSchemaTypeFromKafkaType(kafkaFieldSchema.valueSchema()));
+      case MAP:
+        return Schema.FieldType.map(
+            beamSchemaTypeFromKafkaType(kafkaFieldSchema.keySchema()),
+            beamSchemaTypeFromKafkaType(kafkaFieldSchema.valueSchema()));
+      default:
+        throw new IllegalArgumentException(
+            String.format(
+                "Unable to convert Kafka field schema %s to Beam Schema", kafkaFieldSchema));
+    }
+  }
+}
diff --git a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/KafkaConnectSchemaTest.java b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/KafkaConnectSchemaTest.java
new file mode 100644
index 00000000000..63b99ee2489
--- /dev/null
+++ b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/KafkaConnectSchemaTest.java
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.io.debezium;
+
+import static org.hamcrest.MatcherAssert.assertThat;
+
+import org.apache.beam.sdk.schemas.Schema;
+import org.hamcrest.Matchers;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/** Tests for the Kafka Connect to Beam schema conversion in {@link KafkaConnectUtils}. */
+@RunWith(JUnit4.class)
+public class KafkaConnectSchemaTest {
+
+  /**
+   * Converts the table schema produced by {@code SourceRecordJsonTest.buildTableSchema()} and
+   * checks that the resulting Beam fields match the expected ones, regardless of order.
+   */
+  @Test
+  public void testSimpleSourceRecordSchemaConversion() {
+    // Expected Beam schema of the nested, optional "country" struct.
+    Schema expectedCountrySchema =
+        Schema.of(
+            Schema.Field.of("name", Schema.FieldType.STRING),
+            Schema.Field.nullable("population", Schema.FieldType.INT64),
+            Schema.Field.nullable("latitude", Schema.FieldType.array(Schema.FieldType.FLOAT)),
+            Schema.Field.nullable("longitude", Schema.FieldType.array(Schema.FieldType.FLOAT)));
+
+    org.apache.kafka.connect.data.Schema kafkaTableSchema =
+        SourceRecordJsonTest.buildTableSchema();
+    Schema convertedSchema = KafkaConnectUtils.beamSchemaFromKafkaConnectSchema(kafkaTableSchema);
+
+    assertThat(
+        convertedSchema.getFields(),
+        Matchers.containsInAnyOrder(
+            Schema.Field.of("name", Schema.FieldType.STRING),
+            Schema.Field.of("age", Schema.FieldType.BYTE).withDescription("age of the person"),
+            Schema.Field.of("temperature", Schema.FieldType.FLOAT),
+            Schema.Field.of("distance", Schema.FieldType.DOUBLE),
+            Schema.Field.nullable("birthYear", Schema.FieldType.INT64),
+            Schema.Field.nullable("country", Schema.FieldType.row(expectedCountrySchema)),
+            Schema.Field.nullable(
+                "childrenAndAge",
+                Schema.FieldType.map(Schema.FieldType.STRING, Schema.FieldType.INT32))));
+  }
+}
diff --git a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/SourceRecordJsonTest.java b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/SourceRecordJsonTest.java
index badd01eee29..add8844f48f 100644
--- a/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/SourceRecordJsonTest.java
+++ b/sdks/java/io/debezium/src/test/java/org/apache/beam/io/debezium/SourceRecordJsonTest.java
@@ -33,21 +33,22 @@ import org.junit.runners.JUnit4;
 public class SourceRecordJsonTest implements Serializable {
   @Test
   public void testSourceRecordJson() {
-    SourceRecord record = this.buildSourceRecord();
+    SourceRecord record = buildSourceRecord();
     SourceRecordJson json = new SourceRecordJson(record);
 
     String jsonString = json.toJson();
 
     String expectedJson =
         "{\"metadata\":"
-            + "{\"connector\":\"test-connector\","
-            + "\"version\":\"version-connector\","
+            + 
"{\"connector\":\"test-connector\",\"version\":\"version-connector\","
             + "\"name\":\"test-connector-sql\","
-            + "\"database\":\"test-db\","
-            + "\"schema\":\"test-schema\","
-            + "\"table\":\"test-table\"},"
-            + "\"before\":{\"fields\":{\"column1\":\"before-name\"}},"
-            + "\"after\":{\"fields\":{\"column1\":\"after-name\"}}}";
+            + 
"\"database\":\"test-db\",\"schema\":\"test-schema\",\"table\":\"test-table\"},"
+            + 
"\"before\":{\"fields\":{\"country\":null,\"distance\":123.423,\"birthYear\":null,"
+            + "\"name\":\"before-name\","
+            + "\"temperature\":104.4,\"childrenAndAge\":null,\"age\":16}},"
+            + 
"\"after\":{\"fields\":{\"country\":null,\"distance\":123.423,\"birthYear\":null,"
+            + "\"name\":\"after-name\","
+            + "\"temperature\":104.4,\"childrenAndAge\":null,\"age\":16}}}";
 
     assertEquals(expectedJson, jsonString);
   }
@@ -57,7 +58,7 @@ public class SourceRecordJsonTest implements Serializable {
     assertThrows(IllegalArgumentException.class, () -> new 
SourceRecordJson(null));
   }
 
-  private Schema buildSourceSchema() {
+  private static Schema buildSourceSchema() {
     return SchemaBuilder.struct()
         .field("connector", Schema.STRING_SCHEMA)
         .field("version", Schema.STRING_SCHEMA)
@@ -68,18 +69,32 @@ public class SourceRecordJsonTest implements Serializable {
         .build();
   }
 
-  private Schema buildBeforeSchema() {
-    return SchemaBuilder.struct().field("column1", 
Schema.STRING_SCHEMA).build();
-  }
-
-  private Schema buildAfterSchema() {
-    return SchemaBuilder.struct().field("column1", 
Schema.STRING_SCHEMA).build();
+  public static Schema buildTableSchema() {
+    return SchemaBuilder.struct()
+        .field("name", Schema.STRING_SCHEMA)
+        .field("age", SchemaBuilder.int8().doc("age of the person").build())
+        .field("temperature", Schema.FLOAT32_SCHEMA)
+        .field("distance", Schema.FLOAT64_SCHEMA)
+        .field("birthYear", Schema.OPTIONAL_INT64_SCHEMA)
+        .field(
+            "country",
+            SchemaBuilder.struct()
+                .optional()
+                .field("name", Schema.STRING_SCHEMA)
+                .field("population", Schema.OPTIONAL_INT64_SCHEMA)
+                .field("latitude", 
SchemaBuilder.array(Schema.FLOAT32_SCHEMA).optional())
+                .field("longitude", 
SchemaBuilder.array(Schema.FLOAT32_SCHEMA).optional())
+                .build())
+        .field(
+            "childrenAndAge",
+            SchemaBuilder.map(Schema.STRING_SCHEMA, 
Schema.INT32_SCHEMA).optional())
+        .build();
   }
 
-  private SourceRecord buildSourceRecord() {
-    final Schema sourceSchema = this.buildSourceSchema();
-    final Schema beforeSchema = this.buildBeforeSchema();
-    final Schema afterSchema = this.buildAfterSchema();
+  static SourceRecord buildSourceRecord() {
+    final Schema sourceSchema = buildSourceSchema();
+    final Schema beforeSchema = buildTableSchema();
+    final Schema afterSchema = buildTableSchema();
 
     final Schema schema =
         SchemaBuilder.struct()
@@ -101,8 +116,16 @@ public class SourceRecordJsonTest implements Serializable {
     source.put("schema", "test-schema");
     source.put("table", "test-table");
 
-    before.put("column1", "before-name");
-    after.put("column1", "after-name");
+    before
+        .put("name", "before-name")
+        .put("age", (byte) 16)
+        .put("temperature", (float) 104.4)
+        .put("distance", 123.423);
+    after
+        .put("name", "after-name")
+        .put("age", (byte) 16)
+        .put("temperature", (float) 104.4)
+        .put("distance", 123.423);
 
     value.put("source", source);
     value.put("before", before);

Reply via email to