15767714253 commented on code in PR #6808:
URL: https://github.com/apache/seatunnel/pull/6808#discussion_r1595250022


##########
seatunnel-connectors-v2/connector-influxdb/src/main/java/org/apache/seatunnel/connectors/seatunnel/influxdb/source/InfluxdbSourceReader.java:
##########
@@ -151,4 +171,49 @@ private void read(InfluxDBSourceSplit split, 
Collector<SeaTunnelRow> output) {
             }
         }
     }
+
+    private void readByChunkSize(InfluxDBSourceSplit split, 
Collector<SeaTunnelRow> output) {
+        influxdb.query(
+                new Query(split.getQuery(), config.getDatabase()),
+                config.getChunkSize(),
+                (cancellable, queryResult) -> {
+                    if (cancellable.isCanceled()) {
+                        log.info("this chunk reader influxDB is canceled");
+                        latch.countDown();
+                        return;
+                    }
+                    if (queryResult.hasError()) {
+                        log.error(
+                                "this chunk reader influxDB result has error 
[{}]",
+                                queryResult.getError());
+                        latch.countDown();
+                        return;
+                    }
+                    for (QueryResult.Result result : queryResult.getResults()) 
{
+                        List<QueryResult.Series> serieList = 
result.getSeries();
+                        if (CollectionUtils.isNotEmpty(serieList)) {
+                            for (QueryResult.Series series : serieList) {
+                                for (List<Object> values : series.getValues()) 
{
+                                    SeaTunnelRow row =
+                                            InfluxDBRowConverter.convert(
+                                                    values, seaTunnelRowType, 
columnsIndexList);
+                                    output.collect(row);
+                                }
+                            }
+                        } else {
+                            log.info("this chunk reader influxDB series is 
empty");
+                        }
+                    }
+                },
+                () -> {
+                    log.error("this chunk reader influxDB complete");

Review Comment:
   Please use `log.info` here instead of `log.error`: the query completing normally is not an error condition.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to