mosche commented on code in PR #23540: URL: https://github.com/apache/beam/pull/23540#discussion_r1068040994
########## sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/enhancedfanout/ShardSubscriberImpl.java: ########## @@ -0,0 +1,275 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.aws2.kinesis.enhancedfanout; + +import java.util.List; +import java.util.UUID; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import org.apache.beam.sdk.io.aws2.kinesis.KinesisRecord; +import org.apache.beam.sdk.io.aws2.kinesis.enhancedfanout.signals.ConsumerError; +import org.apache.beam.sdk.io.aws2.kinesis.enhancedfanout.signals.RecoverableConsumerError; +import org.apache.beam.sdk.io.aws2.kinesis.enhancedfanout.signals.ShardEventWrapper; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.kinesis.model.StartingPosition; 
+import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent; +import software.amazon.awssdk.services.kinesis.model.SubscribeToShardRequest; +import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler; +import software.amazon.kinesis.retrieval.AggregatorUtil; +import software.amazon.kinesis.retrieval.KinesisClientRecord; + +class ShardSubscriberImpl implements ShardSubscriber { + private static final Logger LOG = LoggerFactory.getLogger(ShardSubscriberImpl.class); + private final String shardId; + private final Config config; + private final BlockingQueue<ShardEventWrapper> shardEventsBuffer = new LinkedBlockingQueue<>(2); + private final AtomicBoolean isRunning = new AtomicBoolean(false); + private final ShardSubscribersPool pool; + private final ShardSubscribersPoolState state; + private final RecordsBuffer recordsBuffer; + private final AsyncClientProxy asyncClientProxy; + + ShardSubscriberImpl( + Config config, + String shardId, + AsyncClientProxy asyncClientProxy, + ShardSubscribersPool pool, + ShardSubscribersPoolState initialState, + RecordsBuffer recordsBuffer) { + this.config = config; + this.shardId = shardId; + this.state = initialState; + this.pool = pool; + this.recordsBuffer = recordsBuffer; + this.isRunning.set(true); + this.asyncClientProxy = asyncClientProxy; + } + + @Override + public void run() { + while (isRunning.get()) { + try { + ShardCheckpoint shardCheckpoint = state.getCheckpoint(shardId); + boolean reSubscribe = subscribe(shardCheckpoint.toStartingPosition(), this::consume); Review Comment: If I understand this right, I fear this is really problematic. We cannot assume that the start position of the shard checkpoint is advancing as fast as the subscriber is; it might be lagging behind by up to the buffer capacity (10k). E.g. 
if the downstream of the Beam Kinesis reader is very slow, the sequence number in the checkpoint state might not yet have been acked / moved as far as the actual continuation sequence number that was returned by the last consumed `SubscribeToShardEvent`. The consequences of this are rather dramatic: we will start reading from a point in the shard which we've already consumed, and consequently we are going to add duplicates that way, which is going to increase the lag even more. I think what you absolutely want to do here is to track the latest `continuationSequenceNumber` received in a `SubscribeToShardEvent` independently of the checkpoint and resume from there. It might also be beneficial to decouple internal shard state from the checkpoint. These are two separate things, and constantly updating the immutable shard checkpoints becomes very costly at high scale. ########## sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/enhancedfanout/KinesisEnhancedFanOutSource.java: ########## @@ -0,0 +1,103 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.beam.sdk.io.aws2.kinesis.enhancedfanout; + +import static org.apache.beam.sdk.io.aws2.kinesis.enhancedfanout.Checkers.checkNotNull; +import static org.apache.beam.vendor.guava.v26_0_jre.com.google.common.collect.Lists.newArrayList; + +import java.io.IOException; +import java.util.List; +import org.apache.beam.sdk.coders.Coder; +import org.apache.beam.sdk.coders.SerializableCoder; +import org.apache.beam.sdk.io.UnboundedSource; +import org.apache.beam.sdk.io.aws2.common.ClientBuilderFactory; +import org.apache.beam.sdk.io.aws2.kinesis.KinesisIO; +import org.apache.beam.sdk.io.aws2.kinesis.KinesisRecord; +import org.apache.beam.sdk.io.aws2.kinesis.KinesisRecordCoder; +import org.apache.beam.sdk.options.PipelineOptions; +import org.checkerframework.checker.nullness.qual.Nullable; +import software.amazon.awssdk.services.kinesis.KinesisAsyncClient; + +public class KinesisEnhancedFanOutSource + extends UnboundedSource<KinesisRecord, KinesisReaderCheckpoint> { + + private final KinesisIO.Read readSpec; + private final CheckpointGenerator checkpointGenerator; + private final ClientBuilderFactory builderFactory; + + public KinesisEnhancedFanOutSource(KinesisIO.Read readSpec, ClientBuilderFactory builderFactory) { + this(readSpec, builderFactory, new FromScratchCheckpointGenerator(Config.fromIOSpec(readSpec))); + } + + private KinesisEnhancedFanOutSource( + KinesisIO.Read readSpec, + ClientBuilderFactory builderFactory, + CheckpointGenerator initialCheckpoint) { + this.readSpec = checkNotNull(readSpec, "spec"); + this.builderFactory = builderFactory; + this.checkpointGenerator = checkNotNull(initialCheckpoint, "initialCheckpoint"); + } + + @Override + public List<KinesisEnhancedFanOutSource> split(int desiredNumSplits, PipelineOptions options) + throws Exception { + try (AsyncClientProxy kinesis = createClient()) { + KinesisReaderCheckpoint checkpoint = checkpointGenerator.generate(kinesis); + List<KinesisEnhancedFanOutSource> sources = 
newArrayList(); + for (KinesisReaderCheckpoint partition : checkpoint.splitInto(desiredNumSplits)) { + sources.add( + new KinesisEnhancedFanOutSource( + readSpec, builderFactory, new StaticCheckpointGenerator(partition))); + } + return sources; + } + } + + @Override + public UnboundedReader<KinesisRecord> createReader( + PipelineOptions options, @Nullable KinesisReaderCheckpoint checkpointMark) + throws IOException { + CheckpointGenerator checkpointGenerator = this.checkpointGenerator; + if (checkpointMark != null) { + checkpointGenerator = new StaticCheckpointGenerator(checkpointMark); + } + + KinesisEnhancedFanOutSource source = new KinesisEnhancedFanOutSource(readSpec, builderFactory); + return new KinesisEnhancedFanOutReader(readSpec, createClient(), checkpointGenerator, source); + } + + @Override + public Coder<KinesisReaderCheckpoint> getCheckpointMarkCoder() { + return SerializableCoder.of(KinesisReaderCheckpoint.class); + } + + @Override + public Coder<KinesisRecord> getOutputCoder() { + return KinesisRecordCoder.of(); + } + + private AsyncClientProxy createClient() { + return new AsyncClientProxyImpl( + builderFactory + .create( + KinesisAsyncClient.builder(), Review Comment: Recommended usage by AWS `KinesisClientUtil.adjustKinesisClientBuilder(KinesisAsyncClient.builder())` ########## sdks/java/io/amazon-web-services2/src/main/java/org/apache/beam/sdk/io/aws2/kinesis/enhancedfanout/KinesisShardEventsSubscriber.java: ########## @@ -0,0 +1,125 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.beam.sdk.io.aws2.kinesis.enhancedfanout; + +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; +import org.apache.beam.sdk.io.aws2.kinesis.CustomOptional; +import org.apache.beam.sdk.io.aws2.kinesis.enhancedfanout.signals.ShardEventWrapper; +import org.reactivestreams.Subscriber; +import org.reactivestreams.Subscription; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEvent; +import software.amazon.awssdk.services.kinesis.model.SubscribeToShardEventStream; +import software.amazon.awssdk.services.kinesis.model.SubscribeToShardResponseHandler; + +class KinesisShardEventsSubscriber implements Subscriber<SubscribeToShardEventStream> { + private static final Logger LOG = LoggerFactory.getLogger(KinesisShardEventsSubscriber.class); + private static final String LOG_MSG_TEMPLATE = "Stream = {} consumer = {} shard = {}"; + private static final long enqueueTimeoutMs = 35_000; + + private final BlockingQueue<ShardEventWrapper> queue; + private final CountDownLatch isRunningLatch; + private final String streamName; + private final String consumerArn; + private final String shardId; + + private CustomOptional<Subscription> s = CustomOptional.absent(); + private volatile boolean cancelled = false; + + KinesisShardEventsSubscriber( + BlockingQueue<ShardEventWrapper> queue, + CountDownLatch isRunningLatch, + String streamName, + String consumerArn, + String shardId) { + this.queue = 
queue; + this.isRunningLatch = isRunningLatch; + this.streamName = streamName; + this.consumerArn = consumerArn; + this.shardId = shardId; + } + + @Override + public void onSubscribe(Subscription subscription) { + s = CustomOptional.of(subscription); + isRunningLatch.countDown(); + } + + /** AWS SDK Netty thread calls this at least every ~ 5 seconds even when no new records arrive. */ + @Override + public void onNext(SubscribeToShardEventStream subscribeToShardEventStream) { + subscribeToShardEventStream.accept( + new SubscribeToShardResponseHandler.Visitor() { + @Override + public void visit(SubscribeToShardEvent event) { + enqueueEvent(ShardEventWrapper.fromNext(event)); + } + }); + } + + @Override + public void onError(Throwable throwable) { + enqueueEvent(ShardEventWrapper.error(throwable)); + cancel(); + } + + /** + * AWS SDK Netty thread calls this every ~ 5 minutes, these events alone are not enough signal to + * conclude the shard has no more records to consume. + */ + @Override + public void onComplete() { + LOG.info(LOG_MSG_TEMPLATE + " Complete", streamName, shardId, consumerArn); + enqueueEvent(ShardEventWrapper.subscriptionComplete()); + } + + void requestRecord() { + if (!cancelled) { + s.get().request(1); + } + } + + void cancel() { + if (cancelled) { + return; + } + cancelled = true; + + if (s != null) { + s.get().cancel(); + } + } + + private void enqueueEvent(ShardEventWrapper event) { + if (cancelled) { + return; + } + + try { + if (!queue.offer(event, enqueueTimeoutMs, TimeUnit.MILLISECONDS)) { Review Comment: This must be a while loop, otherwise we might lose events. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
