EricJoy2048 commented on code in PR #6808:
URL: https://github.com/apache/seatunnel/pull/6808#discussion_r1595230998


##########
seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxdbSourceReader.java:
##########
@@ -97,11 +102,26 @@ public void close() {
     }
 
     @Override
-    public void pollNext(Collector<SeaTunnelRow> output) {
-        while (!pendingSplits.isEmpty()) {
-            synchronized (output.getCheckpointLock()) {
-                InfluxDBSourceSplit split = pendingSplits.poll();
-                read(split, output);
+    public void pollNext(Collector<SeaTunnelRow> output) throws InterruptedException {
+        // reader influxDB By chunk
+        if (StringUtils.isEmpty(config.getSplitKey()) && config.getChunkSize() > 0) {

Review Comment:
   1. Even if users set `split_key`, they can still use chunk read mode. Is that the right understanding?
   2. I think reading in chunk mode is the better way. A better approach would be to give `chunk_size` a default value when the user has not set it, and then uniformly use the `readByChunkSize` method to read the data.
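Point 2 above could be sketched as follows. This is a hypothetical illustration of the defaulting logic, not the connector's actual API; `DEFAULT_CHUNK_SIZE` and the helper name are assumptions:

```java
// Sketch of the reviewer's suggestion: instead of branching on whether
// split_key/chunk_size are set, fall back to a default chunk size so that
// readByChunkSize can be used uniformly for all reads.
public class ChunkSizeDefaulting {
    // Assumed default; the real connector would define its own constant.
    static final int DEFAULT_CHUNK_SIZE = 1000;

    /** Returns the configured chunk size, or the default when unset (<= 0). */
    static int effectiveChunkSize(int configuredChunkSize) {
        return configuredChunkSize > 0 ? configuredChunkSize : DEFAULT_CHUNK_SIZE;
    }

    public static void main(String[] args) {
        System.out.println(effectiveChunkSize(0));    // unset: falls back to default
        System.out.println(effectiveChunkSize(500));  // user-provided value wins
    }
}
```

With this defaulting in place, `pollNext` no longer needs a separate non-chunked code path.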



##########
seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxdbSourceReader.java:
##########
@@ -151,4 +171,49 @@ private void read(InfluxDBSourceSplit split, Collector<SeaTunnelRow> output) {
             }
         }
     }
+
+    private void readByChunkSize(InfluxDBSourceSplit split, Collector<SeaTunnelRow> output) {
+        influxdb.query(
+                new Query(split.getQuery(), config.getDatabase()),
+                config.getChunkSize(),
+                (cancellable, queryResult) -> {
+                    if (cancellable.isCanceled()) {
+                        log.info("this chunk reader influxDB is canceled");
+                        latch.countDown();
+                        return;
+                    }
+                    if (queryResult.hasError()) {
+                        log.error(
+                                "this chunk reader influxDB result has error [{}]",
+                                queryResult.getError());
+                        latch.countDown();
+                        return;
+                    }
+                    for (QueryResult.Result result : queryResult.getResults()) {
+                        List<QueryResult.Series> serieList = result.getSeries();
+                        if (CollectionUtils.isNotEmpty(serieList)) {
+                            for (QueryResult.Series series : serieList) {
+                                for (List<Object> values : series.getValues()) {
+                                    SeaTunnelRow row =
+                                            InfluxDBRowConverter.convert(
+                                                    values, seaTunnelRowType, columnsIndexList);
+                                    output.collect(row);
+                                }
+                            }
+                        } else {
+                            log.info("this chunk reader influxDB series is empty");
+                        }
+                    }
+                },
+                () -> {
+                    log.error("this chunk reader influxDB complete");

Review Comment:
   log.error?



##########
seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxdbSourceReader.java:
##########
@@ -151,4 +171,49 @@ private void read(InfluxDBSourceSplit split, Collector<SeaTunnelRow> output) {
             }
         }
     }
+
+    private void readByChunkSize(InfluxDBSourceSplit split, Collector<SeaTunnelRow> output) {
+        influxdb.query(
+                new Query(split.getQuery(), config.getDatabase()),

Review Comment:
   Using `CountDownLatch` is not a good idea. You can update the code like this (method `readByChunkSize` then becomes a synchronous method and will not return until the reading thread is completed):
   
   ```java
   final CompletableFuture<Void> queryCompleteFuture = new CompletableFuture<>();
   
   influxdb.query(
           new Query(split.getQuery(), config.getDatabase()),
           config.getChunkSize(),
           (cancellable, queryResult) -> {
               if (cancellable.isCanceled()) {
                   log.info("this chunk reader influxDB is canceled");
                   queryCompleteFuture.complete(null);
                   return;
               }
               if (queryResult.hasError()) {
                   log.error(
                           "this chunk reader influxDB result has error [{}]",
                           queryResult.getError());
                   // adapt to the actual InfluxdbConnectorException constructor
                   queryCompleteFuture.completeExceptionally(
                           new InfluxdbConnectorException(queryResult.getError()));
                   return;
               }
               for (QueryResult.Result result : queryResult.getResults()) {
                   List<QueryResult.Series> serieList = result.getSeries();
                   if (CollectionUtils.isNotEmpty(serieList)) {
                       for (QueryResult.Series series : serieList) {
                           for (List<Object> values : series.getValues()) {
                               SeaTunnelRow row =
                                       InfluxDBRowConverter.convert(
                                               values, seaTunnelRowType, columnsIndexList);
                               output.collect(row);
                           }
                       }
                   } else {
                       log.info("this chunk reader influxDB series is empty");
                   }
               }
           },
           () -> {
               log.info("this chunk reader influxDB complete");
               queryCompleteFuture.complete(null);
           },
           throwable -> {
               log.error(
                       "this chunk reader influxDB result has error [{}]",
                       throwable.getMessage());
               queryCompleteFuture.completeExceptionally(
                       new InfluxdbConnectorException(throwable));
           });
   
   queryCompleteFuture.get();
   ```
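The pattern suggested above can be demonstrated in a self-contained form. `queryAsync` below is a hypothetical stand-in for the asynchronous `influxdb.query(...)` call, and all names are illustrative; the point is that completing a `CompletableFuture<Void>` from the callbacks and blocking on it turns the async API into a synchronous one:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Minimal sketch: an async callback API is bridged to a synchronous call by
// completing a CompletableFuture<Void> from the callbacks and blocking on it.
public class SyncOverAsync {

    // Fake async API: delivers rows on another thread, then signals completion.
    static void queryAsync(Consumer<String> onRow, Runnable onComplete) {
        new Thread(() -> {
            onRow.accept("row-1");
            onRow.accept("row-2");
            onComplete.run();
        }).start();
    }

    // Synchronous wrapper: does not return until the reading thread finishes.
    static List<String> queryBlocking() {
        List<String> rows = Collections.synchronizedList(new ArrayList<>());
        CompletableFuture<Void> done = new CompletableFuture<>();
        queryAsync(rows::add, () -> done.complete(null)); // a Void future still needs complete(null)
        done.join(); // block until the completion callback fires; join() avoids checked exceptions
        return rows;
    }

    public static void main(String[] args) {
        System.out.println(queryBlocking()); // [row-1, row-2]
    }
}
```

A failure callback would use `completeExceptionally(...)` instead, so the blocking call rethrows the error to the caller rather than swallowing it.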



##########
docs/en/connector-v2/source/InfluxDB.md:
##########
@@ -6,17 +6,48 @@
 
 Read external data source data through InfluxDB.
 
+## Support InfluxDB Version
+
+- 1.x/2.x
+
+## Support Those Engines
+
+> Spark<br/>
+> Flink<br/>
+> SeaTunnel Zeta<br/>
+
+## Using Dependency
+
+### For Spark/Flink Engine
+
+> 1. You need to ensure that the [influxDB connector jar package](https://mvnrepository.com/artifact/org.apache.seatunnel/connector-influxdb) has been placed in directory `${SEATUNNEL_HOME}/connectors/`.
+
+### For SeaTunnel Zeta Engine
+
+> 1. You need to ensure that the [influxDB connector jar package](https://mvnrepository.com/artifact/org.apache.seatunnel/connector-influxdb) has been placed in directory `${SEATUNNEL_HOME}/lib/`.
+
 ## Key features
 
 - [x] [batch](../../concept/connector-v2-features.md)
 - [ ] [stream](../../concept/connector-v2-features.md)
 - [x] [exactly-once](../../concept/connector-v2-features.md)
 - [x] [column projection](../../concept/connector-v2-features.md)
+- [x] [parallelism](../../concept/connector-v2-features.md)
+- [ ] [support multiple table reading](../../concept/connector-v2-features.md)
+- [x] [parallelism](../../concept/connector-v2-features.md)

Review Comment:
   <img width="410" alt="image" src="https://github.com/apache/seatunnel/assets/32193458/d09cf4bd-884e-42fb-b25e-21d1812f669a">
   
   Repeated



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
