David Quigley created BAHIR-283:
-----------------------------------
Summary: InfluxDBWriter fails to write the final element in each
element
Key: BAHIR-283
URL: https://issues.apache.org/jira/browse/BAHIR-283
Project: Bahir
Issue Type: Bug
Components: Flink Streaming Connectors
Affects Versions: Flink-1.0
Reporter: David Quigley
Fix For: Flink-Next
{{ /**
* This method calls the InfluxDB write API whenever the element list
reaches the {@link
* #bufferSize}. It keeps track of the latest timestamp of each element. It
compares the latest
* timestamp with the context.timestamp() and takes the bigger (latest)
timestamp.
*
* @param in incoming data
* @param context current Flink context
* @see org.apache.flink.api.connector.sink.SinkWriter.Context
*/
@Override
public void write(final IN in, final Context context) throws IOException {
if (this.elements.size() == this.bufferSize) {
LOG.debug("Buffer size reached preparing to write the elements.");
this.writeCurrentElements();
this.elements.clear();
} else {
LOG.trace("Adding elements to buffer. Buffer size: {}",
this.elements.size());
this.elements.add(this.schemaSerializer.serialize(in, context));
if (context.timestamp() != null) {
this.lastTimestamp = Math.max(this.lastTimestamp,
context.timestamp());
}
}
}}}
The bug is in this write method. If the number of elements in the buffer is
less than the configured buffer size, the current element is added to the
buffer. If the number of elements in the buffer is equal to the buffer size,
the buffer is flushed and the current element is not added to the next buffer.
This results in the current element being dropped.
--
This message was sent by Atlassian Jira
(v8.3.4#803005)