CrynetLogistics commented on a change in pull request #17687:
URL: https://github.com/apache/flink/pull/17687#discussion_r744846888
##########
File path:
flink-connectors/flink-connector-base/src/test/java/org/apache/flink/connector/base/sink/writer/AsyncSinkWriterTest.java
##########
@@ -618,9 +837,75 @@ protected long getSizeInBytes(Integer requestEntry) {
}
}
+ /**
+ * A builder for {@link AsyncSinkWriterImpl}.
+ *
+ * <p>Each setter stores its argument in the matching field and returns this builder so that calls
+ * can be chained; {@code build()} then passes all eight fields to the {@code AsyncSinkWriterImpl}
+ * constructor. Only {@code delay}, {@code flushOnBufferSizeInBytes} and {@code maxTimeInBufferMS}
+ * have defaults; the remaining fields stay {@code null} until their setter is called.
+ *
+ * <p>The setters and {@code build()} are declared {@code private}, so they can only be called
+ * from code inside the enclosing top-level class.
+ *
+ * <p>NOTE(review): if the constructor takes primitive parameters, calling {@code build()} while
+ * any of the boxed fields is still unset would fail with a NullPointerException on unboxing;
+ * confirm against the constructor signature, which is not visible here.
+ */
+ public class AsyncSinkWriterImplBuilder {
+
+ private Boolean simulateFailures; // no default: null until simulateFailures(boolean) is called
+ private int delay = 0; // defaults to 0; the unit is not visible here
+ private Sink.InitContext context; // no default: null until context(...) is called
+ private Integer maxBatchSize; // no default: null until maxBatchSize(int) is called
+ private Integer maxInFlightRequests; // no default: null until maxInFlightRequests(int) is called
+ private Integer maxBufferedRequests; // no default: null until maxBufferedRequests(int) is called
+ private long flushOnBufferSizeInBytes = 10000000; // default; presumably bytes, going by the name
+ private long maxTimeInBufferMS = 1000; // default; presumably milliseconds, going by the name
+
+ private AsyncSinkWriterImplBuilder context(Sink.InitContext context) {
+ this.context = context;
+ return this; // returned for call chaining, as in every setter below
+ }
+
+ private AsyncSinkWriterImplBuilder maxBatchSize(int maxBatchSize) {
+ this.maxBatchSize = maxBatchSize;
+ return this;
+ }
+
+ private AsyncSinkWriterImplBuilder maxInFlightRequests(int
maxInFlightRequests) {
+ this.maxInFlightRequests = maxInFlightRequests;
+ return this;
+ }
+
+ private AsyncSinkWriterImplBuilder maxBufferedRequests(int
maxBufferedRequests) {
+ this.maxBufferedRequests = maxBufferedRequests;
+ return this;
+ }
+
+ private AsyncSinkWriterImplBuilder flushOnBufferSizeInBytes(long
flushOnBufferSizeInBytes) {
+ this.flushOnBufferSizeInBytes = flushOnBufferSizeInBytes; // replaces the 10000000 default
+ return this;
+ }
+
+ private AsyncSinkWriterImplBuilder maxTimeInBufferMS(long
maxTimeInBufferMS) {
+ this.maxTimeInBufferMS = maxTimeInBufferMS; // replaces the 1000 default
+ return this;
+ }
+
+ private AsyncSinkWriterImplBuilder delay(int delay) {
+ this.delay = delay; // replaces the 0 default
+ return this;
+ }
+
+ private AsyncSinkWriterImplBuilder simulateFailures(boolean
simulateFailures) {
+ this.simulateFailures = simulateFailures; // boxed into the Boolean field
+ return this;
+ }
+
+ private AsyncSinkWriterImpl build() {
+ return new AsyncSinkWriterImpl( // arguments are positional; current field values are passed as-is
+ context,
+ maxBatchSize,
+ maxInFlightRequests,
+ maxBufferedRequests,
+ flushOnBufferSizeInBytes,
+ maxTimeInBufferMS,
+ simulateFailures,
+ delay);
+ }
+ }
+
private static class SinkInitContext implements Sink.InitContext {
private static final TestProcessingTimeService processingTimeService;
+ private final SinkWriterMetricGroupMock metricGroup = new
SinkWriterMetricGroupMock();
Review comment:
Thanks, it's allowed me to remove 2 classes.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]