This is an automated email from the ASF dual-hosted git repository.
gaojun2048 pushed a commit to branch dev
in repository https://gitbox.apache.org/repos/asf/incubator-seatunnel.git
The following commit(s) were added to refs/heads/dev by this push:
new 60ecc6428 [Feature][Connector-V2][GoogleSheets] Support GoogleSheets
Source (#3185)
60ecc6428 is described below
commit 60ecc6428bbf9fd582ff9096f7d132d79ac32398
Author: skyoct <[email protected]>
AuthorDate: Wed Nov 9 22:06:34 2022 +0800
[Feature][Connector-V2][GoogleSheets] Support GoogleSheets Source (#3185)
---
docs/en/connector-v2/source/GoogleSheets.md | 74 +++++++++++++++
plugin-mapping.properties | 3 +-
.../connector-google-sheets/pom.xml | 94 ++++++++++++++++++
.../google/sheets/config/SheetsConfig.java | 26 +++++
.../google/sheets/config/SheetsParameters.java | 45 +++++++++
.../deserialize/GoogleSheetsDeserializer.java | 56 +++++++++++
.../deserialize/SeaTunnelRowDeserializer.java | 28 ++++++
.../google/sheets/source/SheetsSource.java | 86 +++++++++++++++++
.../google/sheets/source/SheetsSourceReader.java | 105 +++++++++++++++++++++
seatunnel-connectors-v2/pom.xml | 1 +
seatunnel-dist/pom.xml | 6 ++
11 files changed, 523 insertions(+), 1 deletion(-)
diff --git a/docs/en/connector-v2/source/GoogleSheets.md
b/docs/en/connector-v2/source/GoogleSheets.md
new file mode 100644
index 000000000..ba260c7fb
--- /dev/null
+++ b/docs/en/connector-v2/source/GoogleSheets.md
@@ -0,0 +1,74 @@
+# GoogleSheets
+
+> GoogleSheets source connector
+
+## Description
+
+Used to read data from GoogleSheets.
+
+## Key features
+
+- [x] [batch](../../concept/connector-v2-features.md)
+- [ ] [stream](../../concept/connector-v2-features.md)
+- [ ] [exactly-once](../../concept/connector-v2-features.md)
+- [x] [schema projection](../../concept/connector-v2-features.md)
+- [ ] [parallelism](../../concept/connector-v2-features.md)
+- [ ] [support user-defined split](../../concept/connector-v2-features.md)
+- [ ] file format
+ - [ ] text
+ - [ ] csv
+ - [ ] json
+
+## Options
+
+| name | type | required | default value |
+|------------------- |--------------|----------|---------------|
+| service_account_key | string | yes | - |
+| sheet_id | string | yes | - |
+| sheet_name | string | yes | - |
+| schema | config | yes | - |
+
+### service_account_key [string]
+
+google cloud service account, base64 required
+
+### sheet_id [string]
+
+sheet id in a Google Sheets URL
+
+### sheet_name [string]
+
+the name of the sheet you want to import
+
+### schema [config]
+
+#### fields [config]
+
+the schema fields of upstream data
+
+## Example
+
+simple:
+
+```hocon
+ GoogleSheets {
+ service_account_key = "seatunnel-test"
+ sheet_id = "1VI0DvyZK-NIdssSdsDSsSSSC-_-rYMi7ppJiI_jhE"
+ sheet_name = "sheets01"
+ range = "A1:C3"
+ schema = {
+ fields {
+ a = int
+ b = string
+ c = string
+ }
+ }
+ }
+```
+
+
+## Changelog
+
+### next version
+
+- Add GoogleSheets Source Connector
diff --git a/plugin-mapping.properties b/plugin-mapping.properties
index 1e86da9ae..6863ef37e 100644
--- a/plugin-mapping.properties
+++ b/plugin-mapping.properties
@@ -143,4 +143,5 @@ seatunnel.source.Cassandra = connector-cassandra
seatunnel.sink.Cassandra = connector-cassandra
seatunnel.sink.StarRocks = connector-starrocks
seatunnel.source.MyHours = connector-http-myhours
-seatunnel.sink.InfluxDB = connector-influxdb
\ No newline at end of file
+seatunnel.sink.InfluxDB = connector-influxdb
+seatunnel.source.GoogleSheets = connector-google-sheets
\ No newline at end of file
diff --git a/seatunnel-connectors-v2/connector-google-sheets/pom.xml
b/seatunnel-connectors-v2/connector-google-sheets/pom.xml
new file mode 100644
index 000000000..30c40d40a
--- /dev/null
+++ b/seatunnel-connectors-v2/connector-google-sheets/pom.xml
@@ -0,0 +1,94 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+
+-->
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <parent>
+ <artifactId>seatunnel-connectors-v2</artifactId>
+ <groupId>org.apache.seatunnel</groupId>
+ <version>${revision}</version>
+ </parent>
+ <modelVersion>4.0.0</modelVersion>
+
+ <artifactId>connector-google-sheets</artifactId>
+
+ <dependencies>
+
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-common</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>seatunnel-format-json</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.apis</groupId>
+ <artifactId>google-api-services-sheets</artifactId>
+ <version>v4-rev612-1.25.0</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.auth</groupId>
+ <artifactId>google-auth-library-oauth2-http</artifactId>
+ <version>1.3.0</version>
+ </dependency>
+ <dependency>
+ <groupId>com.google.guava</groupId>
+ <artifactId>guava</artifactId>
+ <version>31.1-android</version>
+ </dependency>
+
+ </dependencies>
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-shade-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>package</phase>
+ <goals>
+ <goal>shade</goal>
+ </goals>
+ <configuration>
+ <relocations>
+ <relocation>
+ <pattern>com.google.common</pattern>
+
<shadedPattern>${seatunnel.shade.package}.google.sheets.com.google.common</shadedPattern>
+ </relocation>
+ </relocations>
+ <filters>
+ <filter>
+ <artifact>*:*</artifact>
+ <excludes>
+ <exclude>META-INF/maven/**</exclude>
+ </excludes>
+ </filter>
+ </filters>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
\ No newline at end of file
diff --git
a/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/config/SheetsConfig.java
b/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/config/SheetsConfig.java
new file mode 100644
index 000000000..8efda56a6
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/config/SheetsConfig.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.google.sheets.config;
+
+public class SheetsConfig {
+
+ public static final String SERVICE_ACCOUNT_KEY = "service_account_key";
+ public static final String SHEET_ID = "sheet_id";
+ public static final String SHEET_NAME = "sheet_name";
+ public static final String RANGE = "range";
+}
diff --git
a/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/config/SheetsParameters.java
b/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/config/SheetsParameters.java
new file mode 100644
index 000000000..250d43e7e
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/config/SheetsParameters.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.google.sheets.config;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import lombok.Data;
+
+import java.io.Serializable;
+
+@Data
+public class SheetsParameters implements Serializable {
+
+ private byte[] serviceAccountKey;
+
+ private String sheetId;
+
+ private String sheetName;
+
+ private String range;
+
+ public SheetsParameters buildWithConfig(Config config) {
+ this.serviceAccountKey =
config.getString(SheetsConfig.SERVICE_ACCOUNT_KEY).getBytes();
+ this.sheetId = config.getString(SheetsConfig.SHEET_ID);
+ this.sheetName = config.getString(SheetsConfig.SHEET_NAME);
+ this.range = config.getString(SheetsConfig.RANGE);
+ return this;
+ }
+
+}
diff --git
a/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/deserialize/GoogleSheetsDeserializer.java
b/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/deserialize/GoogleSheetsDeserializer.java
new file mode 100644
index 000000000..d86e45969
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/deserialize/GoogleSheetsDeserializer.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.google.sheets.deserialize;
+
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class GoogleSheetsDeserializer implements SeaTunnelRowDeserializer {
+
+ private DeserializationSchema<SeaTunnelRow> deserializationSchema;
+ private final ObjectMapper objectMapper = new ObjectMapper();
+ private final String[] fields;
+
+ public GoogleSheetsDeserializer(String[] fields,
DeserializationSchema<SeaTunnelRow> deserializationSchema) {
+ this.fields = fields;
+ this.deserializationSchema = deserializationSchema;
+ }
+
+ @Override
+ public SeaTunnelRow deserializeRow(List<Object> row) {
+ try {
+ Map<String, Object> map = new HashMap<>();
+ for (int i = 0; i < row.size(); i++) {
+ if (i < fields.length) {
+ map.put(fields[i], row.get(i));
+ }
+ }
+ String rowStr = objectMapper.writeValueAsString(map);
+ return deserializationSchema.deserialize(rowStr.getBytes());
+ } catch (IOException e) {
+ throw new RuntimeException("Object json deserialization
exception.", e);
+ }
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/deserialize/SeaTunnelRowDeserializer.java
b/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/deserialize/SeaTunnelRowDeserializer.java
new file mode 100644
index 000000000..70f271664
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/deserialize/SeaTunnelRowDeserializer.java
@@ -0,0 +1,28 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.google.sheets.deserialize;
+
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+
+import java.util.List;
+
+public interface SeaTunnelRowDeserializer {
+
+ SeaTunnelRow deserializeRow(List<Object> row);
+
+}
diff --git
a/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/source/SheetsSource.java
b/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/source/SheetsSource.java
new file mode 100644
index 000000000..90df48445
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/source/SheetsSource.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.google.sheets.source;
+
+import org.apache.seatunnel.api.common.PrepareFailException;
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.source.Boundedness;
+import org.apache.seatunnel.api.source.SeaTunnelSource;
+import org.apache.seatunnel.api.table.type.SeaTunnelDataType;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import org.apache.seatunnel.common.config.CheckConfigUtil;
+import org.apache.seatunnel.common.config.CheckResult;
+import org.apache.seatunnel.common.constants.PluginType;
+import org.apache.seatunnel.connectors.seatunnel.common.schema.SeaTunnelSchema;
+import
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
+import
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitSource;
+import
org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
+import
org.apache.seatunnel.connectors.seatunnel.google.sheets.config.SheetsConfig;
+import
org.apache.seatunnel.connectors.seatunnel.google.sheets.config.SheetsParameters;
+import org.apache.seatunnel.format.json.JsonDeserializationSchema;
+
+import org.apache.seatunnel.shade.com.typesafe.config.Config;
+
+import com.google.auto.service.AutoService;
+
+@AutoService(SeaTunnelSource.class)
+public class SheetsSource extends AbstractSingleSplitSource<SeaTunnelRow> {
+
+ private SeaTunnelRowType seaTunnelRowType;
+
+ private SheetsParameters sheetsParameters;
+
+ private DeserializationSchema<SeaTunnelRow> deserializationSchema;
+
+ @Override
+ public String getPluginName() {
+ return "GoogleSheets";
+ }
+
+ @Override
+ public void prepare(Config pluginConfig) throws PrepareFailException {
+ CheckResult checkResult = CheckConfigUtil.checkAllExists(pluginConfig,
SheetsConfig.SERVICE_ACCOUNT_KEY, SheetsConfig.SHEET_ID,
SheetsConfig.SHEET_NAME, SheetsConfig.RANGE, SeaTunnelSchema.SCHEMA);
+ if (!checkResult.isSuccess()) {
+ throw new PrepareFailException(getPluginName(), PluginType.SOURCE,
checkResult.getMsg());
+ }
+ this.sheetsParameters = new
SheetsParameters().buildWithConfig(pluginConfig);
+ if (pluginConfig.hasPath(SeaTunnelSchema.SCHEMA)) {
+ Config schema = pluginConfig.getConfig(SeaTunnelSchema.SCHEMA);
+ this.seaTunnelRowType =
SeaTunnelSchema.buildWithConfig(schema).getSeaTunnelRowType();
+ } else {
+ this.seaTunnelRowType = SeaTunnelSchema.buildSimpleTextSchema();
+ }
+ this.deserializationSchema = new JsonDeserializationSchema(false,
false, seaTunnelRowType);
+ }
+
+ @Override
+ public Boundedness getBoundedness() {
+ return Boundedness.BOUNDED;
+ }
+
+ @Override
+ public SeaTunnelDataType<SeaTunnelRow> getProducedType() {
+ return seaTunnelRowType;
+ }
+
+ @Override
+ public AbstractSingleSplitReader<SeaTunnelRow>
createReader(SingleSplitReaderContext readerContext) throws Exception {
+ return new SheetsSourceReader(sheetsParameters, readerContext,
deserializationSchema, this.seaTunnelRowType);
+ }
+}
diff --git
a/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/source/SheetsSourceReader.java
b/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/source/SheetsSourceReader.java
new file mode 100644
index 000000000..47e96e955
--- /dev/null
+++
b/seatunnel-connectors-v2/connector-google-sheets/src/main/java/org/apache/seatunnel/connectors/seatunnel/google/sheets/source/SheetsSourceReader.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.seatunnel.connectors.seatunnel.google.sheets.source;
+
+import org.apache.seatunnel.api.serialization.DeserializationSchema;
+import org.apache.seatunnel.api.source.Collector;
+import org.apache.seatunnel.api.table.type.SeaTunnelRow;
+import org.apache.seatunnel.api.table.type.SeaTunnelRowType;
+import
org.apache.seatunnel.connectors.seatunnel.common.source.AbstractSingleSplitReader;
+import
org.apache.seatunnel.connectors.seatunnel.common.source.SingleSplitReaderContext;
+import
org.apache.seatunnel.connectors.seatunnel.google.sheets.config.SheetsParameters;
+import
org.apache.seatunnel.connectors.seatunnel.google.sheets.deserialize.GoogleSheetsDeserializer;
+import
org.apache.seatunnel.connectors.seatunnel.google.sheets.deserialize.SeaTunnelRowDeserializer;
+
+import com.google.api.client.googleapis.javanet.GoogleNetHttpTransport;
+import com.google.api.client.http.HttpRequestInitializer;
+import com.google.api.client.http.javanet.NetHttpTransport;
+import com.google.api.client.json.JsonFactory;
+import com.google.api.client.json.gson.GsonFactory;
+import com.google.api.services.sheets.v4.Sheets;
+import com.google.api.services.sheets.v4.SheetsScopes;
+import com.google.api.services.sheets.v4.model.ValueRange;
+import com.google.auth.http.HttpCredentialsAdapter;
+import com.google.auth.oauth2.ServiceAccountCredentials;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.util.Base64;
+import java.util.Collections;
+import java.util.List;
+
+public class SheetsSourceReader extends
AbstractSingleSplitReader<SeaTunnelRow> {
+
+ private SheetsParameters sheetsParameters;
+
+ private SeaTunnelRowType seaTunnelRowType;
+
+ private HttpRequestInitializer requestInitializer;
+
+ private static final String APPLICATION_NAME = "SeaTunnel Google Sheets";
+
+ private static final JsonFactory JSON_FACTORY =
GsonFactory.getDefaultInstance();
+
+ private final SingleSplitReaderContext context;
+
+
+ private final SeaTunnelRowDeserializer seaTunnelRowDeserializer;
+
+ public SheetsSourceReader(SheetsParameters sheetsParameters,
SingleSplitReaderContext context, DeserializationSchema<SeaTunnelRow>
deserializationSchema, SeaTunnelRowType seaTunnelRowType) throws IOException {
+ this.sheetsParameters = sheetsParameters;
+ this.context = context;
+ this.seaTunnelRowType = seaTunnelRowType;
+ this.seaTunnelRowDeserializer = new
GoogleSheetsDeserializer(seaTunnelRowType.getFieldNames(),
deserializationSchema);
+ }
+
+ @Override
+ public void open() throws Exception {
+ byte[] keyBytes =
Base64.getDecoder().decode(sheetsParameters.getServiceAccountKey());
+ ServiceAccountCredentials sourceCredentials = ServiceAccountCredentials
+ .fromStream(new ByteArrayInputStream(keyBytes));
+ sourceCredentials = (ServiceAccountCredentials) sourceCredentials
+
.createScoped(Collections.singletonList(SheetsScopes.SPREADSHEETS));
+ requestInitializer = new HttpCredentialsAdapter(sourceCredentials);
+
+ }
+
+ @Override
+ public void close() throws IOException {
+ // no need close
+ }
+
+ @Override
+ public void pollNext(Collector<SeaTunnelRow> output) throws Exception {
+ final NetHttpTransport httpTransport =
GoogleNetHttpTransport.newTrustedTransport();
+ Sheets service = new Sheets.Builder(httpTransport, JSON_FACTORY,
requestInitializer)
+ .setApplicationName(APPLICATION_NAME)
+ .build();
+ ValueRange response = service.spreadsheets().values()
+ .get(sheetsParameters.getSheetId(),
sheetsParameters.getSheetName() + "!" + sheetsParameters.getRange())
+ .execute();
+ List<List<Object>> values = response.getValues();
+ if (values != null) {
+ for (List<Object> row : values) {
+ SeaTunnelRow seaTunnelRow =
this.seaTunnelRowDeserializer.deserializeRow(row);
+ output.collect(seaTunnelRow);
+ }
+ }
+ this.context.signalNoMoreElement();
+ }
+}
diff --git a/seatunnel-connectors-v2/pom.xml b/seatunnel-connectors-v2/pom.xml
index 267ead164..28ac5a803 100644
--- a/seatunnel-connectors-v2/pom.xml
+++ b/seatunnel-connectors-v2/pom.xml
@@ -59,6 +59,7 @@
<module>connector-amazondynamodb</module>
<module>connector-cassandra</module>
<module>connector-starrocks</module>
+ <module>connector-google-sheets</module>
</modules>
<dependencyManagement>
diff --git a/seatunnel-dist/pom.xml b/seatunnel-dist/pom.xml
index 3cf94f3b1..e6cbc4afb 100644
--- a/seatunnel-dist/pom.xml
+++ b/seatunnel-dist/pom.xml
@@ -267,6 +267,12 @@
<version>${project.version}</version>
<scope>provided</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.seatunnel</groupId>
+ <artifactId>connector-google-sheets</artifactId>
+ <version>${project.version}</version>
+ <scope>provided</scope>
+ </dependency>
<dependency>
<groupId>org.apache.seatunnel</groupId>
<artifactId>connector-datahub</artifactId>