This is an automated email from the ASF dual-hosted git repository. eskabetxe pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/bahir-flink.git
commit c8b6f6122ae6411064a1ec55cf2c2fbc08d4979f Author: dave <[email protected]> AuthorDate: Thu Oct 7 11:58:28 2021 -0400 [BAHIR-283] Fix dropped elements on InfluxDbSink --- .../connectors/influxdb/sink/writer/InfluxDBWriter.java | 11 +++++------ .../connectors/influxdb/InfluxDBSinkIntegrationTestCase.java | 6 +++++- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/writer/InfluxDBWriter.java b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/writer/InfluxDBWriter.java index e800c86..5826c5c 100644 --- a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/writer/InfluxDBWriter.java +++ b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/writer/InfluxDBWriter.java @@ -80,16 +80,15 @@ public final class InfluxDBWriter<IN> implements SinkWriter<IN, Long, Point> { */ @Override public void write(final IN in, final Context context) throws IOException { + LOG.trace("Adding elements to buffer. Buffer size: {}", this.elements.size()); + this.elements.add(this.schemaSerializer.serialize(in, context)); if (this.elements.size() == this.bufferSize) { LOG.debug("Buffer size reached preparing to write the elements."); this.writeCurrentElements(); this.elements.clear(); - } else { - LOG.trace("Adding elements to buffer. Buffer size: {}", this.elements.size()); - this.elements.add(this.schemaSerializer.serialize(in, context)); - if (context.timestamp() != null) { - this.lastTimestamp = Math.max(this.lastTimestamp, context.timestamp()); - } + } + if (context.timestamp() != null) { + this.lastTimestamp = Math.max(this.lastTimestamp, context.timestamp()); } } diff --git a/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSinkIntegrationTestCase.java b/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSinkIntegrationTestCase.java index 75671c8..138b017 100644 --- a/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSinkIntegrationTestCase.java +++ b/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSinkIntegrationTestCase.java @@ -31,6 +31,8 @@ import java.util.Arrays; import java.util.List; import java.util.Objects; import java.util.stream.Collectors; +import java.util.stream.Stream; + import org.apache.flink.api.common.RuntimeExecutionMode; import org.apache.flink.api.common.typeinfo.BasicTypeInfo; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; @@ -52,8 +54,9 @@ class InfluxDBSinkIntegrationTestCase extends TestLogger { private static final List<Long> SOURCE_DATA = Arrays.asList(1L, 2L, 3L); + // FiniteTestSource emits list of elements twice private static final List<String> EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE = - SOURCE_DATA.stream() + Stream.concat(SOURCE_DATA.stream(), SOURCE_DATA.stream()) .map(x -> new InfluxDBTestSerializer().serialize(x, null).toLineProtocol()) .collect(Collectors.toList()); @@ -82,6 +85,7 @@ class InfluxDBSinkIntegrationTestCase extends TestLogger { .setInfluxDBPassword(InfluxDBContainer.password) .setInfluxDBBucket(InfluxDBContainer.bucket) .setInfluxDBOrganization(InfluxDBContainer.organization) + .setWriteBufferSize(2) .addCheckpointDataPoint(true) .build();
