This is an automated email from the ASF dual-hosted git repository. sijie pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/incubator-pulsar.git
The following commit(s) were added to refs/heads/master by this push: new 79ac46a Issue 2313: create a JDBC sink connector (#2440) 79ac46a is described below commit 79ac46a6ae50e48bed4ccb680d1f7945611f0565 Author: Jia Zhai <jiaz...@users.noreply.github.com> AuthorDate: Wed Sep 5 04:20:42 2018 +0800 Issue 2313: create a JDBC sink connector (#2440) ### Motivation This change is trying to add a basic JDBC sink connector. ### Modifications Add the jdbc module to the pulsar-io sub-module. Add unit test and integration test for it. ### Result ut and integration test pass. Master Issue: #2442 --- distribution/io/src/assemble/io.xml | 5 + pom.xml | 9 +- pulsar-io/jdbc/lombok.config | 23 +++ pulsar-io/jdbc/pom.xml | 96 ++++++++++ .../apache/pulsar/io/jdbc/JdbcAbstractSink.java | 197 +++++++++++++++++++++ .../apache/pulsar/io/jdbc/JdbcAvroSchemaSink.java | 92 ++++++++++ .../org/apache/pulsar/io/jdbc/JdbcSinkConfig.java | 63 +++++++ .../java/org/apache/pulsar/io/jdbc/JdbcUtils.java | 178 +++++++++++++++++++ .../resources/META-INF/services/pulsar-io.yaml | 22 +++ .../org/apache/pulsar/io/jdbc/JdbcSinkTest.java | 135 ++++++++++++++ .../org/apache/pulsar/io/jdbc/JdbcUtilsTest.java | 95 ++++++++++ .../org/apache/pulsar/io/jdbc/SqliteUtils.java | 111 ++++++++++++ pulsar-io/pom.xml | 1 + tests/integration/pom.xml | 21 +++ .../integration/functions/PulsarFunctionsTest.java | 54 +++++- .../functions/PulsarFunctionsTestBase.java | 2 +- .../tests/integration/io/JdbcSinkTester.java | 137 ++++++++++++++ .../tests/integration/suites/PulsarTestSuite.java | 11 ++ 18 files changed, 1245 insertions(+), 7 deletions(-) diff --git a/distribution/io/src/assemble/io.xml b/distribution/io/src/assemble/io.xml index 8cf7fce..bb75e84 100644 --- a/distribution/io/src/assemble/io.xml +++ b/distribution/io/src/assemble/io.xml @@ -75,6 +75,11 @@ <fileMode>644</fileMode> </file> <file> + <source>${basedir}/../../pulsar-io/jdbc/target/pulsar-io-jdbc-${project.version}.nar</source> + 
<outputDirectory>connectors</outputDirectory> + <fileMode>644</fileMode> + </file> + <file> <source>${basedir}/../../pulsar-io/data-genenator/target/pulsar-io-data-generator-${project.version}.nar</source> <outputDirectory>connectors</outputDirectory> <fileMode>644</fileMode> diff --git a/pom.xml b/pom.xml index 1ed2802..681f18e 100644 --- a/pom.xml +++ b/pom.xml @@ -167,6 +167,8 @@ flexible messaging model and an intuitive client API.</description> <aws-sdk.version>1.11.297</aws-sdk.version> <avro.version>1.8.2</avro.version> <jclouds.version>2.1.1</jclouds.version> + <sqlite-jdbc.version>3.8.11.2</sqlite-jdbc.version> + <mysql-jdbc.version>8.0.11</mysql-jdbc.version> <presto.version>0.206</presto.version> <flink.version>1.6.0</flink.version> <scala.binary.version>2.11</scala.binary.version> @@ -822,6 +824,11 @@ flexible messaging model and an intuitive client API.</description> <version>${testcontainers.version}</version> </dependency> <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>mysql</artifactId> + <version>${testcontainers.version}</version> + </dependency> + <dependency> <groupId>org.arquillian.cube</groupId> <artifactId>arquillian-cube-docker</artifactId> <version>${arquillian-cube.version}</version> @@ -1086,7 +1093,7 @@ flexible messaging model and an intuitive client API.</description> <exclude>src/main/java/org/apache/pulsar/common/api/proto/PulsarApi.java</exclude> <exclude>src/main/java/org/apache/pulsar/broker/service/schema/proto/SchemaRegistryFormat.java</exclude> <exclude>bin/proto/MLDataFormats_pb2.py</exclude> - + <!-- pulasr-io-connector kinesis : auto generated files from flatbuffer schema --> <exclude>src/main/java/org/apache/pulsar/io/kinesis/fbs/CompressionType.java</exclude> <exclude>src/main/java/org/apache/pulsar/io/kinesis/fbs/EncryptionCtx.java</exclude> diff --git a/pulsar-io/jdbc/lombok.config b/pulsar-io/jdbc/lombok.config new file mode 100644 index 0000000..9a9adee --- /dev/null +++ 
b/pulsar-io/jdbc/lombok.config @@ -0,0 +1,23 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +## This file is to fix the conflict with jackson error like this: +## com.fasterxml.jackson.databind.exc.MismatchedInputException: Cannot construct instance of ... +lombok.anyConstructor.addConstructorProperties=true +config.stopBubbling = true diff --git a/pulsar-io/jdbc/pom.xml b/pulsar-io/jdbc/pom.xml new file mode 100644 index 0000000..eed8588 --- /dev/null +++ b/pulsar-io/jdbc/pom.xml @@ -0,0 +1,96 @@ +<!-- + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, + software distributed under the License is distributed on an + "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + KIND, either express or implied. 
See the License for the + specific language governing permissions and limitations + under the License. + +--> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <modelVersion>4.0.0</modelVersion> + <parent> + <groupId>org.apache.pulsar</groupId> + <artifactId>pulsar-io</artifactId> + <version>2.2.0-incubating-SNAPSHOT</version> + </parent> + + <artifactId>pulsar-io-jdbc</artifactId> + <name>Pulsar IO :: Jdbc</name> + + <dependencies> + + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>pulsar-io-core</artifactId> + <version>${project.version}</version> + </dependency> + + + <dependency> + <groupId>org.apache.avro</groupId> + <artifactId>avro</artifactId> + <version>${avro.version}</version> + </dependency> + + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>pulsar-functions-instance</artifactId> + <version>${project.version}</version> + </dependency> + + <dependency> + <groupId>org.xerial</groupId> + <artifactId>sqlite-jdbc</artifactId> + <version>${sqlite-jdbc.version}</version> + <scope>runtime</scope> + </dependency> + + <dependency> + <groupId>mysql</groupId> + <artifactId>mysql-connector-java</artifactId> + <version>${mysql-jdbc.version}</version> + <scope>runtime</scope> + </dependency> + + <dependency> + <groupId>com.fasterxml.jackson.core</groupId> + <artifactId>jackson-databind</artifactId> + </dependency> + + <dependency> + <groupId>com.fasterxml.jackson.dataformat</groupId> + <artifactId>jackson-dataformat-yaml</artifactId> + </dependency> + + <dependency> + <groupId>${project.groupId}</groupId> + <artifactId>pulsar-client-original</artifactId> + <version>${project.version}</version> + </dependency> + + </dependencies> + + <build> + <plugins> + <plugin> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-nar-maven-plugin</artifactId> + </plugin> + </plugins> 
+ </build> + + +</project> diff --git a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java new file mode 100644 index 0000000..425fb57 --- /dev/null +++ b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAbstractSink.java @@ -0,0 +1,197 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.pulsar.io.jdbc; + +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.collect.Lists; +import java.sql.Connection; +import java.sql.PreparedStatement; +import java.sql.Statement; +import java.util.List; +import java.util.Map; +import java.util.Properties; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.functions.api.Record; +import org.apache.pulsar.io.core.Sink; +import org.apache.pulsar.io.core.SinkContext; + +/** + * A simple abstract class for Jdbc sink. + * Records handed to write() are buffered and inserted in batches by a background flush task. + * Users need to implement the bindValue function to use this sink. + */ +@Slf4j +public abstract class JdbcAbstractSink<T> implements Sink<T> { + // ----- Runtime fields + private JdbcSinkConfig jdbcSinkConfig; + @Getter + private Connection connection; + private String jdbcUrl; + private String tableName; + + private JdbcUtils.TableId tableId; + private PreparedStatement insertStatement; + + // TODO: turn to getSchema from SinkContext.getTopicSchema.getSchema(inputTopic) + protected String schema; + protected JdbcUtils.TableDefinition tableDefinition; + + // for flush: write() appends to incomingList, flush() swaps the two lists and drains swapList. + // The swap and the append are guarded by synchronized (this) because the list references change. + private List<Record<T>> incomingList; + private List<Record<T>> swapList; + private AtomicBoolean isFlushing; + private int timeoutMs; + private int batchSize; + private ScheduledExecutorService flushExecutor; + + /** + * Loads the sink configuration, opens the jdbc connection with auto-commit disabled, + * prepares the insert statement for the configured table and schedules flush() every timeoutMs. + * + * @throws IllegalArgumentException if the jdbc url is not configured + */ + @Override + public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception { + jdbcSinkConfig = JdbcSinkConfig.load(config); + + jdbcUrl = jdbcSinkConfig.getJdbcUrl(); + if (jdbcUrl == null) { + throw new IllegalArgumentException("Required jdbc Url not set."); + } + + Properties properties = new Properties(); + String username = jdbcSinkConfig.getUserName(); + String password = 
jdbcSinkConfig.getPassword(); + if (username != null) { + properties.setProperty("user", username); + } + if (password != null) { + properties.setProperty("password", password); + } + + connection = JdbcUtils.getConnection(jdbcUrl, properties); + connection.setAutoCommit(false); + log.info("Opened jdbc connection: {}, autoCommit: {}", jdbcUrl, connection.getAutoCommit()); + + schema = jdbcSinkConfig.getSchema(); + tableName = jdbcSinkConfig.getTableName(); + tableId = JdbcUtils.getTableId(connection, tableName); + tableDefinition = JdbcUtils.getTableDefinition(connection, tableId); + insertStatement = JdbcUtils.buildInsertStatement(connection, JdbcUtils.buildInsertSql(tableDefinition)); + + timeoutMs = jdbcSinkConfig.getTimeoutMs(); + batchSize = jdbcSinkConfig.getBatchSize(); + incomingList = Lists.newArrayList(); + swapList = Lists.newArrayList(); + isFlushing = new AtomicBoolean(false); + + flushExecutor = Executors.newScheduledThreadPool(1); + flushExecutor.scheduleAtFixedRate(() -> flush(), timeoutMs, timeoutMs, TimeUnit.MILLISECONDS); + } + + /** + * Stops the periodic flush task, commits any open transaction and closes the connection. + * Tolerates a sink whose open() failed before the executor or the connection was created. + */ + @Override + public void close() throws Exception { + if (flushExecutor != null) { + flushExecutor.shutdown(); + } + if (connection != null) { + if (!connection.getAutoCommit()) { + connection.commit(); + } + connection.close(); + } + log.info("Closed jdbc connection: {}", jdbcUrl); + } + + /** + * Buffers the record and schedules an immediate flush when exactly batchSize records are pending. + */ + @Override + public void write(Record<T> record) throws Exception { + int number; + synchronized (this) { + incomingList.add(record); + number = incomingList.size(); + } + + if (number == batchSize) { + flushExecutor.schedule(() -> flush(), 0, TimeUnit.MILLISECONDS); + } + } + + // bind value with a PreparedStatement + public abstract void bindValue( + PreparedStatement statement, + Record<T> message) throws Exception; + + + /** + * Drains the buffered records into one jdbc batch and commits it. + * Records are acked only after the commit succeeds and are failed if binding, execution or commit throws. + */ + private void flush() { + // if not in flushing state, do flush, else return; + if (incomingList.size() > 0 && isFlushing.compareAndSet(false, true)) { + if (log.isDebugEnabled()) { + log.debug("Starting flush, queue size: {}", 
incomingList.size()); + } + checkState(swapList.isEmpty(), + "swapList should be empty since last flush. swapList.size: " + swapList.size()); + + synchronized (this) { + List<Record<T>> tmpList; + swapList.clear(); + + tmpList = swapList; + swapList = incomingList; + incomingList = tmpList; + } + + int updateCount = 0; + boolean noInfo = false; + try { + // bind each record value + for (Record<T> record : swapList) { + bindValue(insertStatement, record); + insertStatement.addBatch(); + } + + for (int updates : insertStatement.executeBatch()) { + if (updates == Statement.SUCCESS_NO_INFO) { + noInfo = true; + continue; + } + updateCount += updates; + } + connection.commit(); + // ack only after the batch has been committed + swapList.forEach(tRecord -> tRecord.ack()); + } catch (Exception e) { + log.error("Got exception ", e); + swapList.forEach(tRecord -> tRecord.fail()); + } + + // a driver answering SUCCESS_NO_INFO reports no row counts, so the comparison only makes sense without it + if (!noInfo && swapList.size() != updateCount) { + log.error("Update count {} not match total number of records {}", updateCount, swapList.size()); + } + + // finish flush + if (log.isDebugEnabled()) { + log.debug("Finish flush, queue size: {}", swapList.size()); + } + // empty the drained list so the checkState at the start of the next flush holds + swapList.clear(); + isFlushing.set(false); + } else { + if (log.isDebugEnabled()) { + log.debug("Already in flushing state, will not flush, queue size: {}", incomingList.size()); + } + } + } + +} diff --git a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAvroSchemaSink.java b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAvroSchemaSink.java new file mode 100644 index 0000000..ec28220 --- /dev/null +++ b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcAvroSchemaSink.java @@ -0,0 +1,92 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.io.jdbc; + +import java.sql.PreparedStatement; +import java.util.Map; +import lombok.extern.slf4j.Slf4j; +import org.apache.avro.Schema; +import org.apache.avro.generic.GenericDatumReader; +import org.apache.avro.generic.GenericRecord; +import org.apache.avro.io.DatumReader; +import org.apache.avro.io.DecoderFactory; +import org.apache.avro.util.Utf8; +import org.apache.pulsar.functions.api.Record; +import org.apache.pulsar.io.core.SinkContext; +import org.apache.pulsar.io.jdbc.JdbcUtils.ColumnId; + +/** + * A simple Jdbc sink, which assumes the input Record value is Avro binary data + * encoded with the configured schema. + */ +@Slf4j +public class JdbcAvroSchemaSink extends JdbcAbstractSink<byte[]> { + + private Schema avroSchema = null; + private DatumReader<GenericRecord> reader = null; + + + /** + * Opens the underlying jdbc sink, then parses the configured schema string and creates the Avro reader. + */ + @Override + public void open(Map<String, Object> config, SinkContext sinkContext) throws Exception { + super.open(config, sinkContext); + // get reader, and read value out as GenericRecord + if (avroSchema == null || reader == null) { + avroSchema = new Schema.Parser().parse(schema); + reader = new GenericDatumReader<>(avroSchema); + } + log.info("open JdbcAvroSchemaSink with schema: {}, and tableDefinition: {}", schema, tableDefinition); + } + + + /** + * Decodes the record value as an Avro GenericRecord and binds one statement parameter per table column, + * looking each column up in the Avro record by column name. + */ + @Override + public void bindValue(PreparedStatement statement, + Record<byte[]> message) throws Exception { + + byte[] value = message.getValue(); + GenericRecord record = 
reader.read(null, DecoderFactory.get().binaryDecoder(value, null)); + + int index = 1; + for (ColumnId columnId : tableDefinition.getColumns()) { + String colName = columnId.getName(); + Object obj = record.get(colName); + if (obj == null) { + // no value for this column: bind SQL NULL using the column's java.sql.Types code + statement.setNull(index++, columnId.getType()); + } else { + setColumnValue(statement, index++, obj); + } + if (log.isDebugEnabled()) { + log.debug("set column value: {}", obj); + } + } + } + + /** + * Binds a non-null value to the statement parameter at index with the setter matching its Java type. + * + * @throws Exception if the value type has no mapping yet + */ + private static void setColumnValue(PreparedStatement statement, int index, Object value) throws Exception { + if (value instanceof Integer) { + statement.setInt(index, (Integer) value); + } else if (value instanceof Long) { + statement.setLong(index, (Long) value); + } else if (value instanceof Double) { + statement.setDouble(index, (Double) value); + } else if (value instanceof Float) { + statement.setFloat(index, (Float) value); + } else if (value instanceof Boolean) { + statement.setBoolean(index, (Boolean) value); + } else if (value instanceof Utf8) { + statement.setString(index, ((Utf8)value).toString()); + } else if (value instanceof String) { + statement.setString(index, (String) value); + } else if (value instanceof Short) { + statement.setShort(index, (Short) value); + } else { + throw new Exception("Not support value type, need to add it. " + value.getClass()); + } + } +} + diff --git a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java new file mode 100644 index 0000000..3419811 --- /dev/null +++ b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcSinkConfig.java @@ -0,0 +1,63 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.io.jdbc; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.dataformat.yaml.YAMLFactory; +import lombok.*; +import lombok.experimental.Accessors; + +import java.io.File; +import java.io.IOException; +import java.io.Serializable; +import java.util.Map; + +/** + * Configuration of the Jdbc sink. + * The password is excluded from toString() so that printing the config does not expose it. + */ +@Data +@Setter +@Getter +@EqualsAndHashCode +@ToString(exclude = "password") +@Accessors(chain = true) +public class JdbcSinkConfig implements Serializable { + + private static final long serialVersionUID = 1L; + + private String userName; + private String password; + private String jdbcUrl; + private String tableName; + + // schema for input topic + private String schema; + + // Optional + private int timeoutMs = 500; + private int batchSize = 200; + + /** + * Reads the configuration from a YAML file. + */ + public static JdbcSinkConfig load(String yamlFile) throws IOException { + ObjectMapper mapper = new ObjectMapper(new YAMLFactory()); + return mapper.readValue(new File(yamlFile), JdbcSinkConfig.class); + } + + /** + * Builds the configuration from a key/value map by serializing the map to JSON and reading it back. + */ + public static JdbcSinkConfig load(Map<String, Object> map) throws IOException { + ObjectMapper mapper = new ObjectMapper(); + return mapper.readValue(mapper.writeValueAsString(map), JdbcSinkConfig.class); + } +} diff --git a/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcUtils.java b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcUtils.java new file mode 100644 index 0000000..e959909 --- /dev/null +++ b/pulsar-io/jdbc/src/main/java/org/apache/pulsar/io/jdbc/JdbcUtils.java @@ -0,0 +1,178 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or 
more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.io.jdbc; + +import static com.google.common.base.Preconditions.checkState; + +import com.google.common.collect.Lists; +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.util.List; +import java.util.Properties; +import java.util.stream.IntStream; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.Getter; +import lombok.Setter; +import lombok.ToString; +import lombok.extern.slf4j.Slf4j; + +/** + * Jdbc Utils: helpers to open a connection, read table metadata and build the insert statement. + */ +@Slf4j +public class JdbcUtils { + + @Data(staticConstructor = "of") + @Setter + @Getter + @EqualsAndHashCode + @ToString + public static class TableId { + private final String catalogName; + private final String schemaName; + private final String tableName; + } + + @Data(staticConstructor = "of") + @Setter + @Getter + @EqualsAndHashCode + @ToString + public static class ColumnId { + private final TableId tableId; + private final String name; + // SQL type from java.sql.Types + private final int type; + private final String typeName; + // column position in table + private final int position; + } + + @Data(staticConstructor = 
"of") + @Setter + @Getter + @EqualsAndHashCode + @ToString + public static class TableDefinition { + private final TableId tableId; + private final List<ColumnId> columns; + } + + /** + * Given a driver type(such as mysql), return its jdbc driver class name. + * TODO: test and support more types, also add Driver in pom file. + * + * @throws Exception if the driver type is not one of the tested types + */ + public static String getDriverClassName(String driver) throws Exception { + if (driver.equals("mysql")) { + // class name of MySQL Connector/J 8.x; com.mysql.jdbc.Driver is its deprecated alias + return "com.mysql.cj.jdbc.Driver"; + } else if (driver.equals("sqlite")) { + return "org.sqlite.JDBC"; + } else { + throw new Exception("Not tested jdbc driver type: " + driver); + } + } + + /** + * Get the {@link Connection} for the given jdbcUrl. + * The driver type is read from the second ':'-separated segment of the url (jdbc:<driver>:...). + * + * @throws IllegalArgumentException if the url has no driver segment + */ + public static Connection getConnection(String jdbcUrl, Properties properties) throws Exception { + String[] urlParts = jdbcUrl.split(":"); + if (urlParts.length < 2) { + throw new IllegalArgumentException("Malformed jdbc url, expected jdbc:<driver>:..."); + } + String driverClassName = getDriverClassName(urlParts[1]); + Class.forName(driverClassName); + + return DriverManager.getConnection(jdbcUrl, properties); + } + + /** + * Get the {@link TableId} for the given tableName. + * + * @throws Exception if the database metadata contains no table with that name + */ + public static TableId getTableId(Connection connection, String tableName) throws Exception { + DatabaseMetaData metadata = connection.getMetaData(); + try (ResultSet rs = metadata.getTables(null, null, tableName, new String[]{"TABLE"})) { + if (rs.next()) { + String catalogName = rs.getString(1); + String schemaName = rs.getString(2); + String gotTableName = rs.getString(3); + checkState(tableName.equals(gotTableName), + "TableName not match: " + tableName + " Got: " + gotTableName); + if (log.isDebugEnabled()) { + log.debug("Get Table: {}, {}, {}", catalogName, schemaName, tableName); + } + return TableId.of(catalogName, schemaName, tableName); + } else { + throw new Exception("Not able to find table: " + tableName); + } + } + } + + /** + * Get the {@link TableDefinition} for the given table. 
+ * Columns are added in the order the metadata result set returns them. + */ + public static TableDefinition getTableDefinition(Connection connection, TableId tableId) throws Exception { + TableDefinition table = TableDefinition.of(tableId, Lists.newArrayList()); + + try (ResultSet rs = connection.getMetaData().getColumns( + tableId.getCatalogName(), + tableId.getSchemaName(), + tableId.getTableName(), + null + )) { + while (rs.next()) { + // getColumns() result columns: 4 = COLUMN_NAME, 5 = DATA_TYPE, 6 = TYPE_NAME, 17 = ORDINAL_POSITION + final String columnName = rs.getString(4); + + final int sqlDataType = rs.getInt(5); + final String typeName = rs.getString(6); + final int position = rs.getInt(17); + + table.columns.add(ColumnId.of(tableId, columnName, sqlDataType, typeName, position)); + if (log.isDebugEnabled()) { + log.debug("Get column. name: {}, data type: {}, position: {}", columnName, typeName, position); + } + } + return table; + } + } + + /** + * Build "INSERT INTO table(col1,col2,...) VALUES(?,?,...)" with one placeholder per column of the table. + */ + public static String buildInsertSql(TableDefinition table) { + StringBuilder builder = new StringBuilder(); + builder.append("INSERT INTO "); + builder.append(table.tableId.getTableName()); + builder.append("("); + + table.columns.forEach(columnId -> builder.append(columnId.getName()).append(",")); + builder.deleteCharAt(builder.length() - 1); + + builder.append(") VALUES("); + IntStream.range(0, table.columns.size() - 1).forEach(i -> builder.append("?,")); + builder.append("?)"); + + return builder.toString(); + } + + /** + * Prepare the given insert sql on the connection. + */ + public static PreparedStatement buildInsertStatement(Connection connection, String insertSQL) throws SQLException { + return connection.prepareStatement(insertSQL); + } + +} diff --git a/pulsar-io/jdbc/src/main/resources/META-INF/services/pulsar-io.yaml b/pulsar-io/jdbc/src/main/resources/META-INF/services/pulsar-io.yaml new file mode 100644 index 0000000..d9d06bd --- /dev/null +++ b/pulsar-io/jdbc/src/main/resources/META-INF/services/pulsar-io.yaml @@ -0,0 +1,22 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. 
See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +name: jdbc +description: Jdbc sink +sinkClass: org.apache.pulsar.io.jdbc.JdbcAvroSchemaSink diff --git a/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java new file mode 100644 index 0000000..33bb859 --- /dev/null +++ b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcSinkTest.java @@ -0,0 +1,135 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.pulsar.io.jdbc; + +import com.google.common.collect.Maps; +import io.netty.buffer.ByteBuf; +import io.netty.buffer.Unpooled; +import java.util.Map; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.ToString; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.api.Message; +import org.apache.pulsar.client.api.Schema; +import org.apache.pulsar.client.impl.MessageImpl; +import org.apache.pulsar.client.impl.schema.AvroSchema; +import org.apache.pulsar.functions.api.Record; +import org.apache.pulsar.functions.source.PulsarRecord; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +/** + * Jdbc Sink test + */ +@Slf4j +public class JdbcSinkTest { + private final SqliteUtils sqliteUtils = new SqliteUtils(getClass().getSimpleName()); + + /** + * A Simple class to test jdbc class + */ + @Data + @ToString + @EqualsAndHashCode + public static class Foo { + private String field1; + private String field2; + private int field3; + } + + @BeforeMethod + public void setUp() throws Exception { + sqliteUtils.setUp(); + } + + @AfterMethod + public void tearDown() throws Exception { + sqliteUtils.tearDown(); + } + + /** + * Opens the sink against a sqlite table, writes one Avro encoded Foo record and reads the row back. + */ + @Test + public void TestOpenAndWriteSink() throws Exception { + JdbcAvroSchemaSink jdbcSink; + Map<String, Object> conf; + String tableName = "TestOpenAndWriteSink"; + + String jdbcUrl = sqliteUtils.sqliteUri(); + conf = Maps.newHashMap(); + conf.put("jdbcUrl", jdbcUrl); + conf.put("tableName", tableName); + + jdbcSink = new JdbcAvroSchemaSink(); + + sqliteUtils.createTable( + "CREATE TABLE " + tableName + "(" + + " field1 TEXT," + + " field2 TEXT," + + " field3 INTEGER," + + "PRIMARY KEY (field1));" + ); + + // prepare a foo Record; every field gets a distinct value so that a column mix-up is detectable + Foo obj = new Foo(); + obj.setField1("ValueOfField1"); + obj.setField2("ValueOfField2"); + obj.setField3(3); + AvroSchema<Foo> schema = AvroSchema.of(Foo.class); + 
conf.put("schema", new String(schema.getSchemaInfo().getSchema())); + log.info("schema: {}", new String(schema.getSchemaInfo().getSchema())); + + byte[] bytes = schema.encode(obj); + ByteBuf payload = Unpooled.copiedBuffer(bytes); + Message<byte[]> message = new MessageImpl("77:777", conf, payload, Schema.BYTES); + Record<byte[]> record = PulsarRecord.<byte[]>builder() + .message(message) + .topicName("fake_topic_name") + .build(); + + log.info("foo:{}, Message.getValue: {}, record.getValue: {}", + obj.toString(), + message.getValue().toString(), + record.getValue().toString()); + + // change batchSize to 1, to flush on each write. + conf.put("batchSize", 1); + // open should success + jdbcSink.open(conf, null); + + // write should success. + jdbcSink.write(record); + log.info("executed write"); + // sleep to wait backend flush complete + Thread.sleep(500); + + // value has been written to db, read it out and verify. + // TestNG's assertEquals takes (actual, expected). + String querySql = "SELECT * FROM " + tableName; + sqliteUtils.select(querySql, (resultSet) -> { + Assert.assertEquals(resultSet.getString(1), obj.getField1()); + Assert.assertEquals(resultSet.getString(2), obj.getField2()); + Assert.assertEquals(resultSet.getInt(3), obj.getField3()); + }); + + jdbcSink.close(); + } + +} diff --git a/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcUtilsTest.java b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcUtilsTest.java new file mode 100644 index 0000000..d58802d --- /dev/null +++ b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/JdbcUtilsTest.java @@ -0,0 +1,95 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package org.apache.pulsar.io.jdbc; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.SQLException; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.io.jdbc.JdbcUtils.TableDefinition; +import org.apache.pulsar.io.jdbc.JdbcUtils.TableId; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; + +/** + * Jdbc Utils test + */ +@Slf4j +public class JdbcUtilsTest { + + private final SqliteUtils sqliteUtils = new SqliteUtils(getClass().getSimpleName()); + @BeforeMethod + public void setUp() throws IOException, SQLException { + sqliteUtils.setUp(); + } + + @AfterMethod + public void tearDown() throws IOException, SQLException { + sqliteUtils.tearDown(); + } + + @Test + public void TestGetTableId() throws Exception { + String tableName = "TestGetTableId"; + + sqliteUtils.createTable( + "CREATE TABLE " + tableName + "(" + + " firstName TEXT," + + " lastName TEXT," + + " age INTEGER," + + " bool NUMERIC," + + " byte INTEGER," + + " short INTEGER NULL," + + " long INTEGER," + + " float NUMERIC," + + " double NUMERIC," + + " bytes BLOB, " + + "PRIMARY KEY (firstName, lastName));" + ); + + Connection connection = sqliteUtils.getConnection(); + + // Test getTableId + log.info("verify getTableId"); + TableId id = JdbcUtils.getTableId(connection, tableName); + Assert.assertEquals(id.getTableName(), tableName); + + // Test get getTableDefinition + log.info("verify getTableDefinition"); + TableDefinition table = 
JdbcUtils.getTableDefinition(connection, id); + Assert.assertEquals(table.getColumns().get(0).getName(), "firstName"); + Assert.assertEquals(table.getColumns().get(0).getTypeName(), "TEXT"); + Assert.assertEquals(table.getColumns().get(2).getName(), "age"); + Assert.assertEquals(table.getColumns().get(2).getTypeName(), "INTEGER"); + Assert.assertEquals(table.getColumns().get(7).getName(), "float"); + Assert.assertEquals(table.getColumns().get(7).getTypeName(), "NUMERIC"); + + // Test get getTableDefinition + log.info("verify buildInsertSql"); + String expctedStatement = "INSERT INTO " + tableName + + "(firstName,lastName,age,bool,byte,short,long,float,double,bytes)" + + " VALUES(?,?,?,?,?,?,?,?,?,?)"; + String statement = JdbcUtils.buildInsertSql(table); + Assert.assertEquals(statement, expctedStatement); + } + +} diff --git a/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/SqliteUtils.java b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/SqliteUtils.java new file mode 100644 index 0000000..3b4a01a --- /dev/null +++ b/pulsar-io/jdbc/src/test/java/org/apache/pulsar/io/jdbc/SqliteUtils.java @@ -0,0 +1,111 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +package org.apache.pulsar.io.jdbc; + + +import java.io.IOException; +import java.nio.file.Files; +import java.nio.file.Path; +import java.nio.file.Paths; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import lombok.extern.slf4j.Slf4j; + +@Slf4j +public final class SqliteUtils { + + static { + try { + Class.forName("org.sqlite.JDBC"); + } catch (ClassNotFoundException e) { + throw new RuntimeException(e); + } + } + + public interface ResultSetReadCallback { + void read(final ResultSet rs) throws SQLException; + } + + private final Path dbPath; + + private Connection connection; + + public Connection getConnection() { + return connection; + } + + public SqliteUtils(String testId) { + dbPath = Paths.get(testId + ".db"); + } + + public String sqliteUri() { + return "jdbc:sqlite:" + dbPath; + } + + public void setUp() throws SQLException, IOException { + Files.deleteIfExists(dbPath); + connection = DriverManager.getConnection(sqliteUri()); + connection.setAutoCommit(false); + } + + public void tearDown() throws SQLException, IOException { + connection.close(); + Files.deleteIfExists(dbPath); + } + + public void createTable(final String createSql) throws SQLException { + execute(createSql); + } + + public void deleteTable(final String table) throws SQLException { + execute("DROP TABLE IF EXISTS " + table); + + //random errors of table not being available happens in the unit tests + try { + Thread.sleep(100); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } + + public int select(final String query, final SqliteUtils.ResultSetReadCallback callback) throws SQLException { + int count = 0; + try (Statement stmt = connection.createStatement()) { + try (ResultSet rs = stmt.executeQuery(query)) { + while (rs.next()) { + callback.read(rs); + count++; + } + } + } + return count; + } + + public void execute(String sql) throws SQLException { + 
try (Statement stmt = connection.createStatement()) { + stmt.executeUpdate(sql); + connection.commit(); + } + } + +} diff --git a/pulsar-io/pom.xml b/pulsar-io/pom.xml index f1494db..e89cc02 100644 --- a/pulsar-io/pom.xml +++ b/pulsar-io/pom.xml @@ -38,6 +38,7 @@ <module>kafka</module> <module>rabbitmq</module> <module>kinesis</module> + <module>jdbc</module> <module>data-genenator</module> </modules> diff --git a/tests/integration/pom.xml b/tests/integration/pom.xml index a3a4694..6d3fdc4 100644 --- a/tests/integration/pom.xml +++ b/tests/integration/pom.xml @@ -78,6 +78,27 @@ <version>${project.version}</version> <scope>test</scope> </dependency> + + <dependency> + <groupId>org.testcontainers</groupId> + <artifactId>mysql</artifactId> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>mysql</groupId> + <artifactId>mysql-connector-java</artifactId> + <version>${mysql-jdbc.version}</version> + <scope>runtime</scope> + </dependency> + + <dependency> + <groupId>org.apache.pulsar</groupId> + <artifactId>pulsar-io-jdbc</artifactId> + <version>${project.version}</version> + <scope>test</scope> + </dependency> + <dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java index 67136a6..5cefc6a 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTest.java @@ -36,18 +36,22 @@ import org.apache.pulsar.client.api.Producer; import org.apache.pulsar.client.api.PulsarClient; import org.apache.pulsar.client.api.Schema; import org.apache.pulsar.client.api.SubscriptionType; +import org.apache.pulsar.client.impl.schema.AvroSchema; import 
org.apache.pulsar.common.naming.TopicName;
 import org.apache.pulsar.tests.integration.docker.ContainerExecException;
 import org.apache.pulsar.tests.integration.docker.ContainerExecResult;
 import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator;
 import org.apache.pulsar.tests.integration.functions.utils.CommandGenerator.Runtime;
 import org.apache.pulsar.tests.integration.io.CassandraSinkTester;
+import org.apache.pulsar.tests.integration.io.JdbcSinkTester;
+import org.apache.pulsar.tests.integration.io.JdbcSinkTester.Foo;
 import org.apache.pulsar.tests.integration.io.KafkaSinkTester;
 import org.apache.pulsar.tests.integration.io.KafkaSourceTester;
 import org.apache.pulsar.tests.integration.io.SinkTester;
 import org.apache.pulsar.tests.integration.io.SourceTester;
 import org.apache.pulsar.tests.integration.topologies.FunctionRuntimeType;
 import org.apache.pulsar.tests.integration.topologies.PulsarCluster;
+import org.testng.Assert;
 import org.testng.annotations.Test;
 
 /**
@@ -70,15 +74,20 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
         testSink(new CassandraSinkTester());
     }
 
+    /**
+     * Runs the shared sink scenario ({@code testSink}) with a {@link JdbcSinkTester}.
+     *
+     * @throws Exception if any step of the shared sink scenario fails
+     */
+    @Test
+    public void testJdbcSink() throws Exception {
+        testSink(new JdbcSinkTester());
+    }
+
     private void testSink(SinkTester tester) throws Exception {
         tester.findSinkServiceContainer(pulsarCluster.getExternalServices());
 
         final String tenant = TopicName.PUBLIC_TENANT;
         final String namespace = TopicName.DEFAULT_NAMESPACE;
         final String inputTopicName = "test-sink-connector-"
-            + functionRuntimeType + "-input-topic-" + randomName(8);
+            + tester.getSinkType() + "-" + functionRuntimeType + "-input-topic-" + randomName(8);
         final String sinkName = "test-sink-connector-"
-            + functionRuntimeType + "-name-" + randomName(8);
+            + tester.getSinkType() + "-" + functionRuntimeType + "-name-" + randomName(8);
         final int numMessages = 20;
 
         // prepare the testing environment for sink
@@ -94,7 +103,12 @@ public abstract class PulsarFunctionsTest extends
PulsarFunctionsTestBase {
         getSinkStatus(tenant, namespace, sinkName);
 
         // produce messages
-        Map<String, String> kvs = produceMessagesToInputTopic(inputTopicName, numMessages);
+        Map<String, String> kvs;
+        if (tester instanceof JdbcSinkTester) {
+            kvs = produceSchemaMessagesToInputTopic(inputTopicName, numMessages, AvroSchema.of(Foo.class));
+        } else {
+            kvs = produceMessagesToInputTopic(inputTopicName, numMessages);
+        }
 
         // wait for sink to process messages
         waitForProcessingMessages(tenant, namespace, sinkName, numMessages);
@@ -202,6 +216,36 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
         return kvs;
     }
 
+    /**
+     * Producer used for the {@link JdbcSinkTester} case: publishes {@code numMessages}
+     * schema-encoded {@link Foo} objects to the input topic.
+     *
+     * <p>Message {@code i} has key {@code "key-" + i} and carries a {@code Foo} with
+     * {@code field1 = "field1_" + i}, {@code field2 = "field2_" + i}, {@code field3 = i}.
+     * The encoded bytes are turned into a {@code String} and sent through a
+     * {@code Schema.STRING} producer.
+     *
+     * <p>NOTE(review): {@code new String(byte[])} uses the platform default charset, so the
+     * round trip back to bytes presumably relies on the encoded payload surviving that
+     * conversion unchanged - confirm against the code that decodes the returned values.
+     *
+     * @param inputTopicName topic to publish to
+     * @param numMessages number of messages to send
+     * @param schema schema used to encode each {@code Foo} (raw type; the caller visible in
+     *        this class passes {@code AvroSchema.of(Foo.class)})
+     * @return the sent key/value pairs, in send order
+     * @throws Exception if the client or producer cannot be created or a send fails
+     */
+    protected Map<String, String> produceSchemaMessagesToInputTopic(String inputTopicName,
+                                                                    int numMessages, Schema schema) throws Exception {
+        @Cleanup
+        PulsarClient client = PulsarClient.builder()
+            .serviceUrl(pulsarCluster.getPlainTextServiceUrl())
+            .build();
+        @Cleanup
+        Producer<String> producer = client.newProducer(Schema.STRING)
+            .topic(inputTopicName)
+            .create();
+        // LinkedHashMap keeps the pairs in the order they were sent
+        LinkedHashMap<String, String> kvs = new LinkedHashMap<>();
+        for (int i = 0; i < numMessages; i++) {
+            String key = "key-" + i;
+
+            Foo obj = new Foo();
+            obj.setField1("field1_" + i);
+            obj.setField2("field2_" + i);
+            obj.setField3(i);
+            String value = new String(schema.encode(obj));
+
+            kvs.put(key, value);
+            producer.newMessage()
+                .key(key)
+                .value(value)
+                .send();
+        }
+        return kvs;
+    }
+
     protected void waitForProcessingMessages(String tenant,
                                              String namespace,
                                              String sinkName,
@@ -226,8 +270,8 @@ public abstract class PulsarFunctionsTest extends PulsarFunctionsTestBase {
                 // expected in early iterations
             }
 
-            log.info("{} ms has elapsed but the sink hasn't process {} messages, backoff to wait for another 1 second",
-                stopwatch.elapsed(TimeUnit.MILLISECONDS), numMessages);
+            log.info("{} ms has elapsed but the sink {} hasn't process {} messages, backoff to wait for another 1 second",
+                stopwatch.elapsed(TimeUnit.MILLISECONDS), sinkName, numMessages);
             TimeUnit.SECONDS.sleep(1);
         }
     }
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
index fe4795d..7a47f77 100644
--- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/functions/PulsarFunctionsTestBase.java
@@ -52,7 +52,7 @@ public abstract class PulsarFunctionsTestBase extends PulsarTestSuite {
 
     @BeforeClass
     public void setupFunctionWorkers() {
-        final int numFunctionWorkers = 2;
+        final int numFunctionWorkers = 3;
         log.info("Setting up {} function workers : function runtime type = {}",
             numFunctionWorkers, functionRuntimeType);
         pulsarCluster.setupFunctionWorkers(randomName(5), functionRuntimeType, numFunctionWorkers);
diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java
new file mode 100644
index 0000000..6a102f1
--- /dev/null
+++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/io/JdbcSinkTester.java
@@ -0,0 +1,137 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.tests.integration.io; + +import static com.google.common.base.Preconditions.checkState; +import static org.testng.Assert.assertEquals; +import static org.testng.Assert.fail; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.util.Map; +import lombok.Data; +import lombok.EqualsAndHashCode; +import lombok.ToString; +import lombok.extern.slf4j.Slf4j; +import org.apache.pulsar.client.impl.schema.AvroSchema; +import org.testcontainers.containers.GenericContainer; +import org.testcontainers.containers.MySQLContainer; + +/** + * A tester for testing jdbc sink. 
+ * This will use MySql as DB server + */ +@Slf4j +public class JdbcSinkTester extends SinkTester { + + /** + * A Simple class to test jdbc class, + * + */ + @Data + @ToString + @EqualsAndHashCode + public static class Foo { + private String field1; + private String field2; + private int field3; + } + + private static final String NAME = "jdbc"; + + private MySQLContainer mySQLContainer; + private AvroSchema<Foo> schema = AvroSchema.of(Foo.class); + private String tableName = "test"; + private Connection connection; + + public JdbcSinkTester() { + super(NAME); + + // container default value is test + sinkConfig.put("userName", "test"); + sinkConfig.put("password", "test"); + sinkConfig.put("tableName", tableName); + + // prepare schema + sinkConfig.put("schema", new String(schema.getSchemaInfo().getSchema())); + log.info("schema: {}", new String(schema.getSchemaInfo().getSchema())); + sinkConfig.put("batchSize", 1); + } + + @Override + public void findSinkServiceContainer(Map<String, GenericContainer<?>> containers) { + GenericContainer<?> container = containers.get("mysql"); + checkState(container instanceof MySQLContainer, + "No MySQL service found in the cluster"); + + this.mySQLContainer = (MySQLContainer) container; + log.info("find sink service container: {}", mySQLContainer.getContainerName()); + } + + @Override + public void prepareSink() throws Exception { + String jdbcUrl = mySQLContainer.getJdbcUrl(); + // we need set mysql server address in cluster network. 
+ sinkConfig.put("jdbcUrl", "jdbc:mysql://mysql:3306/test"); + String driver = mySQLContainer.getDriverClassName(); + Class.forName(driver); + + connection = DriverManager.getConnection(jdbcUrl, "test", "test"); + log.info("getConnection: {}, jdbcurl: {}", connection, jdbcUrl); + + // create table + String createTable = "CREATE TABLE " + tableName + + " (field1 TEXT, field2 TEXT, field3 INTEGER, PRIMARY KEY (field3))"; + int ret = connection.createStatement().executeUpdate(createTable); + log.info("created table in jdbc: {}, return value: {}", createTable, ret); + } + + @Override + public void validateSinkResult(Map<String, String> kvs) { + log.info("Query table content from mysql server: {}", tableName); + String querySql = "SELECT * FROM " + tableName; + ResultSet rs; + try { + // backend flush may not complete. + Thread.sleep(1000); + + PreparedStatement statement = connection.prepareStatement(querySql); + rs = statement.executeQuery(); + + while (rs.next()) { + String field1 = rs.getString(1); + String field2 = rs.getString(2); + int field3 = rs.getInt(3); + + String value = kvs.get("key-" + field3); + + Foo obj = schema.decode(value.getBytes()); + assertEquals(obj.field1, field1); + assertEquals(obj.field2, field2); + assertEquals(obj.field3, field3); + } + } catch (Exception e) { + log.error("Got exception: ", e); + fail("Got exception when op sql."); + return; + } + } +} diff --git a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java index e20a933..147f273 100644 --- a/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java +++ b/tests/integration/src/test/java/org/apache/pulsar/tests/integration/suites/PulsarTestSuite.java @@ -24,6 +24,7 @@ import org.apache.pulsar.tests.integration.topologies.PulsarClusterSpec.PulsarCl import 
org.apache.pulsar.tests.integration.topologies.PulsarClusterTestBase; import org.testcontainers.containers.GenericContainer; import org.testcontainers.containers.KafkaContainer; +import org.testcontainers.containers.MySQLContainer; import org.testng.ITest; import org.testng.annotations.AfterSuite; import org.testng.annotations.BeforeSuite; @@ -51,6 +52,7 @@ public class PulsarTestSuite extends PulsarClusterTestBase implements ITest { // register external services Map<String, GenericContainer<?>> externalServices = Maps.newHashMap(); + final String kafkaServiceName = "kafka"; externalServices.put( kafkaServiceName, @@ -60,10 +62,19 @@ public class PulsarTestSuite extends PulsarClusterTestBase implements ITest { .withCreateContainerCmdModifier(createContainerCmd -> createContainerCmd .withName(kafkaServiceName) .withHostName(clusterName + "-" + kafkaServiceName))); + final String cassandraServiceName = "cassandra"; externalServices.put( cassandraServiceName, new CassandraContainer(clusterName)); + + // use mySQL for jdbc test + final String jdbcServiceName = "mysql"; + externalServices.put( + jdbcServiceName, + new MySQLContainer() + .withExposedPorts(3306)); + builder = builder.externalServices(externalServices); return builder;