This is an automated email from the ASF dual-hosted git repository.
eskabetxe pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/bahir-flink.git
The following commit(s) were added to refs/heads/master by this push:
new bcd0c22 prevent each element of the influxdb2 sink buffer from being
dropped
bcd0c22 is described below
commit bcd0c2202fd3c89e83a84875217ceeb64a8e3489
Author: dave <[email protected]>
AuthorDate: Thu Oct 7 11:58:28 2021 -0400
prevent each element of the influxdb2 sink buffer from being dropped
---
.../connectors/influxdb/sink/writer/InfluxDBWriter.java | 11 +++++------
.../connectors/influxdb/InfluxDBSinkIntegrationTestCase.java | 6 +++++-
2 files changed, 10 insertions(+), 7 deletions(-)
diff --git
a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/writer/InfluxDBWriter.java
b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/writer/InfluxDBWriter.java
index e800c86..5826c5c 100644
---
a/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/writer/InfluxDBWriter.java
+++
b/flink-connector-influxdb2/src/main/java/org/apache/flink/streaming/connectors/influxdb/sink/writer/InfluxDBWriter.java
@@ -80,16 +80,15 @@ public final class InfluxDBWriter<IN> implements
SinkWriter<IN, Long, Point> {
*/
@Override
public void write(final IN in, final Context context) throws IOException {
+ LOG.trace("Adding elements to buffer. Buffer size: {}",
this.elements.size());
+ this.elements.add(this.schemaSerializer.serialize(in, context));
if (this.elements.size() == this.bufferSize) {
LOG.debug("Buffer size reached preparing to write the elements.");
this.writeCurrentElements();
this.elements.clear();
- } else {
- LOG.trace("Adding elements to buffer. Buffer size: {}",
this.elements.size());
- this.elements.add(this.schemaSerializer.serialize(in, context));
- if (context.timestamp() != null) {
- this.lastTimestamp = Math.max(this.lastTimestamp,
context.timestamp());
- }
+ }
+ if (context.timestamp() != null) {
+ this.lastTimestamp = Math.max(this.lastTimestamp,
context.timestamp());
}
}
diff --git
a/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSinkIntegrationTestCase.java
b/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSinkIntegrationTestCase.java
index 75671c8..138b017 100644
---
a/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSinkIntegrationTestCase.java
+++
b/flink-connector-influxdb2/src/test/java/org/apache/flink/streaming/connectors/influxdb/InfluxDBSinkIntegrationTestCase.java
@@ -31,6 +31,8 @@ import java.util.Arrays;
import java.util.List;
import java.util.Objects;
import java.util.stream.Collectors;
+import java.util.stream.Stream;
+
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -52,8 +54,9 @@ class InfluxDBSinkIntegrationTestCase extends TestLogger {
private static final List<Long> SOURCE_DATA = Arrays.asList(1L, 2L, 3L);
+ // FiniteTestSource emits list of elements twice
private static final List<String>
EXPECTED_COMMITTED_DATA_IN_STREAMING_MODE =
- SOURCE_DATA.stream()
+ Stream.concat(SOURCE_DATA.stream(), SOURCE_DATA.stream())
.map(x -> new InfluxDBTestSerializer().serialize(x,
null).toLineProtocol())
.collect(Collectors.toList());
@@ -82,6 +85,7 @@ class InfluxDBSinkIntegrationTestCase extends TestLogger {
.setInfluxDBPassword(InfluxDBContainer.password)
.setInfluxDBBucket(InfluxDBContainer.bucket)
.setInfluxDBOrganization(InfluxDBContainer.organization)
+ .setWriteBufferSize(2)
.addCheckpointDataPoint(true)
.build();