tzulitai commented on a change in pull request #13886:
URL: https://github.com/apache/flink/pull/13886#discussion_r517120544
##########
File path: tools/maven/suppressions.xml
##########
@@ -43,7 +43,11 @@ under the License.
<suppress
files="FlinkKinesisProducer.java|FlinkKinesisProducerTest.java"
checks="IllegalImport"/>
- <!-- Classes copied from Hadoop -->
+ <!-- Kinesis EFO consumer required to handle Netty ReadTimeoutException -->
+ <suppress
+ files="FanOutRecordPublisherTest.java|FanOutShardSubscriber.java|FanOutShardSubscriberTest.java"
+ checks="IllegalImport"/>
Review comment:
Is this because these classes use non-shaded Netty imports?
##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java
##########
@@ -248,6 +261,10 @@ private boolean consumeAllRecordsFromKinesisShard(
eventConsumer.accept(event);
}
} else if (subscriptionEvent.isSubscriptionComplete()) {
+ if (subscriptionErrorEvent.get() != null) {
Review comment:
Question:
Which exceptions can appear here, in the case that a shard has been fully
consumed or the subscription expires?
To my understanding, under the normal flow of a recoverable exception (e.g. a
Netty read timeout), we should fall into the last branch of this `if-else`
clause?
##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java
##########
@@ -235,8 +243,13 @@ private boolean consumeAllRecordsFromKinesisShard(
String continuationSequenceNumber;
do {
- // Read timeout will occur after 30 seconds, add a sanity timeout here to prevent lockup
- FanOutSubscriptionEvent subscriptionEvent = queue.poll(DEQUEUE_WAIT_SECONDS, SECONDS);
+ FanOutSubscriptionEvent subscriptionEvent;
+ if (queue.isEmpty() && subscriptionErrorEvent.get() != null) {
Review comment:
Question:
Can you briefly explain why we wait until the queue is fully drained before
handling exceptions?
##########
File path: flink-connectors/flink-connector-kinesis/src/main/java/org/apache/flink/streaming/connectors/kinesis/internals/publisher/fanout/FanOutShardSubscriber.java
##########
@@ -272,8 +289,6 @@ private boolean consumeAllRecordsFromKinesisShard(
private final CountDownLatch waitForSubscriptionLatch;
- private final Object lockObject = new Object();
Review comment:
Making sure that I understand the new error throwing mechanism, from the
looks of the code:
- Exceptions are now passed via an atomic reference, instead of being put
into the queue
- Because of that, we no longer need to synchronize access to the queue
(because it'll only ever be accessed serially in the `onNext` and `onComplete`
methods). Therefore, this lock object is no longer needed.
- Also because of that, we can now simply block indefinitely when appending
a subscription event to the queue, because we no longer need to let error
events have a chance to take over.
What I am failing to understand is why we can now only handle errors _after_
the queue is fully consumed.
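To make sure we are talking about the same thing, here is a minimal sketch of
the pattern as I read it. All names here (`ErrorRefSketch`, `pollNext`, etc.)
are hypothetical, not the actual identifiers in `FanOutShardSubscriber`:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicReference;

/** Sketch of the error-passing mechanism: errors bypass the event queue. */
public class ErrorRefSketch {
    private final BlockingQueue<String> queue = new ArrayBlockingQueue<>(2);
    private final AtomicReference<Throwable> errorRef = new AtomicReference<>();

    // Producer side: events may block indefinitely on a full queue; no lock
    // is needed because error events no longer compete for queue access.
    void onNext(String event) throws InterruptedException {
        queue.put(event);
    }

    // Errors are delivered out of band via the atomic reference.
    void onError(Throwable t) {
        errorRef.set(t);
    }

    // Consumer side: buffered events are drained first; the error only
    // surfaces once the queue is empty. This ordering is what I am asking
    // about above.
    String pollNext() throws Throwable {
        if (queue.isEmpty() && errorRef.get() != null) {
            throw errorRef.get();
        }
        return queue.poll();
    }
}
```

In this sketch an error set while events are still buffered is only observed
after those events have been consumed, which is the behavior I would like to
understand the motivation for.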
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]