[ https://issues.apache.org/jira/browse/BAHIR-283?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17697472#comment-17697472 ]
ASF subversion and git services commented on BAHIR-283:
-------------------------------------------------------
Commit c8b6f6122ae6411064a1ec55cf2c2fbc08d4979f in bahir-flink's branch
refs/heads/master from dave
[ https://gitbox.apache.org/repos/asf?p=bahir-flink.git;h=c8b6f61 ]
[BAHIR-283] Fix dropped elements on InfluxDbSink
> InfluxDBWriter fails to write the final element in each element
> ---------------------------------------------------------------
>
> Key: BAHIR-283
> URL: https://issues.apache.org/jira/browse/BAHIR-283
> Project: Bahir
> Issue Type: Bug
> Components: Flink Streaming Connectors
> Affects Versions: Flink-1.0
> Reporter: David Quigley
> Priority: Major
> Fix For: Flink-1.2.0
>
> Original Estimate: 2h
> Remaining Estimate: 2h
>
> {{ /**
>  * This method calls the InfluxDB write API whenever the element list reaches the {@link
>  * #bufferSize}. It keeps track of the latest timestamp of each element. It compares the latest
>  * timestamp with the context.timestamp() and takes the bigger (latest) timestamp.
>  *
>  * @param in incoming data
>  * @param context current Flink context
>  * @see org.apache.flink.api.connector.sink.SinkWriter.Context
>  */
> @Override
> public void write(final IN in, final Context context) throws IOException {
>     if (this.elements.size() == this.bufferSize) {
>         LOG.debug("Buffer size reached preparing to write the elements.");
>         this.writeCurrentElements();
>         this.elements.clear();
>     } else {
>         LOG.trace("Adding elements to buffer. Buffer size: {}",
>                 this.elements.size());
>         this.elements.add(this.schemaSerializer.serialize(in, context));
>         if (context.timestamp() != null) {
>             this.lastTimestamp = Math.max(this.lastTimestamp,
>                     context.timestamp());
>         }
>     }
> }}}
> The bug is in this write method. If the number of elements in the buffer is
> less than the configured buffer size, the current element is added to the
> buffer. If the number of elements in the buffer is equal to the buffer size,
> the buffer is flushed and the current element is not added to the next
> buffer. This results in the current element being dropped.
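
One way to avoid the drop described above is to buffer the incoming element unconditionally and only then check whether the buffer is full. The following is a minimal, self-contained sketch of that idea, not the code from the referenced commit: the class `BufferingWriterSketch`, its in-memory `flushedBatches` list (standing in for the InfluxDB write API), and the `pending()` helper are all hypothetical names introduced for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical stand-in for a buffering sink writer; not the Bahir InfluxDBWriter. */
class BufferingWriterSketch<T> {
    private final int bufferSize;
    private final List<T> elements = new ArrayList<>();
    // Assumption: stands in for the InfluxDB write API; each flushed batch is recorded here.
    private final List<List<T>> flushedBatches = new ArrayList<>();

    BufferingWriterSketch(int bufferSize) {
        if (bufferSize < 1) {
            throw new IllegalArgumentException("bufferSize must be at least 1");
        }
        this.bufferSize = bufferSize;
    }

    /** Buffers the element first, then flushes once the buffer is full. */
    void write(T element) {
        // Adding before the size check means no element is skipped on a flush.
        elements.add(element);
        if (elements.size() >= bufferSize) {
            flush();
        }
    }

    /** Emits the buffered elements as one batch and clears the buffer. */
    void flush() {
        if (elements.isEmpty()) {
            return;
        }
        flushedBatches.add(new ArrayList<>(elements));
        elements.clear();
    }

    List<List<T>> flushedBatches() {
        return flushedBatches;
    }

    int pending() {
        return elements.size();
    }
}
```

With a buffer size of 2 and the inputs a, b, c, d, e, this sketch flushes the batches [a, b] and [c, d] and leaves e pending for a later flush. Under the logic quoted in the issue, the third element (c) would trigger the flush of [a, b] and then be discarded instead of being buffered.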
--
This message was sent by Atlassian Jira
(v8.20.10#820010)