merlimat closed pull request #2538: [io] jdbc connector uses Schema.AUTO
URL: https://github.com/apache/incubator-pulsar/pull/2538
This is a PR merged from a forked repository. As GitHub hides the original diff on merge, it is displayed below for the sake of provenance: As this is a foreign pull request (from a fork), the diff is supplied below (as it won't show otherwise due to GitHub magic): diff --git a/pulsar-functions/java-examples/pom.xml b/pulsar-functions/java-examples/pom.xml index b077dea2f0..747acff2a1 100644 --- a/pulsar-functions/java-examples/pom.xml +++ b/pulsar-functions/java-examples/pom.xml @@ -41,7 +41,6 @@ <artifactId>pulsar-functions-api</artifactId> <version>${project.version}</version> </dependency> - </dependencies> </project> diff --git a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java index 425fb57ac1..91d043db5f 100644 --- a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java +++ b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java @@ -54,8 +54,6 @@ private JdbcUtils.TableId tableId; private PreparedStatement insertStatement; - // TODO: turn to getSchema from SinkContext.getTopicSchema.getSchema(inputTopic) - protected String schema; protected JdbcUtils.TableDefinition tableDefinition; // for flush @@ -89,7 +87,6 @@ public void open(Map<String, Object> config, SinkContext sinkContext) throws Exc connection.setAutoCommit(false); log.info("Opened jdbc connection: {}, autoCommit: {}", jdbcUrl, connection.getAutoCommit()); - schema = jdbcSinkConfig.getSchema(); tableName = jdbcSinkConfig.getTableName(); tableId = JdbcUtils.getTableId(connection, tableName); tableDefinition = JdbcUtils.getTableDefinition(connection, tableId); diff --git a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAvroSchemaSink.java b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAutoSchemaSink.java similarity index 61% rename from pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAvroSchemaSink.java rename to 
pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAutoSchemaSink.java index ec2822010e..c18dc81b82 100644 --- a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAvroSchemaSink.java +++ b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAutoSchemaSink.java @@ -20,50 +20,27 @@ package org.apache.pulsar.io.jdbc; import java.sql.PreparedStatement; -import java.util.Map; import lombok.extern.slf4j.Slf4j; -import org.apache.avro.Schema; -import org.apache.avro.generic.GenericDatumReader; -import org.apache.avro.generic.GenericRecord; -import org.apache.avro.io.DatumReader; -import org.apache.avro.io.DecoderFactory; -import org.apache.avro.util.Utf8; +import org.apache.pulsar.client.api.schema.GenericRecord; import org.apache.pulsar.functions.api.Record; -import org.apache.pulsar.io.core.SinkContext; import org.apache.pulsar.io.jdbc.JdbcUtils.ColumnId; /** - * A Simple Jdbc sink, which assume input Record as AvroSchema format + * A Simple Jdbc sink, which interprets input Record in generic record. 
*/ @Slf4j -public class JdbcAvroSchemaSink extends JdbcAbstractSink<byte[]> { - - private Schema avroSchema = null; - private DatumReader<GenericRecord> reader = null; - +public class JdbcAutoSchemaSink extends JdbcAbstractSink<GenericRecord> { @Override - public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception { - super.open(config, sinkContext); - // get reader, and read value out as GenericRecord - if (avroSchema == null || reader == null) { - avroSchema = Schema.parse(schema); - reader = new GenericDatumReader<>(avroSchema); - } - log.info("open JdbcAvroSchemaSink with schema: {}, and tableDefinition: {}", schema, tableDefinition.toString()); - } - - public void bindValue(PreparedStatement statement, - Record<byte[]> message) throws Exception { + Record<GenericRecord> message) throws Exception { - byte[] value = message.getValue(); - GenericRecord record = reader.read(null, DecoderFactory.get().binaryDecoder(value, null)); + GenericRecord record = message.getValue(); int index = 1; for (ColumnId columnId : tableDefinition.getColumns()) { String colName = columnId.getName(); - Object obj = record.get(colName); + Object obj = record.getField(colName); setColumnValue(statement, index++, obj); log.info("set column value: {}", obj.toString()); } @@ -80,8 +57,8 @@ private static void setColumnValue(PreparedStatement statement, int index, Objec statement.setFloat(index, (Float) value); } else if (value instanceof Boolean) { statement.setBoolean(index, (Boolean) value); - } else if (value instanceof Utf8) { - statement.setString(index, ((Utf8)value).toString()); + } else if (value instanceof String) { + statement.setString(index, (String )value); } else if (value instanceof Short) { statement.setShort(index, (Short) value); } else { diff --git a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java index 3419811e0a..6cc95d6bac 100644 --- 
a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java +++ b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java @@ -44,9 +44,6 @@ private String jdbcUrl; private String tableName; - // schema for input topic - private String schema; - // Optional private int timeoutMs = 500; private int batchSize = 200; diff --git a/pulsar-io/jdbc/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/jdbc/src/main/resources/META-INF/services/pulsar-io.yaml index d9d06bde47..38d9e3c295 100644 --- a/pulsar-io/jdbc/src/main/resources/META-INF/services/pulsar-io.yaml +++ b/pulsar-io/jdbc/src/main/resources/META-INF/services/pulsar-io.yaml @@ -19,4 +19,4 @@ name: jdbc description: Jdbc sink -sinkClass: org.apache.pulsar.io.jdbc.JdbcAvroSchemaSink +sinkClass: org.apache.pulsar.io.jdbc.JdbcAutoSchemaSink diff --git a/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java index 33bb859547..84f6a82f03 100644 --- a/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java +++ b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java @@ -28,9 +28,11 @@ import lombok.ToString; import lombok.extern.slf4j.Slf4j; import org.apache.pulsar.client.api.Message; -import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.api.schema.GenericRecord; import org.apache.pulsar.client.impl.MessageImpl; +import org.apache.pulsar.client.impl.schema.AutoSchema; import org.apache.pulsar.client.impl.schema.AvroSchema; +import org.apache.pulsar.client.impl.schema.generic.GenericSchema; import org.apache.pulsar.functions.api.Record; import org.apache.pulsar.functions.source.PulsarRecord; import org.testng.Assert; @@ -69,7 +71,7 @@ public void tearDown() throws Exception { @Test public void TestOpenAndWriteSink() throws Exception { - JdbcAvroSchemaSink jdbcSink; + JdbcAutoSchemaSink jdbcSink; Map<String, Object> conf; 
String tableName = "TestOpenAndWriteSink"; @@ -78,7 +80,7 @@ public void TestOpenAndWriteSink() throws Exception { conf.put("jdbcUrl", jdbcUrl); conf.put("tableName", tableName); - jdbcSink = new JdbcAvroSchemaSink(); + jdbcSink = new JdbcAutoSchemaSink(); sqliteUtils.createTable( "CREATE TABLE " + tableName + "(" + @@ -94,13 +96,14 @@ public void TestOpenAndWriteSink() throws Exception { obj.setField2("ValueOfField1"); obj.setField3(3); AvroSchema<Foo> schema = AvroSchema.of(Foo.class); - conf.put("schema", new String(schema.getSchemaInfo().getSchema())); - log.info("schema: {}", new String(schema.getSchemaInfo().getSchema())); byte[] bytes = schema.encode(obj); ByteBuf payload = Unpooled.copiedBuffer(bytes); - Message<byte[]> message = new MessageImpl("77:777", conf, payload, Schema.BYTES); - Record<byte[]> record = PulsarRecord.<byte[]>builder() + AutoSchema autoSchema = new AutoSchema(); + autoSchema.setSchema(GenericSchema.of(schema.getSchemaInfo())); + + Message<GenericRecord> message = new MessageImpl("77:777", conf, payload, autoSchema); + Record<GenericRecord> record = PulsarRecord.<GenericRecord>builder() .message(message) .topicName("fake_topic_name") .build(); diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java index 0bf9ded6b3..b1bfc8c073 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java @@ -45,6 +45,7 @@ import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator; import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.Runtime; import org.apache.pulsar.tests.integration.io.*; +import org.apache.pulsar.tests.integration.io.JdbcSinkTester.Foo; import 
org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType; import org.apache.pulsar.tests.integration.topologies.PulsarCluster; import org.testng.annotations.Test; @@ -98,6 +99,11 @@ private void testSink(SinkTester tester, boolean builtin) throws Exception { // prepare the testing environment for sink prepareSink(tester); + ensureSubscriptionCreated( + inputTopicName, + String.format("public/default/%s", sinkName), + tester.getInputTopicSchema()); + // submit the sink connector submitSinkConnector(tester, tenant, namespace, sinkName, inputTopicName); @@ -246,13 +252,14 @@ protected void getSinkStatus(String tenant, String namespace, String sinkName) t // This for JdbcSinkTester protected Map<String, String> produceSchemaMessagesToInputTopic(String inputTopicName, - int numMessages, Schema schema) throws Exception { + int numMessages, + Schema<Foo> schema) throws Exception { @Cleanup PulsarClient client = PulsarClient.builder() .serviceUrl(pulsarCluster.getPlainTextServiceUrl()) .build(); @Cleanup - Producer<String> producer = client.newProducer(Schema.STRING) + Producer<Foo> producer = client.newProducer(schema) .topic(inputTopicName) .create(); LinkedHashMap<String, String> kvs = new LinkedHashMap<>(); @@ -268,7 +275,7 @@ protected void getSinkStatus(String tenant, String namespace, String sinkName) t kvs.put(key, value); producer.newMessage() .key(key) - .value(value) + .value(obj) .send(); } return kvs; @@ -630,7 +637,13 @@ private static void submitExclamationFunction(Runtime runtime, commands); assertTrue(result.getStdout().contains("\"Created successfully\"")); + ensureSubscriptionCreated(inputTopicName, String.format("public/default/%s", functionName), inputTopicSchema); + } + private static <T> void ensureSubscriptionCreated(String inputTopicName, + String subscriptionName, + Schema<T> inputTopicSchema) + throws Exception { // ensure the function subscription exists before we start producing messages try (PulsarClient client = 
PulsarClient.builder() .serviceUrl(pulsarCluster.getPlainTextServiceUrl()) @@ -638,7 +651,7 @@ private static void submitExclamationFunction(Runtime runtime, try (Consumer<T> ignored = client.newConsumer(inputTopicSchema) .topic(inputTopicName) .subscriptionType(SubscriptionType.Shared) - .subscriptionName(String.format("public/default/%s", functionName)) + .subscriptionName(subscriptionName) .subscribe()) { } } diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java index e4aa401042..7c14ba96c0 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java @@ -31,6 +31,7 @@ import lombok.EqualsAndHashCode; import lombok.ToString; import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.impl.schema.AvroSchema; import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.MySQLContainer; @@ -69,13 +70,14 @@ public JdbcSinkTester() { sinkConfig.put("userName", "test"); sinkConfig.put("password", "test"); sinkConfig.put("tableName", tableName); - - // prepare schema - sinkConfig.put("schema", new String(schema.getSchemaInfo().getSchema())); - log.info("schema: {}", new String(schema.getSchemaInfo().getSchema())); sinkConfig.put("batchSize", 1); } + @Override + public Schema<?> getInputTopicSchema() { + return schema; + } + @Override public void findSinkServiceContainer(Map<String, GenericContainer<?>> containers) { GenericContainer<?> container = containers.get("mysql"); diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java index 7f4b2d9a1b..8b61ff2118 100644 --- 
a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/SinkTester.java @@ -20,6 +20,7 @@ import java.util.Map; import lombok.Getter; +import org.apache.pulsar.client.api.Schema; import org.testcontainers.containers.GenericContainer; import org.testng.collections.Maps; @@ -56,6 +57,10 @@ public SinkTester(String sinkArchive, String sinkClassName) { this.sinkConfig = Maps.newHashMap(); } + public Schema<?> getInputTopicSchema() { + return Schema.STRING; + } + public abstract void findSinkServiceContainer(Map<String, GenericContainer<?>> externalServices); public SinkType sinkType() { ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services