EricJoy2048 commented on code in PR #6808:
URL: https://github.com/apache/seatunnel/pull/6808#discussion_r1595230998
##########
seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxdbSourceReader.java:
##########
@@ -97,11 +102,26 @@ public void close() {
}
@Override
-    public void pollNext(Collector<SeaTunnelRow> output) {
-        while (!pendingSplits.isEmpty()) {
-            synchronized (output.getCheckpointLock()) {
-                InfluxDBSourceSplit split = pendingSplits.poll();
-                read(split, output);
+    public void pollNext(Collector<SeaTunnelRow> output) throws InterruptedException {
+        // reader influxDB By chunk
+        if (StringUtils.isEmpty(config.getSplitKey()) && config.getChunkSize() > 0) {
Review Comment:
1. Even if users set `split_key`, they can still use chunk read mode. Is that the correct understanding?
2. I think reading in chunk mode is the better approach. It would be better to give `chunk_size` a default value when the user has not set it, and then uniformly use the `readByChunkSize` method to read the data.
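The defaulting the comment proposes could look roughly like this. This is a minimal, self-contained sketch; `DEFAULT_CHUNK_SIZE` and `effectiveChunkSize` are hypothetical names, not identifiers from the PR:

```java
// Sketch of the reviewer's suggestion: always read by chunk, falling back to a
// default chunk size when the user has not configured one.
public class ChunkSizeDefaulting {
    // Hypothetical default; the real value would be defined by the connector options.
    static final int DEFAULT_CHUNK_SIZE = 10_000;

    // Treats null or non-positive values as "not set" and applies the default.
    static int effectiveChunkSize(Integer configuredChunkSize) {
        return (configuredChunkSize == null || configuredChunkSize <= 0)
                ? DEFAULT_CHUNK_SIZE
                : configuredChunkSize;
    }

    public static void main(String[] args) {
        System.out.println(effectiveChunkSize(null)); // falls back to the default
        System.out.println(effectiveChunkSize(500));  // user-provided value wins
    }
}
```

With this in place, `pollNext` no longer needs a branch on whether `chunk_size` was set; every split goes through the chunked read path.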
##########
seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxdbSourceReader.java:
##########
@@ -151,4 +171,49 @@ private void read(InfluxDBSourceSplit split, Collector<SeaTunnelRow> output) {
}
}
}
+
+    private void readByChunkSize(InfluxDBSourceSplit split, Collector<SeaTunnelRow> output) {
+        influxdb.query(
+                new Query(split.getQuery(), config.getDatabase()),
+                config.getChunkSize(),
+                (cancellable, queryResult) -> {
+                    if (cancellable.isCanceled()) {
+                        log.info("this chunk reader influxDB is canceled");
+                        latch.countDown();
+                        return;
+                    }
+                    if (queryResult.hasError()) {
+                        log.error(
+                                "this chunk reader influxDB result has error [{}]",
+                                queryResult.getError());
+                        latch.countDown();
+                        return;
+                    }
+                    for (QueryResult.Result result : queryResult.getResults()) {
+                        List<QueryResult.Series> serieList = result.getSeries();
+                        if (CollectionUtils.isNotEmpty(serieList)) {
+                            for (QueryResult.Series series : serieList) {
+                                for (List<Object> values : series.getValues()) {
+                                    SeaTunnelRow row =
+                                            InfluxDBRowConverter.convert(
+                                                    values, seaTunnelRowType, columnsIndexList);
+                                    output.collect(row);
+                                }
+                            }
+                        } else {
+                            log.info("this chunk reader influxDB series is empty");
+                        }
+                    }
+                },
+                () -> {
+                    log.error("this chunk reader influxDB complete");
Review Comment:
Should this be `log.info`? Normal completion is not an error.
##########
seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxdbSourceReader.java:
##########
@@ -151,4 +171,49 @@ private void read(InfluxDBSourceSplit split, Collector<SeaTunnelRow> output) {
}
}
}
+
+    private void readByChunkSize(InfluxDBSourceSplit split, Collector<SeaTunnelRow> output) {
+        influxdb.query(
+                new Query(split.getQuery(), config.getDatabase()),
Review Comment:
Using `CountDownLatch` here is not a good idea. You can update the code like this (then `readByChunkSize` becomes a synchronous method that does not return until the chunked read is complete):
```
final CompletableFuture<Void> queryCompleteFuture = new CompletableFuture<>();
influxdb.query(
        new Query(split.getQuery(), config.getDatabase()),
        config.getChunkSize(),
        (cancellable, queryResult) -> {
            if (cancellable.isCanceled()) {
                log.info("this chunk reader influxDB is canceled");
                queryCompleteFuture.complete(null);
                return;
            }
            if (queryResult.hasError()) {
                log.error(
                        "this chunk reader influxDB result has error [{}]",
                        queryResult.getError());
                queryCompleteFuture.completeExceptionally(
                        new InfluxdbConnectorException(queryResult.getError()));
                return;
            }
            for (QueryResult.Result result : queryResult.getResults()) {
                List<QueryResult.Series> serieList = result.getSeries();
                if (CollectionUtils.isNotEmpty(serieList)) {
                    for (QueryResult.Series series : serieList) {
                        for (List<Object> values : series.getValues()) {
                            SeaTunnelRow row =
                                    InfluxDBRowConverter.convert(
                                            values, seaTunnelRowType, columnsIndexList);
                            output.collect(row);
                        }
                    }
                } else {
                    log.info("this chunk reader influxDB series is empty");
                }
            }
        },
        () -> {
            log.info("this chunk reader influxDB complete");
            queryCompleteFuture.complete(null);
        },
        throwable -> {
            log.error(
                    "this chunk reader influxDB result has error [{}]",
                    throwable.getMessage());
            queryCompleteFuture.completeExceptionally(
                    new InfluxdbConnectorException(throwable));
        });
queryCompleteFuture.get();
```
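For context on why the future-based bridge is preferable: `CompletableFuture.get()` rethrows an error passed to `completeExceptionally`, while a bare `CountDownLatch` only unblocks the caller and silently drops the failure. A minimal, self-contained sketch with a simulated async callback API (`queryAsync`, `readSync`, and `FutureBridgeDemo` are hypothetical names standing in for `influxdb.query`):

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.function.Consumer;

public class FutureBridgeDemo {
    // Simulated async callback API, standing in for influxdb.query(query, chunkSize, ...).
    static void queryAsync(boolean fail, Runnable onChunk, Runnable onComplete,
                           Consumer<Throwable> onError) {
        new Thread(() -> {
            if (fail) {
                onError.accept(new RuntimeException("query failed"));
            } else {
                onChunk.run();
                onComplete.run();
            }
        }).start();
    }

    // Synchronous read: blocks until the async callbacks finish, and rethrows
    // any callback error at the call site via ExecutionException.
    static void readSync(boolean fail) throws InterruptedException, ExecutionException {
        CompletableFuture<Void> done = new CompletableFuture<>();
        queryAsync(fail,
                () -> System.out.println("chunk received"),
                () -> done.complete(null),
                done::completeExceptionally);
        done.get(); // a CountDownLatch here would unblock but swallow the error
    }

    public static void main(String[] args) throws Exception {
        readSync(false); // completes normally
        try {
            readSync(true);
        } catch (ExecutionException e) {
            System.out.println("caught: " + e.getCause().getMessage());
        }
    }
}
```

This is why the suggestion turns `readByChunkSize` into a synchronous method: the reader thread in `pollNext` sees the failure immediately instead of continuing as if the split succeeded.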
##########
docs/en/connector-v2/source/InfluxDB.md:
##########
@@ -6,17 +6,48 @@
Read external data source data through InfluxDB.
+## Support InfluxDB Version
+
+- 1.x/2.x
+
+## Support Those Engines
+
+> Spark<br/>
+> Flink<br/>
+> SeaTunnel Zeta<br/>
+
+## Using Dependency
+
+### For Spark/Flink Engine
+
+> 1. You need to ensure that the [influxDB connector jar package](https://mvnrepository.com/artifact/org.apache.seatunnel/connector-influxdb) has been placed in directory `${SEATUNNEL_HOME}/connectors/`.
+
+### For SeaTunnel Zeta Engine
+
+> 1. You need to ensure that the [influxDB connector jar package](https://mvnrepository.com/artifact/org.apache.seatunnel/connector-influxdb) has been placed in directory `${SEATUNNEL_HOME}/lib/`.
+
## Key features
- [x] [batch](../../concept/connector-v2-features.md)
- [ ] [stream](../../concept/connector-v2-features.md)
- [x] [exactly-once](../../concept/connector-v2-features.md)
- [x] [column projection](../../concept/connector-v2-features.md)
+- [x] [parallelism](../../concept/connector-v2-features.md)
+- [ ] [support multiple table reading](../../concept/connector-v2-features.md)
+- [x] [parallelism](../../concept/connector-v2-features.md)
Review Comment:
<img width="410" alt="image"
src="https://github.com/apache/seatunnel/assets/32193458/d09cf4bd-884e-42fb-b25e-21d1812f669a">
Repeated
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]