merlimat closed pull request #2538: [io] jdbc connector uses Schema.AUTO
URL: https://github.com/apache/incubator-pulsar/pull/2538
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (GitHub would otherwise hide it once the pull request is merged):

diff --git a/pulsar-functions/java-examples/pom.xml 
b/pulsar-functions/java-examples/pom.xml
index b077dea2f0..747acff2a1 100644
--- a/pulsar-functions/java-examples/pom.xml
+++ b/pulsar-functions/java-examples/pom.xml
@@ -41,7 +41,6 @@
       <artifactId>pulsar-functions-api</artifactId>
       <version>${project.version}</version>
     </dependency>
-
   </dependencies>
 
 </project>
diff --git 
a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java 
b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
index 425fb57ac1..91d043db5f 100644
--- 
a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
+++ 
b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java
@@ -54,8 +54,6 @@
     private JdbcUtils.TableId tableId;
     private PreparedStatement insertStatement;
 
-    // TODO: turn to getSchema from 
SinkContext.getTopicSchema.getSchema(inputTopic)
-    protected String schema;
     protected JdbcUtils.TableDefinition tableDefinition;
 
     // for flush
@@ -89,7 +87,6 @@ public void open(Map<String, Object> config, SinkContext 
sinkContext) throws Exc
         connection.setAutoCommit(false);
         log.info("Opened jdbc connection: {}, autoCommit: {}", jdbcUrl, 
connection.getAutoCommit());
 
-        schema = jdbcSinkConfig.getSchema();
         tableName = jdbcSinkConfig.getTableName();
         tableId = JdbcUtils.getTableId(connection, tableName);
         tableDefinition = JdbcUtils.getTableDefinition(connection, tableId);
diff --git 
a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAvroSchemaSink.java
 
b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAutoSchemaSink.java
similarity index 61%
rename from 
pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAvroSchemaSink.java
rename to 
pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAutoSchemaSink.java
index ec2822010e..c18dc81b82 100644
--- 
a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAvroSchemaSink.java
+++ 
b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAutoSchemaSink.java
@@ -20,50 +20,27 @@
 package org.apache.pulsar.io.jdbc;
 
 import java.sql.PreparedStatement;
-import java.util.Map;
 import lombok.extern.slf4j.Slf4j;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericDatumReader;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.io.DatumReader;
-import org.apache.avro.io.DecoderFactory;
-import org.apache.avro.util.Utf8;
+import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.functions.api.Record;
-import org.apache.pulsar.io.core.SinkContext;
 import org.apache.pulsar.io.jdbc.JdbcUtils.ColumnId;
 
 /**
- * A Simple Jdbc sink, which assume input Record as AvroSchema format
+ * A Simple Jdbc sink, which interprets input Record in generic record.
  */
 @Slf4j
-public class JdbcAvroSchemaSink extends JdbcAbstractSink<byte[]> {
-
-    private Schema avroSchema = null;
-    private DatumReader<GenericRecord> reader = null;
-
+public class JdbcAutoSchemaSink extends JdbcAbstractSink<GenericRecord> {
 
     @Override
-    public void open(Map<String, Object> config, SinkContext sinkContext) 
throws Exception {
-        super.open(config, sinkContext);
-        // get reader, and read value out as GenericRecord
-        if (avroSchema == null || reader == null) {
-            avroSchema = Schema.parse(schema);
-            reader = new GenericDatumReader<>(avroSchema);
-        }
-        log.info("open JdbcAvroSchemaSink with schema: {}, and 
tableDefinition: {}", schema, tableDefinition.toString());
-    }
-
-
     public void bindValue(PreparedStatement statement,
-                          Record<byte[]> message) throws Exception {
+                          Record<GenericRecord> message) throws Exception {
 
-        byte[] value = message.getValue();
-        GenericRecord record = reader.read(null, 
DecoderFactory.get().binaryDecoder(value, null));
+        GenericRecord record = message.getValue();
 
         int index = 1;
         for (ColumnId columnId : tableDefinition.getColumns()) {
             String colName = columnId.getName();
-            Object obj = record.get(colName);
+            Object obj = record.getField(colName);
             setColumnValue(statement, index++, obj);
             log.info("set column value: {}", obj.toString());
         }
@@ -80,8 +57,8 @@ private static void setColumnValue(PreparedStatement 
statement, int index, Objec
             statement.setFloat(index, (Float) value);
         } else if (value instanceof Boolean) {
             statement.setBoolean(index, (Boolean) value);
-        } else if (value instanceof Utf8) {
-            statement.setString(index, ((Utf8)value).toString());
+        } else if (value instanceof String) {
+            statement.setString(index, (String )value);
         } else if (value instanceof Short) {
             statement.setShort(index, (Short) value);
         } else {
diff --git 
a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java 
b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java
index 3419811e0a..6cc95d6bac 100644
--- a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java
+++ b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java
@@ -44,9 +44,6 @@
     private String jdbcUrl;
     private String tableName;
 
-    // schema for input topic
-    private String schema;
-
     // Optional
     private int timeoutMs = 500;
     private int batchSize = 200;
diff --git a/pulsar-io/jdbc/src/main/resources/META-INF/services/pulsar-io.yaml 
b/pulsar-io/jdbc/src/main/resources/META-INF/services/pulsar-io.yaml
index d9d06bde47..38d9e3c295 100644
--- a/pulsar-io/jdbc/src/main/resources/META-INF/services/pulsar-io.yaml
+++ b/pulsar-io/jdbc/src/main/resources/META-INF/services/pulsar-io.yaml
@@ -19,4 +19,4 @@
 
 name: jdbc
 description: Jdbc sink
-sinkClass: org.apache.pulsar.io.jdbc.JdbcAvroSchemaSink
+sinkClass: org.apache.pulsar.io.jdbc.JdbcAutoSchemaSink
diff --git 
a/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java 
b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java
index 33bb859547..84f6a82f03 100644
--- a/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java
+++ b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java
@@ -28,9 +28,11 @@
 import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
 import org.apache.pulsar.client.api.Message;
-import org.apache.pulsar.client.api.Schema;
+import org.apache.pulsar.client.api.schema.GenericRecord;
 import org.apache.pulsar.client.impl.MessageImpl;
+import org.apache.pulsar.client.impl.schema.AutoSchema;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
+import org.apache.pulsar.client.impl.schema.generic.GenericSchema;
 import org.apache.pulsar.functions.api.Record;
 import org.apache.pulsar.functions.source.PulsarRecord;
 import org.testng.Assert;
@@ -69,7 +71,7 @@ public void tearDown() throws Exception {
 
     @Test
     public void TestOpenAndWriteSink() throws Exception {
-        JdbcAvroSchemaSink jdbcSink;
+        JdbcAutoSchemaSink jdbcSink;
         Map<String, Object> conf;
         String tableName = "TestOpenAndWriteSink";
 
@@ -78,7 +80,7 @@ public void TestOpenAndWriteSink() throws Exception {
         conf.put("jdbcUrl", jdbcUrl);
         conf.put("tableName", tableName);
 
-        jdbcSink = new JdbcAvroSchemaSink();
+        jdbcSink = new JdbcAutoSchemaSink();
 
         sqliteUtils.createTable(
             "CREATE TABLE " + tableName + "(" +
@@ -94,13 +96,14 @@ public void TestOpenAndWriteSink() throws Exception {
         obj.setField2("ValueOfField1");
         obj.setField3(3);
         AvroSchema<Foo> schema = AvroSchema.of(Foo.class);
-        conf.put("schema",  new String(schema.getSchemaInfo().getSchema()));
-        log.info("schema: {}", new String(schema.getSchemaInfo().getSchema()));
 
         byte[] bytes = schema.encode(obj);
         ByteBuf payload = Unpooled.copiedBuffer(bytes);
-        Message<byte[]> message = new MessageImpl("77:777", conf, payload, 
Schema.BYTES);
-        Record<byte[]> record = PulsarRecord.<byte[]>builder()
+        AutoSchema autoSchema = new AutoSchema();
+        autoSchema.setSchema(GenericSchema.of(schema.getSchemaInfo()));
+
+        Message<GenericRecord> message = new MessageImpl("77:777", conf, 
payload, autoSchema);
+        Record<GenericRecord> record = PulsarRecord.<GenericRecord>builder()
             .message(message)
             .topicName("fake_topic_name")
             .build();
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
index 0bf9ded6b3..b1bfc8c073 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java
@@ -45,6 +45,7 @@
 import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator;
 import 
org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.Runtime;
 import org.apache.pulsar.tests.integration.io.*;
+import org.apache.pulsar.tests.integration.io.JdbcSinkTester.Foo;
 import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType;
 import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
 import org.testng.annotations.Test;
@@ -98,6 +99,11 @@ private void testSink(SinkTester tester, boolean builtin) 
throws Exception {
         // prepare the testing environment for sink
         prepareSink(tester);
 
+        ensureSubscriptionCreated(
+            inputTopicName,
+            String.format("public/default/%s", sinkName),
+            tester.getInputTopicSchema());
+
         // submit the sink connector
         submitSinkConnector(tester, tenant, namespace, sinkName, 
inputTopicName);
 
@@ -246,13 +252,14 @@ protected void getSinkStatus(String tenant, String 
namespace, String sinkName) t
 
     // This for JdbcSinkTester
     protected Map<String, String> produceSchemaMessagesToInputTopic(String 
inputTopicName,
-                                                              int numMessages, 
 Schema schema) throws Exception {
+                                                                    int 
numMessages,
+                                                                    
Schema<Foo> schema) throws Exception {
         @Cleanup
         PulsarClient client = PulsarClient.builder()
             .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
             .build();
         @Cleanup
-        Producer<String> producer = client.newProducer(Schema.STRING)
+        Producer<Foo> producer = client.newProducer(schema)
             .topic(inputTopicName)
             .create();
         LinkedHashMap<String, String> kvs = new LinkedHashMap<>();
@@ -268,7 +275,7 @@ protected void getSinkStatus(String tenant, String 
namespace, String sinkName) t
             kvs.put(key, value);
             producer.newMessage()
                 .key(key)
-                .value(value)
+                .value(obj)
                 .send();
         }
         return kvs;
@@ -630,7 +637,13 @@ private static void submitExclamationFunction(Runtime 
runtime,
             commands);
         assertTrue(result.getStdout().contains("\"Created successfully\""));
 
+        ensureSubscriptionCreated(inputTopicName, 
String.format("public/default/%s", functionName), inputTopicSchema);
+    }
 
+    private static <T> void ensureSubscriptionCreated(String inputTopicName,
+                                                      String subscriptionName,
+                                                      Schema<T> 
inputTopicSchema)
+            throws Exception {
         // ensure the function subscription exists before we start producing 
messages
         try (PulsarClient client = PulsarClient.builder()
             .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
@@ -638,7 +651,7 @@ private static void submitExclamationFunction(Runtime 
runtime,
             try (Consumer<T> ignored = client.newConsumer(inputTopicSchema)
                 .topic(inputTopicName)
                 .subscriptionType(SubscriptionType.Shared)
-                .subscriptionName(String.format("public/default/%s", 
functionName))
+                .subscriptionName(subscriptionName)
                 .subscribe()) {
             }
         }
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java
index e4aa401042..7c14ba96c0 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java
@@ -31,6 +31,7 @@
 import lombok.EqualsAndHashCode;
 import lombok.ToString;
 import lombok.extern.slf4j.Slf4j;
+import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.client.impl.schema.AvroSchema;
 import org.testcontainers.containers.GenericContainer;
 import org.testcontainers.containers.MySQLContainer;
@@ -69,13 +70,14 @@ public JdbcSinkTester() {
         sinkConfig.put("userName", "test");
         sinkConfig.put("password", "test");
         sinkConfig.put("tableName", tableName);
-
-        // prepare schema
-        sinkConfig.put("schema",  new 
String(schema.getSchemaInfo().getSchema()));
-        log.info("schema: {}", new String(schema.getSchemaInfo().getSchema()));
         sinkConfig.put("batchSize", 1);
     }
 
+    @Override
+    public Schema<?> getInputTopicSchema() {
+        return schema;
+    }
+
     @Override
     public void findSinkServiceContainer(Map<String, GenericContainer<?>> 
containers) {
         GenericContainer<?> container = containers.get("mysql");
diff --git 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java
 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java
index 7f4b2d9a1b..8b61ff2118 100644
--- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java
+++ 
b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java
@@ -20,6 +20,7 @@
 
 import java.util.Map;
 import lombok.Getter;
+import org.apache.pulsar.client.api.Schema;
 import org.testcontainers.containers.GenericContainer;
 import org.testng.collections.Maps;
 
@@ -56,6 +57,10 @@ public SinkTester(String sinkArchive, String sinkClassName) {
         this.sinkConfig = Maps.newHashMap();
     }
 
+    public Schema<?> getInputTopicSchema() {
+        return Schema.STRING;
+    }
+
     public abstract void findSinkServiceContainer(Map<String, 
GenericContainer<?>> externalServices);
 
     public SinkType sinkType() {


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to