fapaul commented on a change in pull request #17687:
URL: https://github.com/apache/flink/pull/17687#discussion_r744759976
##########
File path:
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriter.java
##########
@@ -219,25 +278,38 @@ private void flush() {
List<RequestEntryT> batch = new ArrayList<>(maxBatchSize);
int batchSize = Math.min(maxBatchSize, bufferedRequestEntries.size());
+ int batchSizeBytes = 0;
for (int i = 0; i < batchSize; i++) {
RequestEntryWrapper<RequestEntryT> elem =
bufferedRequestEntries.remove();
batch.add(elem.getRequestEntry());
bufferedRequestEntriesTotalSizeInBytes -= elem.getSize();
+ batchSizeBytes += elem.getSize();
}
if (batch.size() == 0) {
return;
}
+ long timestampOfRequest = System.currentTimeMillis();
Consumer<Collection<RequestEntryT>> requestResult =
failedRequestEntries ->
mailboxExecutor.execute(
- () -> completeRequest(failedRequestEntries),
+ () -> completeRequest(failedRequestEntries, timestampOfRequest),
"Mark in-flight request as completed and requeue %d request entries",
failedRequestEntries.size());
+ Consumer<Exception> fatalExceptionCons =
Review comment:
Can we make this an instance variable and only instantiate it once?
AFAICT it does not hold a reference to a variable of this method?
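A minimal sketch of what I mean, assuming the consumer really captures no local variable of `flush()`. `WriterSketch`, `fatalErrors`, and `getFatalErrors` are hypothetical stand-ins for illustration, not code from this PR:

```java
import java.util.function.Consumer;

/** Hypothetical stand-in for the writer; only the consumer handling is shown. */
class WriterSketch {
    private int fatalErrors = 0;

    // Created once per writer instance. The lambda captures only `this`,
    // not any local variable of flush(), so every flush can reuse it.
    private final Consumer<Exception> fatalExceptionCons = e -> fatalErrors++;

    Consumer<Exception> flush() {
        // A real flush() would pass the consumer on to the async request;
        // here it is returned so that the reuse is observable.
        return fatalExceptionCons;
    }

    int getFatalErrors() {
        return fatalErrors;
    }
}
```

With this shape, repeated calls to `flush()` hand out the same consumer instance instead of allocating a new one per batch.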
##########
File path:
flink-connectors/flink-connector-base/src/main/java/org/apache/flink/connector/base/sink/AsyncSinkBaseBuilder.java
##########
@@ -0,0 +1,139 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.base.sink;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+
+import java.io.Serializable;
+
+/**
+ * Abstract builder for constructing a concrete implementation of {@link AsyncSinkBase}.
+ *
+ * @param <InputT> type of elements that should be persisted in the destination
+ * @param <RequestEntryT> type of payload that contains the element and additional metadata that is
+ * required to submit a single element to the destination
+ * @param <ConcreteBuilderT> type of concrete implementation of this builder class
+ */
+@PublicEvolving
+public abstract class AsyncSinkBaseBuilder<
+ InputT,
+ RequestEntryT extends Serializable,
+ ConcreteBuilderT extends AsyncSinkBaseBuilder<?, ?, ?>> {
+
+ private ElementConverter<InputT, RequestEntryT> elementConverter;
+ private Integer maxBatchSize;
+ private Integer maxInFlightRequests;
+ private Integer maxBufferedRequests;
+ private Long flushOnBufferSizeInBytes;
+ private Long maxTimeInBufferMS;
+
+ /**
+ * @param elementConverter the {@link ElementConverter} to be used for the sink
+ * @return {@link ConcreteBuilderT} itself
+ */
+ public ConcreteBuilderT setElementConverter(
+ ElementConverter<InputT, RequestEntryT> elementConverter) {
+ this.elementConverter = elementConverter;
+ return (ConcreteBuilderT) this;
+ }
+
+ /**
+ * @param maxBatchSize maximum number of elements that may be passed in a list to be written
+ * downstream.
+ * @return {@link ConcreteBuilderT} itself
+ */
+ public ConcreteBuilderT setMaxBatchSize(int maxBatchSize) {
+ this.maxBatchSize = maxBatchSize;
+ return (ConcreteBuilderT) this;
+ }
+
+ /**
+ * @param maxInFlightRequests maximum number of uncompleted calls to submitRequestEntries that
+ * the SinkWriter will allow at any given point. Once this point has reached, writes and
+ * callbacks to add elements to the buffer may block until one or more requests to
+ * submitRequestEntries completes.
+ * @return {@link ConcreteBuilderT} itself
+ */
+ public ConcreteBuilderT setMaxInFlightRequests(int maxInFlightRequests) {
+ this.maxInFlightRequests = maxInFlightRequests;
+ return (ConcreteBuilderT) this;
+ }
+
+ /**
+ * @param maxBufferedRequests the maximum buffer length. Callbacks to add elements to the buffer
+ * and calls to write will block if this length has been reached and will only unblock if
+ * elements from the buffer have been removed for flushing.
+ * @return {@link ConcreteBuilderT} itself
+ */
+ public ConcreteBuilderT setMaxBufferedRequests(int maxBufferedRequests) {
+ this.maxBufferedRequests = maxBufferedRequests;
+ return (ConcreteBuilderT) this;
+ }
+
+ /**
+ * @param flushOnBufferSizeInBytes a flush will be attempted if the most recent call to write
+ * introduces an element to the buffer such that the total size of the buffer is greater
+ * than or equal to this threshold value.
+ * @return {@link ConcreteBuilderT} itself
+ */
+ public ConcreteBuilderT setFlushOnBufferSizeInBytes(long flushOnBufferSizeInBytes) {
+ this.flushOnBufferSizeInBytes = flushOnBufferSizeInBytes;
+ return (ConcreteBuilderT) this;
+ }
+
+ /**
+ * @param maxTimeInBufferMS the maximum amount of time an element may remain in the buffer. In
+ * most cases elements are flushed as a result of the batch size (in bytes or number) being
+ * reached or during a snapshot. However, there are scenarios where an element may remain in
+ * the buffer forever or a long period of time. To mitigate this, a timer is constantly
+ * active in the buffer such that: while the buffer is not empty, it will flush every
+ * maxTimeInBufferMS milliseconds.
+ * @return {@link ConcreteBuilderT} itself
+ */
+ public ConcreteBuilderT setMaxTimeInBufferMS(long maxTimeInBufferMS) {
+ this.maxTimeInBufferMS = maxTimeInBufferMS;
+ return (ConcreteBuilderT) this;
+ }
+
+ protected abstract AsyncSinkBase<InputT, RequestEntryT> build();
Review comment:
Shouldn't this method be public? Please also add a basic doc string.
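Roughly what I have in mind, as a sketch only. `SinkBuilderSketch`, `StringSinkBuilder`, and `getMaxBatchSize` are hypothetical names, and the self-referential bound on `B` is an assumption of this sketch rather than the wildcard bound used in the PR:

```java
/** Hypothetical, simplified builder base; not the PR's AsyncSinkBaseBuilder. */
abstract class SinkBuilderSketch<T, B extends SinkBuilderSketch<T, B>> {
    private Integer maxBatchSize;

    /**
     * @param maxBatchSize maximum number of elements per batch
     * @return the concrete builder itself
     */
    @SuppressWarnings("unchecked")
    public B setMaxBatchSize(int maxBatchSize) {
        this.maxBatchSize = maxBatchSize;
        return (B) this;
    }

    protected Integer getMaxBatchSize() {
        return maxBatchSize;
    }

    /**
     * Creates the sink from the values configured on this builder.
     *
     * @return a new sink instance
     */
    public abstract T build();
}

/** Hypothetical concrete builder that "builds" a String for illustration. */
final class StringSinkBuilder extends SinkBuilderSketch<String, StringSinkBuilder> {
    @Override
    public String build() {
        return "sink(maxBatchSize=" + getMaxBatchSize() + ")";
    }
}
```

Because `build()` is public, users outside the builder's package can finish the fluent chain themselves, e.g. `new StringSinkBuilder().setMaxBatchSize(5).build()`.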
##########
File path:
flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
##########
@@ -618,9 +837,75 @@ protected long getSizeInBytes(Integer requestEntry) {
}
}
+ /** A builder for {@link AsyncSinkWriterImpl}. */
+ public class AsyncSinkWriterImplBuilder {
+
+ private Boolean simulateFailures;
+ private int delay = 0;
+ private Sink.InitContext context;
+ private Integer maxBatchSize;
+ private Integer maxInFlightRequests;
+ private Integer maxBufferedRequests;
+ private long flushOnBufferSizeInBytes = 10000000;
+ private long maxTimeInBufferMS = 1000;
+
+ private AsyncSinkWriterImplBuilder context(Sink.InitContext context) {
+ this.context = context;
+ return this;
+ }
+
+ private AsyncSinkWriterImplBuilder maxBatchSize(int maxBatchSize) {
+ this.maxBatchSize = maxBatchSize;
+ return this;
+ }
+
+ private AsyncSinkWriterImplBuilder maxInFlightRequests(int maxInFlightRequests) {
+ this.maxInFlightRequests = maxInFlightRequests;
+ return this;
+ }
+
+ private AsyncSinkWriterImplBuilder maxBufferedRequests(int maxBufferedRequests) {
+ this.maxBufferedRequests = maxBufferedRequests;
+ return this;
+ }
+
+ private AsyncSinkWriterImplBuilder flushOnBufferSizeInBytes(long flushOnBufferSizeInBytes) {
+ this.flushOnBufferSizeInBytes = flushOnBufferSizeInBytes;
+ return this;
+ }
+
+ private AsyncSinkWriterImplBuilder maxTimeInBufferMS(long maxTimeInBufferMS) {
+ this.maxTimeInBufferMS = maxTimeInBufferMS;
+ return this;
+ }
+
+ private AsyncSinkWriterImplBuilder delay(int delay) {
+ this.delay = delay;
+ return this;
+ }
+
+ private AsyncSinkWriterImplBuilder simulateFailures(boolean simulateFailures) {
+ this.simulateFailures = simulateFailures;
+ return this;
+ }
+
+ private AsyncSinkWriterImpl build() {
+ return new AsyncSinkWriterImpl(
+ context,
+ maxBatchSize,
+ maxInFlightRequests,
+ maxBufferedRequests,
+ flushOnBufferSizeInBytes,
+ maxTimeInBufferMS,
+ simulateFailures,
+ delay);
+ }
+ }
+
private static class SinkInitContext implements Sink.InitContext {
private static final TestProcessingTimeService processingTimeService;
+ private final SinkWriterMetricGroupMock metricGroup = new SinkWriterMetricGroupMock();
Review comment:
We have a `MetricListener` in Flink to ease testing of emitted metrics. Can
you take a look at
https://github.com/apache/flink/blob/dabc1a894fd5bca1882a071b5d4a015f2d341a32/flink-connectors/flink-connector-kafka/src/test/java/org/apache/flink/connector/kafka/sink/KafkaWriterITCase.java#L153
to see how to use it?
I think by using it you can get rid of all the metric-related mock classes.
##########
File path:
flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
##########
@@ -618,9 +837,75 @@ protected long getSizeInBytes(Integer requestEntry) {
}
}
+ /** A builder for {@link AsyncSinkWriterImpl}. */
+ public class AsyncSinkWriterImplBuilder {
Review comment:
Can this be private?