This is an automated email from the ASF dual-hosted git repository.

eskabetxe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bahir-flink.git

commit c8b6f6122ae6411064a1ec55cf2c2fbc08d4979f
Author: dave <[email protected]>
AuthorDate: Thu Oct 7 11:58:28 2021 -0400

    [BAHIR-283] Fix dropped elements on InfluxDbSink
---
 .../connectors/influxdb/sink/writer/InfluxDBWriter.java       | 11 +++++------
 .../connectors/influxdb/InfluxDBSinkIntegrationTestCase.java  |  6 +++++-
 2 files changed, 10 insertions(+), 7 deletions(-)

diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/writer/InfluxDBWriter.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/writer/InfluxDBWriter.java
index e800c86..5826c5c 100644
--- a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/writer/InfluxDBWriter.java
+++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/writer/InfluxDBWriter.java
@@ -80,16 +80,15 @@ public final class InfluxDBWriter<IN> implements SinkWriter<IN, Long, Point> {
      */
     @Override
     public void write(final IN in, final Context context) throws IOException {
+        LOG.trace("Adding elements to buffer. Buffer size: {}", this.elements.size());
+        this.elements.add(this.schemaSerializer.serialize(in, context));
         if (this.elements.size() == this.bufferSize) {
             LOG.debug("Buffer size reached preparing to write the elements.");
             this.writeCurrentElements();
             this.elements.clear();
-        } else {
-            LOG.trace("Adding elements to buffer. Buffer size: {}", this.elements.size());
-            this.elements.add(this.schemaSerializer.serialize(in, context));
-            if (context.timestamp() != null) {
-                this.lastTimestamp = Math.max(this.lastTimestamp, context.timestamp());
-            }
+        }
+        if (context.timestamp() != null) {
+            this.lastTimestamp = Math.max(this.lastTimestamp, context.timestamp());
         }
     }
 
diff --git a/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSinkIntegrationTestCase.java b/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSinkIntegrationTestCase.java
index 75671c8..138b017 100644
--- a/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSinkIntegrationTestCase.java
+++ b/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSinkIntegrationTestCase.java
@@ -31,6 +31,8 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Objects;
 import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
 import org.apache.flink.api.common.RuntimeExecutionMode;
 import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
 import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -52,8 +54,9 @@ class InfluxDBSinkIntegrationTestCase extends TestLogger {
 
     private static final List<Long> SOURCE_DATA = Arrays.asList(1L, 2L, 3L);
 
+    // FiniteTestSource emits list of elements twice
     private static final List<String> EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE =
-            SOURCE_DATA.stream()
+            Stream.concat(SOURCE_DATA.stream(), SOURCE_DATA.stream())
                     .map(x -> new InfluxDBTestSerializer().serialize(x, null).toLineProtocol())
                     .collect(Collectors.toList());
 
@@ -82,6 +85,7 @@ class InfluxDBSinkIntegrationTestCase extends TestLogger {
                         .setInfluxDBPassword(InfluxDBContainer.password)
                         .setInfluxDBBucket(InfluxDBContainer.bucket)
                         .setInfluxDBOrganization(InfluxDBContainer.organization)
+                        .setWriteBufferSize(2)
                         .addCheckpointDataPoint(true)
                         .build();
 

Reply via email to