CrynetLogistics commented on a change in pull request #17345:
URL: https://github.com/apache/flink/pull/17345#discussion_r729783117



##########
File path: flink-connectors/flink-connector-aws/src/main/java/org/apache/flink/streaming/connectors/kinesis/async/KinesisDataStreamsSinkWriter.java
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.async;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest;
+import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
+import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse;
+import software.amazon.awssdk.services.kinesis.model.PutRecordsResultEntry;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+/**
+ * Sink writer created by {@link KinesisDataStreamsSink} to write to Kinesis Data Streams. More
+ * details on the operation of this sink writer may be found in the doc for {@link
+ * KinesisDataStreamsSink}. More details on the internals of this sink writer may be found in {@link
+ * AsyncSinkWriter}.
+ *
+ * <p>The {@link KinesisAsyncClient} used here may be configured in the standard way for the AWS SDK
+ * 2.x. e.g. the provision of {@code AWS_REGION}, {@code AWS_ACCESS_KEY_ID} and {@code
+ * AWS_SECRET_ACCESS_KEY} through environment variables etc.
+ */
+@Internal
+public class KinesisDataStreamsSinkWriter<InputT>
+        extends AsyncSinkWriter<InputT, PutRecordsRequestEntry> {
+
+    private static final String TOTAL_FULLY_SUCCESSFUL_FLUSHES_METRIC =
+            "totalFullySuccessfulFlushes";
+    private static final String TOTAL_PARTIALLY_SUCCESSFUL_FLUSHES_METRIC =
+            "totalPartiallySuccessfulFlushes";
+    private static final String TOTAL_FULLY_FAILED_FLUSHES_METRIC = "totalFullyFailedFlushes";
+    private transient Counter totalFullySuccessfulFlushesCounter;
+    private transient Counter totalPartiallySuccessfulFlushesCounter;
+    private transient Counter totalFullyFailedFlushesCounter;
+    private transient Counter numRecordsOutErrorsCounter;
+
+    private final String streamName;
+    private final SinkWriterMetricGroup metrics;
+    private static final KinesisAsyncClient client = KinesisAsyncClient.create();
+    private static final Logger LOG = LoggerFactory.getLogger(KinesisDataStreamsSinkWriter.class);
+
+    KinesisDataStreamsSinkWriter(
+            ElementConverter<InputT, PutRecordsRequestEntry> elementConverter,
+            Sink.InitContext context,
+            int maxBatchSize,
+            int maxInFlightRequests,
+            int maxBufferedRequests,
+            long flushOnBufferSizeInBytes,
+            long maxTimeInBufferMS,
+            String streamName) {
+        super(
+                elementConverter,
+                context,
+                maxBatchSize,
+                maxInFlightRequests,
+                maxBufferedRequests,
+                flushOnBufferSizeInBytes,
+                maxTimeInBufferMS);
+        this.streamName = streamName;
+        this.metrics = context.metricGroup();
+        initMetricsGroup();
+    }
+
+    @Override
+    protected void submitRequestEntries(
+            List<PutRecordsRequestEntry> requestEntries,
+            Consumer<Collection<PutRecordsRequestEntry>> requestResult) {
+
+        PutRecordsRequest batchRequest =
+                PutRecordsRequest.builder().records(requestEntries).streamName(streamName).build();
+
+        LOG.trace("Request to submit {} entries to KDS using KDS Sink.", requestEntries.size());
+
+        CompletableFuture<PutRecordsResponse> future = client.putRecords(batchRequest);
+
+        future.whenComplete(
+                (response, err) -> {
+                    if (err != null) {
+                        LOG.warn(
+                                "KDS Sink failed to persist {} entries to KDS, retrying whole batch",
+                                requestEntries.size());
+                        totalFullyFailedFlushesCounter.inc();
+                        numRecordsOutErrorsCounter.inc(requestEntries.size());
+
+                        requestResult.accept(requestEntries);
+                        return;
+                    }
+
+                    if (response.failedRecordCount() > 0) {
+                        LOG.warn(
+                                "KDS Sink failed to persist {} entries to KDS, retrying a partial batch",
+                                response.failedRecordCount());

Review comment:
       (And apologies, I neglected to answer your question... it's retried by adding the elements to the `failedRequestEntries`, which are then requeued in the base sink writer.)
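   
   For reference, a minimal sketch of what that looks like inside the `whenComplete` callback of `submitRequestEntries` (relying on the `PutRecords` response records being index-aligned with the request entries, which the Kinesis API guarantees):
   
   ```
   // Collect the entries whose corresponding result carries an error code and
   // hand them back to the base AsyncSinkWriter via the requestResult consumer,
   // which requeues them for a later batch.
   List<PutRecordsRequestEntry> failedRequestEntries =
           new ArrayList<>(response.failedRecordCount());
   List<PutRecordsResultEntry> records = response.records();
   for (int i = 0; i < records.size(); i++) {
       if (records.get(i).errorCode() != null) {
           failedRequestEntries.add(requestEntries.get(i));
       }
   }
   requestResult.accept(failedRequestEntries);
   ```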

##########
File path: flink-connectors/flink-connector-aws/src/main/java/org/apache/flink/streaming/connectors/kinesis/async/KinesisDataStreamsSinkWriter.java
##########
@@ -0,0 +1,155 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.async;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.sink.Sink;
+import org.apache.flink.connector.base.sink.writer.AsyncSinkWriter;
+import org.apache.flink.connector.base.sink.writer.ElementConverter;
+import org.apache.flink.metrics.Counter;
+import org.apache.flink.metrics.groups.SinkWriterMetricGroup;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
+import software.amazon.awssdk.services.kinesis.model.PutRecordsRequest;
+import software.amazon.awssdk.services.kinesis.model.PutRecordsRequestEntry;
+import software.amazon.awssdk.services.kinesis.model.PutRecordsResponse;
+import software.amazon.awssdk.services.kinesis.model.PutRecordsResultEntry;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+
+/**
+ * Sink writer created by {@link KinesisDataStreamsSink} to write to Kinesis Data Streams. More
+ * details on the operation of this sink writer may be found in the doc for {@link
+ * KinesisDataStreamsSink}. More details on the internals of this sink writer may be found in {@link
+ * AsyncSinkWriter}.
+ *
+ * <p>The {@link KinesisAsyncClient} used here may be configured in the standard way for the AWS SDK
+ * 2.x. e.g. the provision of {@code AWS_REGION}, {@code AWS_ACCESS_KEY_ID} and {@code
+ * AWS_SECRET_ACCESS_KEY} through environment variables etc.
+ */
+@Internal
+public class KinesisDataStreamsSinkWriter<InputT>
+        extends AsyncSinkWriter<InputT, PutRecordsRequestEntry> {
+
+    private static final String TOTAL_FULLY_SUCCESSFUL_FLUSHES_METRIC =
+            "totalFullySuccessfulFlushes";
+    private static final String TOTAL_PARTIALLY_SUCCESSFUL_FLUSHES_METRIC =
+            "totalPartiallySuccessfulFlushes";
+    private static final String TOTAL_FULLY_FAILED_FLUSHES_METRIC = "totalFullyFailedFlushes";
+    private transient Counter totalFullySuccessfulFlushesCounter;
+    private transient Counter totalPartiallySuccessfulFlushesCounter;
+    private transient Counter totalFullyFailedFlushesCounter;
+    private transient Counter numRecordsOutErrorsCounter;
+
+    private final String streamName;
+    private final SinkWriterMetricGroup metrics;
+    private static final KinesisAsyncClient client = KinesisAsyncClient.create();

Review comment:
       As agreed yesterday, we will expose some methods in the sink builder to allow these to be configured.
   
   I've not encountered a situation like this before; I'm wondering whether there are any patterns/anti-patterns as a general rule?
   
   Specifically, of the 7 fields that may be passed to the builder, only 2 are serialisable as is.
   
   ```
   httpClient - neither the builder nor the attributeMap is serialisable
   endpointOverride - serialisable
   region - serialisable
   applyMutation - cannot serialise
   asyncConfiguration - cannot serialise
   overrideConfiguration - some fields of the configuration are serialisable; others are configuration objects with more deeply nested serialisable fields
    > Is it advisable to enumerate all of these in a builder method in the SinkBuilder? Seems like a lot of duplication.
   credentialsProvider
    > it's just 2 strings and some other constants, and yet it is not serialisable - perhaps there is a way to retrospectively make Flink think it's serialisable?
   ```
   
   Thanks, I appreciate your help with this gap in my knowledge.
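   
   One option I'm considering - a hypothetical sketch, not what's in this PR: keep only a serialisable `Properties` object on the (serialised) sink and build the `KinesisAsyncClient` on the task manager when the writer is created, rather than trying to serialise the client or its builder. The `aws.region` property key below is made up for illustration:
   
   ```
   import java.io.Serializable;
   import java.util.Properties;
   
   import software.amazon.awssdk.regions.Region;
   import software.amazon.awssdk.services.kinesis.KinesisAsyncClient;
   import software.amazon.awssdk.services.kinesis.KinesisAsyncClientBuilder;
   
   public class SerializableClientConfig implements Serializable {
   
       private final Properties properties; // plain strings, trivially serialisable
   
       public SerializableClientConfig(Properties properties) {
           this.properties = properties;
       }
   
       // Called on the task manager, so the non-serialisable client never
       // crosses the wire.
       public KinesisAsyncClient buildClient() {
           KinesisAsyncClientBuilder builder = KinesisAsyncClient.builder();
           String region = properties.getProperty("aws.region"); // illustrative key
           if (region != null) {
               builder.region(Region.of(region));
           }
           return builder.build(); // SDK default chain covers anything unset
       }
   }
   ```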

##########
File path: flink-connectors/flink-connector-aws/src/test/java/org/apache/flink/streaming/connectors/kinesis/async/testutils/ExampleSource.java
##########
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.streaming.connectors.kinesis.async.testutils;
+
+import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
+
+import org.apache.flink.shaded.curator4.org.apache.curator.shaded.com.google.common.base.Preconditions;
+
+/**
+ * An example source function that allows the user to customize the number of emitted elements, the
+ * time between emissions, the total amount of time to keep alive after the last element and the
+ * size of each element in bytes.
+ *
+ * <p>This source is used in the testing of KDS Sinks.
+ */
+public class ExampleSource extends RichSourceFunction<String> {

Review comment:
       Ugh... there are a few decent candidates, but they're private inner classes :'(




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

