dannycranmer commented on code in PR #49:
URL: https://github.com/apache/flink-connector-aws/pull/49#discussion_r1175101246


##########
flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/config/SourceConfigConstants.java:
##########
@@ -0,0 +1,369 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.source.config;
+
+import org.apache.flink.annotation.PublicEvolving;
+import org.apache.flink.connector.aws.config.AWSConfigConstants;
+import org.apache.flink.connector.kinesis.source.reader.PollingKinesisShardSplitReader;
+
+import java.time.Duration;
+
+@PublicEvolving
+public class SourceConfigConstants extends AWSConfigConstants {

Review Comment:
   Why do you need to extend `AWSConfigConstants` here? We did this before to 
retain backwards compatibility, but since this is a new Source we might not 
need to.
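
   A minimal sketch of what dropping the inheritance could look like, using only the JDK. The class name, key strings, and default values below are placeholders for illustration and are not taken from the PR; only the constant names and the three starting positions come from the code under review.

```java
// Hypothetical sketch: key strings and default values are placeholders, not the connector's real ones.
final class StandaloneSourceConfigConstants {

    /** Starting positions named in the KinesisStreamsSource Javadoc. */
    enum InitialPosition {
        LATEST,
        TRIM_HORIZON,
        AT_TIMESTAMP
    }

    // Placeholder key and default for the initial position.
    static final String STREAM_INITIAL_POSITION = "example.source.init.position";
    static final InitialPosition DEFAULT_STREAM_INITIAL_POSITION = InitialPosition.LATEST;

    // Placeholder key and default for the shard discovery interval.
    static final String SHARD_DISCOVERY_INTERVAL_MILLIS =
            "example.source.shard.discovery.interval-millis";
    static final long DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS = 10_000L;

    // Constants holder: no instances and no superclass other than Object.
    private StandaloneSourceConfigConstants() {}
}
```

   Callers that still need the shared AWS keys could reference `AWSConfigConstants` directly instead of inheriting them.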



##########
flink-connector-aws-kinesis-streams/pom.xml:
##########
@@ -80,6 +80,12 @@ under the License.
             <type>test-jar</type>
             <scope>test</scope>
         </dependency>

Review Comment:
   Can we rename this module? It currently says Sink v2; see the `<name/>` above.



##########
flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumerator.java:
##########
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.source.enumerator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import org.apache.flink.connector.kinesis.source.config.SourceConfigConstants.InitialPosition;
+import org.apache.flink.connector.kinesis.source.enumerator.assigner.ShardAssignerFactory;
+import org.apache.flink.connector.kinesis.source.exception.KinesisStreamsSourceException;
+import org.apache.flink.connector.kinesis.source.model.CompletedShardsEvent;
+import org.apache.flink.connector.kinesis.source.proxy.StreamProxy;
+import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
+import org.apache.flink.connector.kinesis.source.split.StartingPosition;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.kinesis.model.Shard;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import static org.apache.flink.connector.kinesis.source.config.SourceConfigConstants.DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS;
+import static org.apache.flink.connector.kinesis.source.config.SourceConfigConstants.DEFAULT_STREAM_INITIAL_POSITION;
+import static org.apache.flink.connector.kinesis.source.config.SourceConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS;
+import static org.apache.flink.connector.kinesis.source.config.SourceConfigConstants.STREAM_INITIAL_POSITION;
+import static org.apache.flink.connector.kinesis.source.config.SourceConfigUtil.parseStreamTimestampStartingPosition;
+
+/**
+ * This class is used to discover and assign Kinesis splits to subtasks on the Flink cluster. This
+ * runs on the JobManager.
+ */
+@Internal
+public class KinesisStreamsSourceEnumerator
+        implements SplitEnumerator<KinesisShardSplit, KinesisStreamsSourceEnumeratorState> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KinesisStreamsSourceEnumerator.class);
+
+    private final SplitEnumeratorContext<KinesisShardSplit> context;
+    private final String streamArn;
+    private final Properties consumerConfig;
+    private final StreamProxy streamProxy;
+    private final KinesisShardAssigner shardAssigner;
+    private final KinesisShardAssigner.Context shardAssignerContext;
+
+    private final Map<Integer, Set<KinesisShardSplit>> splitAssignment = new HashMap<>();
+    private final Set<String> assignedSplitIds = new HashSet<>();
+    private final Set<KinesisShardSplit> unassignedSplits;
+    private final Set<String> completedSplitIds;
+
+    private String lastSeenShardId;
+
+    public KinesisStreamsSourceEnumerator(
+            SplitEnumeratorContext<KinesisShardSplit> context,
+            String streamArn,
+            Properties consumerConfig,
+            StreamProxy streamProxy,
+            KinesisStreamsSourceEnumeratorState state) {
+        this.context = context;
+        this.streamArn = streamArn;
+        this.consumerConfig = consumerConfig;
+        this.streamProxy = streamProxy;
+        this.shardAssigner = ShardAssignerFactory.uniformShardAssigner();
+        this.shardAssignerContext = new ShardAssignerContext(splitAssignment, context);
+        if (state == null) {
+            this.completedSplitIds = new HashSet<>();
+            this.lastSeenShardId = null;
+            this.unassignedSplits = new HashSet<>();
+        } else {
+            this.completedSplitIds = state.getCompletedSplitIds();
+            this.lastSeenShardId = state.getLastSeenShardId();
+            this.unassignedSplits = state.getUnassignedSplits();
+        }
+    }
+
+    @Override
+    public void start() {
+        if (lastSeenShardId == null) {
+            context.callAsync(this::initialDiscoverSplits, this::assignSplits);
+        }
+
+        final long shardDiscoveryInterval =
+                Long.parseLong(
+                        consumerConfig.getProperty(
+                                SHARD_DISCOVERY_INTERVAL_MILLIS,
+                                String.valueOf(DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS)));
+        context.callAsync(
+                this::periodicallyDiscoverSplits,

Review Comment:
   FYI, this will go away once we implement parent/child shard ordering.



##########
flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java:
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.source;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.aws.util.AWSClientUtil;
+import org.apache.flink.connector.aws.util.AWSGeneralUtil;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
+import org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.connector.kinesis.sink.KinesisStreamsConfigConstants;
+import org.apache.flink.connector.kinesis.source.enumerator.KinesisStreamsSourceEnumerator;
+import org.apache.flink.connector.kinesis.source.enumerator.KinesisStreamsSourceEnumeratorState;
+import org.apache.flink.connector.kinesis.source.enumerator.KinesisStreamsSourceEnumeratorStateSerializer;
+import org.apache.flink.connector.kinesis.source.proxy.KinesisStreamProxy;
+import org.apache.flink.connector.kinesis.source.reader.KinesisStreamsRecordEmitter;
+import org.apache.flink.connector.kinesis.source.reader.KinesisStreamsSourceReader;
+import org.apache.flink.connector.kinesis.source.reader.PollingKinesisShardSplitReader;
+import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
+import org.apache.flink.connector.kinesis.source.split.KinesisShardSplitSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.http.apache.ApacheHttpClient;
+import software.amazon.awssdk.services.kinesis.KinesisClient;
+import software.amazon.awssdk.services.kinesis.model.Record;
+import software.amazon.awssdk.utils.AttributeMap;
+
+import java.util.Properties;
+import java.util.function.Supplier;
+
+/**
+ * The {@link KinesisStreamsSource} is an exactly-once parallel streaming data source that
+ * subscribes to a single AWS Kinesis data stream. It is able to handle resharding of streams, and
+ * stores its current progress in Flink checkpoints. The source will read in data from the Kinesis
+ * Data stream, deserialize it using the provided {@link DeserializationSchema}, and emit the record
+ * into the Flink job graph.
+ *
+ * <p>Exactly-once semantics. To leverage Flink's checkpointing mechanics for exactly-once stream
+ * processing, the Kinesis Source is implemented with the AWS Java SDK, instead of the officially
+ * recommended AWS Kinesis Client Library. The source will store its current progress in Flink
+ * checkpoint/savepoint, and will pick up from where it left off upon restore from the
+ * checkpoint/savepoint.
+ *
+ * <p>Initial starting points. The Kinesis Streams Source supports reads starting from TRIM_HORIZON,
+ * LATEST, and AT_TIMESTAMP.
+ *
+ * @param <T> the data type emitted by the source
+ */
+@Experimental
+public class KinesisStreamsSource<T>
+        implements Source<T, KinesisShardSplit, KinesisStreamsSourceEnumeratorState> {
+
+    private final String streamArn;
+    private final Properties consumerConfig;
+    private final DeserializationSchema<T> deserializationSchema;
+
+    public KinesisStreamsSource(
+            String streamArn,
+            Properties consumerConfig,
+            DeserializationSchema<T> deserializationSchema) {
+        this.streamArn = streamArn;
+        this.consumerConfig = consumerConfig;
+        this.deserializationSchema = deserializationSchema;
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return Boundedness.CONTINUOUS_UNBOUNDED;
+    }
+
+    @Override
+    public SourceReader<T, KinesisShardSplit> createReader(SourceReaderContext readerContext)
+            throws Exception {
+        FutureCompletingBlockingQueue<RecordsWithSplitIds<Record>> elementsQueue =
+                new FutureCompletingBlockingQueue<>();
+        KinesisStreamProxy kinesisStreamProxy = createKinesisStreamProxy(consumerConfig);
+        Supplier<PollingKinesisShardSplitReader> splitReaderSupplier =
+                () -> new PollingKinesisShardSplitReader(kinesisStreamProxy);
+        KinesisStreamsRecordEmitter<T> recordEmitter =
+                new KinesisStreamsRecordEmitter<>(deserializationSchema);
+
+        return new KinesisStreamsSourceReader<>(
+                elementsQueue,
+                new SingleThreadFetcherManager<>(elementsQueue, splitReaderSupplier::get),
+                recordEmitter,
+                convertPropertiesToConfiguration(consumerConfig),
+                readerContext);
+    }
+
+    @Override
+    public SplitEnumerator<KinesisShardSplit, KinesisStreamsSourceEnumeratorState> createEnumerator(
+            SplitEnumeratorContext<KinesisShardSplit> enumContext) throws Exception {
+        return restoreEnumerator(enumContext, null);
+    }
+
+    @Override
+    public SplitEnumerator<KinesisShardSplit, KinesisStreamsSourceEnumeratorState>
+            restoreEnumerator(
+                    SplitEnumeratorContext<KinesisShardSplit> enumContext,
+                    KinesisStreamsSourceEnumeratorState checkpoint)
+                    throws Exception {
+        return new KinesisStreamsSourceEnumerator(
+                enumContext,
+                streamArn,
+                consumerConfig,
+                createKinesisStreamProxy(consumerConfig),
+                checkpoint);
+    }
+
+    @Override
+    public SimpleVersionedSerializer<KinesisShardSplit> getSplitSerializer() {
+        return new KinesisShardSplitSerializer();
+    }
+
+    @Override
+    public SimpleVersionedSerializer<KinesisStreamsSourceEnumeratorState>
+            getEnumeratorCheckpointSerializer() {
+        return new KinesisStreamsSourceEnumeratorStateSerializer(new KinesisShardSplitSerializer());
+    }
+
+    private Configuration convertPropertiesToConfiguration(Properties props) {
+        Configuration config = new Configuration();
+        props.stringPropertyNames().forEach(key -> config.setString(key, props.getProperty(key)));
+        return config;
+    }

Review Comment:
   This seems fairly general. Is there nothing you can reuse, or can you move 
it somewhere more general?
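
   One way this could be made reusable, sketched with the JDK only: a small static helper that copies a `Properties` into a `Map<String, String>`. The class and method names are hypothetical. As far as I know, Flink's `Configuration.fromMap` accepts such a map, but treat that as an assumption to check against the Flink version in use.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical helper; the name and its location are illustrative, not part of the PR.
final class PropertiesConversion {

    private PropertiesConversion() {}

    /** Copies every string-valued property (including defaults) into a new mutable map. */
    static Map<String, String> toStringMap(Properties props) {
        Map<String, String> result = new HashMap<>();
        for (String key : props.stringPropertyNames()) {
            result.put(key, props.getProperty(key));
        }
        return result;
    }
}
```

   Keeping the helper free of Flink types means it could live in a shared utility module and be tested without a cluster.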



##########
flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java:
##########
@@ -0,0 +1,167 @@
+@Experimental
+public class KinesisStreamsSource<T>
+        implements Source<T, KinesisShardSplit, KinesisStreamsSourceEnumeratorState> {
+
+    private final String streamArn;
+    private final Properties consumerConfig;
+    private final DeserializationSchema<T> deserializationSchema;
+
+    public KinesisStreamsSource(
+            String streamArn,
+            Properties consumerConfig,

Review Comment:
   Why not just use `Configuration` here, so that we do not need 
`convertPropertiesToConfiguration`?



##########
flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/assigner/ShardAssignerFactory.java:
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.source.enumerator.assigner;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.connector.kinesis.source.enumerator.KinesisShardAssigner;
+
+/**
+ * Factory that provides an instance of {@link KinesisShardAssigner} pre-packaged with the
+ * connector.
+ */
+@Experimental
+public class ShardAssignerFactory {

Review Comment:
   Consider deleting this along with the Hash Map assigner.



##########
flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumerator.java:
##########
@@ -0,0 +1,338 @@
+@Internal
+public class KinesisStreamsSourceEnumerator
+        implements SplitEnumerator<KinesisShardSplit, KinesisStreamsSourceEnumeratorState> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KinesisStreamsSourceEnumerator.class);
+
+    private final SplitEnumeratorContext<KinesisShardSplit> context;
+    private final String streamArn;
+    private final Properties consumerConfig;
+    private final StreamProxy streamProxy;
+    private final KinesisShardAssigner shardAssigner;
+    private final KinesisShardAssigner.Context shardAssignerContext;
+
+    private final Map<Integer, Set<KinesisShardSplit>> splitAssignment = new HashMap<>();
+    private final Set<String> assignedSplitIds = new HashSet<>();
+    private final Set<KinesisShardSplit> unassignedSplits;
+    private final Set<String> completedSplitIds;

Review Comment:
   This will grow unbounded. While we are still periodically discovering shards, 
it might be worth removing shards that have expired (due to retention) to avoid 
a *memory leak*.
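
   A rough sketch of the pruning idea, using only the JDK. It assumes a discovery pass can supply the IDs of all shards still within retention (a full listing, not the incremental one keyed on `lastSeenShardId`), and that expired shards are absent from that listing. The class and method names are hypothetical.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical helper: assumes listedShardIds is a FULL listing of shards still within retention.
final class CompletedSplitPruner {

    private CompletedSplitPruner() {}

    /**
     * Drops completed split IDs that no longer appear in the listing.
     *
     * @return the number of IDs removed
     */
    static int pruneExpired(Set<String> completedSplitIds, Set<String> listedShardIds) {
        int before = completedSplitIds.size();
        // Keep only IDs that the stream still reports; everything else has expired.
        completedSplitIds.retainAll(listedShardIds);
        return before - completedSplitIds.size();
    }
}
```

   With this shape, the set of completed IDs is bounded by the number of shards the stream currently reports rather than by the job's lifetime.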



##########
flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumerator.java:
##########
@@ -0,0 +1,338 @@
+@Internal
+public class KinesisStreamsSourceEnumerator
+        implements SplitEnumerator<KinesisShardSplit, KinesisStreamsSourceEnumeratorState> {
+
+    private static final Logger LOG = LoggerFactory.getLogger(KinesisStreamsSourceEnumerator.class);
+
+    private final SplitEnumeratorContext<KinesisShardSplit> context;
+    private final String streamArn;
+    private final Properties consumerConfig;
+    private final StreamProxy streamProxy;
+    private final KinesisShardAssigner shardAssigner;
+    private final KinesisShardAssigner.Context shardAssignerContext;
+
+    private final Map<Integer, Set<KinesisShardSplit>> splitAssignment = new HashMap<>();
+    private final Set<String> assignedSplitIds = new HashSet<>();
+    private final Set<KinesisShardSplit> unassignedSplits;
+    private final Set<String> completedSplitIds;
+
+    private String lastSeenShardId;
+
+    public KinesisStreamsSourceEnumerator(
+            SplitEnumeratorContext<KinesisShardSplit> context,
+            String streamArn,
+            Properties consumerConfig,
+            StreamProxy streamProxy,
+            KinesisStreamsSourceEnumeratorState state) {
+        this.context = context;
+        this.streamArn = streamArn;
+        this.consumerConfig = consumerConfig;
+        this.streamProxy = streamProxy;
+        this.shardAssigner = ShardAssignerFactory.uniformShardAssigner();
+        this.shardAssignerContext = new ShardAssignerContext(splitAssignment, context);
+        if (state == null) {
+            this.completedSplitIds = new HashSet<>();
+            this.lastSeenShardId = null;
+            this.unassignedSplits = new HashSet<>();
+        } else {
+            this.completedSplitIds = state.getCompletedSplitIds();
+            this.lastSeenShardId = state.getLastSeenShardId();
+            this.unassignedSplits = state.getUnassignedSplits();
+        }
+    }
+
+    @Override
+    public void start() {
+        if (lastSeenShardId == null) {
+            context.callAsync(this::initialDiscoverSplits, this::assignSplits);
+        }
+
+        final long shardDiscoveryInterval =
+                Long.parseLong(
+                        consumerConfig.getProperty(
+                                SHARD_DISCOVERY_INTERVAL_MILLIS,
+                                String.valueOf(DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS)));
+        context.callAsync(
+                this::periodicallyDiscoverSplits,
+                this::assignSplits,
+                shardDiscoveryInterval,
+                shardDiscoveryInterval);
+    }
+
+    @Override
+    public void handleSplitRequest(int subtaskId, @Nullable String requesterHostname) {
+        // Do nothing, since we assign splits eagerly
+    }
+
+    @Override
+    public void addSplitsBack(List<KinesisShardSplit> splits, int subtaskId) {
+        if (!splitAssignment.containsKey(subtaskId)) {
+            LOG.warn(
+                    "Unable to add splits back for subtask {} since it is not 
assigned any splits. Splits: {}",
+                    subtaskId,
+                    splits);
+            return;
+        }
+
+        for (KinesisShardSplit split : splits) {
+            splitAssignment.get(subtaskId).remove(split);
+            assignedSplitIds.remove(split.splitId());
+            unassignedSplits.add(split);
+        }
+    }
+
+    @Override
+    public void addReader(int subtaskId) {
+        splitAssignment.putIfAbsent(subtaskId, new HashSet<>());
+    }
+
+    @Override
+    public KinesisStreamsSourceEnumeratorState snapshotState(long 
checkpointId) throws Exception {
+        return new KinesisStreamsSourceEnumeratorState(
+                completedSplitIds, unassignedSplits, lastSeenShardId);
+    }
+
+    @Override
+    public void close() throws IOException {}
+
+    @Override
+    public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
+        if (sourceEvent instanceof CompletedShardsEvent) {
+            Set<String> newlyCompletedSplitIds =
+                    ((CompletedShardsEvent) 
sourceEvent).getCompletedSplitIds();
+            LOG.info(
+                    "Received CompletedShardsEvent from subtask {}. Marking 
the following splits as complete: {}",
+                    subtaskId,
+                    newlyCompletedSplitIds);
+            completedSplitIds.addAll(newlyCompletedSplitIds);
+        } else {
+            SplitEnumerator.super.handleSourceEvent(subtaskId, sourceEvent);
+        }
+    }
+
+    private List<KinesisShardSplit> initialDiscoverSplits() {
+        List<Shard> shards = streamProxy.listShards(streamArn, 
lastSeenShardId);
+        return mapToSplits(shards, false);
+    }
+
+    /**
+     * This method is used to discover Kinesis splits the job can subscribe 
to. It can be run in
+     * parallel, is important to not mutate any shared state.
+     *
+     * @return list of discovered splits
+     */
+    private List<KinesisShardSplit> periodicallyDiscoverSplits() {
+        List<Shard> shards = streamProxy.listShards(streamArn, 
lastSeenShardId);
+        // Any shard discovered after the initial startup should be read from 
the start, since they
+        // come from resharding
+        return mapToSplits(shards, true);
+    }
+
+    private List<KinesisShardSplit> mapToSplits(List<Shard> shards, boolean 
shouldReadFromStart) {
+        InitialPosition initialPositionFromConfig =
+                shouldReadFromStart
+                        ? InitialPosition.TRIM_HORIZON
+                        : InitialPosition.valueOf(
+                                consumerConfig
+                                        .getOrDefault(
+                                                STREAM_INITIAL_POSITION,
+                                                
DEFAULT_STREAM_INITIAL_POSITION)
+                                        .toString());
+        StartingPosition startingPosition;
+        switch (initialPositionFromConfig) {
+            case LATEST:
+                // If LATEST is requested, we still set the starting position 
to the time of
+                // startup. This way, the job starts reading from a 
deterministic timestamp
+                // (i.e. time of job submission), even if it enters a restart 
loop immediately
+                // after submission.
+                startingPosition = 
StartingPosition.fromTimestamp(Instant.now());
+                break;
+            case AT_TIMESTAMP:
+                startingPosition =
+                        StartingPosition.fromTimestamp(
+                                
parseStreamTimestampStartingPosition(consumerConfig).toInstant());
+                break;
+            case TRIM_HORIZON:
+            default:
+                startingPosition = StartingPosition.fromStart();
+        }
+
+        List<KinesisShardSplit> splits = new ArrayList<>();
+        for (Shard shard : shards) {
+            splits.add(new KinesisShardSplit(streamArn, shard.shardId(), 
startingPosition));
+        }
+
+        return splits;
+    }
+
+    /**
+     * This method assigns a given set of Kinesis splits to the readers 
currently registered on the
+     * cluster. This assignment is done via a side-effect on the {@link 
SplitEnumeratorContext}
+     * object.
+     *
+     * @param discoveredSplits list of discovered splits
+     * @param t throwable thrown when discovering splits. Will be null if no 
throwable thrown.
+     */
+    private void assignSplits(List<KinesisShardSplit> discoveredSplits, 
Throwable t) {
+        if (t != null) {
+            throw new KinesisStreamsSourceException("Failed to list shards.", 
t);
+        }
+
+        if (context.registeredReaders().isEmpty()) {

Review Comment:
   When would this happen? Is this expected? Should we throw an exception 
instead?
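
   To make the two options concrete, here is a minimal standalone sketch of 
   the choice. It does not use the PR's classes: `NoReadersSketch`, 
   `failWhenNoReaders` and `onDiscovered` are hypothetical names, and splits 
   are modelled as plain strings.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

/** Toy model of the "no registered readers" branch; all names are hypothetical. */
class NoReadersSketch {
    private final Set<String> unassignedSplits = new HashSet<>();
    private final boolean failWhenNoReaders;

    NoReadersSketch(boolean failWhenNoReaders) {
        this.failWhenNoReaders = failWhenNoReaders;
    }

    /** Returns the splits that can be assigned right now. */
    List<String> onDiscovered(List<String> discovered, int registeredReaders) {
        if (registeredReaders == 0) {
            if (failWhenNoReaders) {
                // Option 1: treat the situation as unexpected and fail fast
                throw new IllegalStateException(
                        "No registered readers for discovered splits: " + discovered);
            }
            // Option 2: buffer the splits until a reader shows up
            unassignedSplits.addAll(discovered);
            return new ArrayList<>();
        }
        // Readers are available: hand out buffered splits together with the new ones
        List<String> toAssign = new ArrayList<>(unassignedSplits);
        toAssign.addAll(discovered);
        unassignedSplits.clear();
        return toAssign;
    }
}
```

   Buffering tolerates discovery finishing before any reader registers, while 
   throwing surfaces the situation immediately; which one is right depends on 
   whether an empty reader set is a normal startup state.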



##########
flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/model/CompletedShardsEvent.java:
##########
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.source.model;
+
+import org.apache.flink.api.connector.source.SourceEvent;
+import 
org.apache.flink.connector.kinesis.source.enumerator.KinesisStreamsSourceEnumerator;
+import 
org.apache.flink.connector.kinesis.source.reader.KinesisStreamsSourceReader;
+
+import java.util.Set;
+
+/**
+ * A source event sent from the {@link KinesisStreamsSourceReader} to the 
{@link
+ * KinesisStreamsSourceEnumerator} to indicate that the current shard is 
completed.
+ */
+public class CompletedShardsEvent implements SourceEvent {

Review Comment:
   Missing compatibility annotation



##########
flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumerator.java:
##########
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.source.enumerator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import 
org.apache.flink.connector.kinesis.source.config.SourceConfigConstants.InitialPosition;
+import 
org.apache.flink.connector.kinesis.source.enumerator.assigner.ShardAssignerFactory;
+import 
org.apache.flink.connector.kinesis.source.exception.KinesisStreamsSourceException;
+import org.apache.flink.connector.kinesis.source.model.CompletedShardsEvent;
+import org.apache.flink.connector.kinesis.source.proxy.StreamProxy;
+import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
+import org.apache.flink.connector.kinesis.source.split.StartingPosition;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.kinesis.model.Shard;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import static 
org.apache.flink.connector.kinesis.source.config.SourceConfigConstants.DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS;
+import static 
org.apache.flink.connector.kinesis.source.config.SourceConfigConstants.DEFAULT_STREAM_INITIAL_POSITION;
+import static 
org.apache.flink.connector.kinesis.source.config.SourceConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS;
+import static 
org.apache.flink.connector.kinesis.source.config.SourceConfigConstants.STREAM_INITIAL_POSITION;
+import static 
org.apache.flink.connector.kinesis.source.config.SourceConfigUtil.parseStreamTimestampStartingPosition;
+
+/**
+ * This class is used to discover and assign Kinesis splits to subtasks on the 
Flink cluster. This
+ * runs on the JobManager.
+ */
+@Internal
+public class KinesisStreamsSourceEnumerator
+        implements SplitEnumerator<KinesisShardSplit, 
KinesisStreamsSourceEnumeratorState> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(KinesisStreamsSourceEnumerator.class);
+
+    private final SplitEnumeratorContext<KinesisShardSplit> context;
+    private final String streamArn;
+    private final Properties consumerConfig;
+    private final StreamProxy streamProxy;
+    private final KinesisShardAssigner shardAssigner;
+    private final KinesisShardAssigner.Context shardAssignerContext;
+
+    private final Map<Integer, Set<KinesisShardSplit>> splitAssignment = new 
HashMap<>();
+    private final Set<String> assignedSplitIds = new HashSet<>();
+    private final Set<KinesisShardSplit> unassignedSplits;
+    private final Set<String> completedSplitIds;
+
+    private String lastSeenShardId;
+
+    public KinesisStreamsSourceEnumerator(
+            SplitEnumeratorContext<KinesisShardSplit> context,
+            String streamArn,
+            Properties consumerConfig,
+            StreamProxy streamProxy,
+            KinesisStreamsSourceEnumeratorState state) {
+        this.context = context;
+        this.streamArn = streamArn;
+        this.consumerConfig = consumerConfig;
+        this.streamProxy = streamProxy;
+        this.shardAssigner = ShardAssignerFactory.uniformShardAssigner();
+        this.shardAssignerContext = new ShardAssignerContext(splitAssignment, 
context);
+        if (state == null) {
+            this.completedSplitIds = new HashSet<>();
+            this.lastSeenShardId = null;
+            this.unassignedSplits = new HashSet<>();
+        } else {
+            this.completedSplitIds = state.getCompletedSplitIds();
+            this.lastSeenShardId = state.getLastSeenShardId();
+            this.unassignedSplits = state.getUnassignedSplits();
+        }
+    }
+
+    @Override
+    public void start() {
+        if (lastSeenShardId == null) {
+            context.callAsync(this::initialDiscoverSplits, this::assignSplits);
+        }
+
+        final long shardDiscoveryInterval =
+                Long.parseLong(
+                        consumerConfig.getProperty(
+                                SHARD_DISCOVERY_INTERVAL_MILLIS,
+                                
String.valueOf(DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS)));
+        context.callAsync(
+                this::periodicallyDiscoverSplits,
+                this::assignSplits,
+                shardDiscoveryInterval,
+                shardDiscoveryInterval);
+    }
+
+    @Override
+    public void handleSplitRequest(int subtaskId, @Nullable String 
requesterHostname) {
+        // Do nothing, since we assign splits eagerly
+    }
+
+    @Override
+    public void addSplitsBack(List<KinesisShardSplit> splits, int subtaskId) {
+        if (!splitAssignment.containsKey(subtaskId)) {
+            LOG.warn(
+                    "Unable to add splits back for subtask {} since it is not 
assigned any splits. Splits: {}",
+                    subtaskId,
+                    splits);
+            return;
+        }
+
+        for (KinesisShardSplit split : splits) {
+            splitAssignment.get(subtaskId).remove(split);
+            assignedSplitIds.remove(split.splitId());
+            unassignedSplits.add(split);
+        }
+    }
+
+    @Override
+    public void addReader(int subtaskId) {
+        splitAssignment.putIfAbsent(subtaskId, new HashSet<>());
+    }
+
+    @Override
+    public KinesisStreamsSourceEnumeratorState snapshotState(long 
checkpointId) throws Exception {
+        return new KinesisStreamsSourceEnumeratorState(
+                completedSplitIds, unassignedSplits, lastSeenShardId);
+    }
+
+    @Override
+    public void close() throws IOException {}
+
+    @Override
+    public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
+        if (sourceEvent instanceof CompletedShardsEvent) {
+            Set<String> newlyCompletedSplitIds =
+                    ((CompletedShardsEvent) 
sourceEvent).getCompletedSplitIds();
+            LOG.info(
+                    "Received CompletedShardsEvent from subtask {}. Marking 
the following splits as complete: {}",
+                    subtaskId,
+                    newlyCompletedSplitIds);
+            completedSplitIds.addAll(newlyCompletedSplitIds);
+        } else {
+            SplitEnumerator.super.handleSourceEvent(subtaskId, sourceEvent);
+        }
+    }
+
+    private List<KinesisShardSplit> initialDiscoverSplits() {
+        List<Shard> shards = streamProxy.listShards(streamArn, 
lastSeenShardId);
+        return mapToSplits(shards, false);
+    }
+
+    /**
+     * This method is used to discover Kinesis splits the job can subscribe 
to. It can be run in
+     * parallel, is important to not mutate any shared state.
+     *
+     * @return list of discovered splits
+     */
+    private List<KinesisShardSplit> periodicallyDiscoverSplits() {
+        List<Shard> shards = streamProxy.listShards(streamArn, 
lastSeenShardId);
+        // Any shard discovered after the initial startup should be read from 
the start, since they
+        // come from resharding
+        return mapToSplits(shards, true);
+    }
+
+    private List<KinesisShardSplit> mapToSplits(List<Shard> shards, boolean 
shouldReadFromStart) {

Review Comment:
   nit: It would be cleaner to pass in the `InitialPosition` rather than 
`shouldReadFromStart`.
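
   Roughly what I have in mind, as a standalone sketch rather than a patch 
   against the PR: the enum below only mirrors the three values used in the 
   `switch`, splits are modelled as strings, and `MapToSplitsSketch` is a 
   hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

/** Simplified stand-in for the enumerator; not the PR's actual class. */
class MapToSplitsSketch {
    enum InitialPosition { LATEST, TRIM_HORIZON, AT_TIMESTAMP }

    private final InitialPosition configuredPosition;

    MapToSplitsSketch(InitialPosition configuredPosition) {
        this.configuredPosition = configuredPosition;
    }

    List<String> initialDiscoverSplits(List<String> shardIds) {
        // The first discovery honours whatever the user configured
        return mapToSplits(shardIds, configuredPosition);
    }

    List<String> periodicallyDiscoverSplits(List<String> shardIds) {
        // Shards found after startup come from resharding, so read them from the start
        return mapToSplits(shardIds, InitialPosition.TRIM_HORIZON);
    }

    // The caller states the position directly; no boolean to decode here
    private static List<String> mapToSplits(
            List<String> shardIds, InitialPosition initialPosition) {
        List<String> splits = new ArrayList<>();
        for (String shardId : shardIds) {
            splits.add(shardId + "@" + initialPosition);
        }
        return splits;
    }
}
```

   With this shape each call site reads as a statement of intent, and 
   `mapToSplits` no longer needs to look at the config itself.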



##########
flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/assigner/HashShardAssigner.java:
##########
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.source.enumerator.assigner;
+
+import org.apache.flink.annotation.Internal;
+import 
org.apache.flink.connector.kinesis.source.enumerator.KinesisShardAssigner;
+import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
+import org.apache.flink.util.Preconditions;
+
+/** An implementation of the {@link KinesisShardAssigner} that assigns splits 
by hashcode. */
+@Internal
+public class HashShardAssigner implements KinesisShardAssigner {

Review Comment:
   I would be inclined to just delete this shard assigner unless you can 
convince me of a use case where it is better than the uniform assigner. What 
will our docs say?
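
   For comparison, a toy sketch of the two strategies. It assumes the uniform 
   assigner picks the least-loaded subtask, which I have not checked against 
   the PR's implementation; `AssignerComparisonSketch` and its methods are 
   hypothetical names.

```java
import java.util.List;

/** Toy comparison of two assignment strategies; not the PR's assigner classes. */
class AssignerComparisonSketch {

    /** Hash-style: the subtask is derived from the shard id alone. */
    static int[] assignByHash(List<String> shardIds, int parallelism) {
        int[] load = new int[parallelism];
        for (String shardId : shardIds) {
            // floorMod keeps the index non-negative for negative hash codes
            load[Math.floorMod(shardId.hashCode(), parallelism)]++;
        }
        return load;
    }

    /** Uniform-style (assumed behaviour): each shard goes to the least-loaded subtask. */
    static int[] assignToLeastLoaded(List<String> shardIds, int parallelism) {
        int[] load = new int[parallelism];
        for (int n = 0; n < shardIds.size(); n++) {
            int target = 0;
            for (int i = 1; i < parallelism; i++) {
                if (load[i] < load[target]) {
                    target = i;
                }
            }
            load[target]++;
        }
        return load;
    }

    /** Difference between the busiest and the idlest subtask. */
    static int spread(int[] load) {
        int min = Integer.MAX_VALUE;
        int max = Integer.MIN_VALUE;
        for (int count : load) {
            min = Math.min(min, count);
            max = Math.max(max, count);
        }
        return max - min;
    }
}
```

   In this model the least-loaded strategy keeps the spread at 0 or 1, while 
   the hash strategy gives no such bound. The one property hashing offers is 
   that the target subtask depends only on the shard id, not on the current 
   assignment state.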



##########
flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumerator.java:
##########
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.source.enumerator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import 
org.apache.flink.connector.kinesis.source.config.SourceConfigConstants.InitialPosition;
+import 
org.apache.flink.connector.kinesis.source.enumerator.assigner.ShardAssignerFactory;
+import 
org.apache.flink.connector.kinesis.source.exception.KinesisStreamsSourceException;
+import org.apache.flink.connector.kinesis.source.model.CompletedShardsEvent;
+import org.apache.flink.connector.kinesis.source.proxy.StreamProxy;
+import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
+import org.apache.flink.connector.kinesis.source.split.StartingPosition;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.kinesis.model.Shard;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import static 
org.apache.flink.connector.kinesis.source.config.SourceConfigConstants.DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS;
+import static 
org.apache.flink.connector.kinesis.source.config.SourceConfigConstants.DEFAULT_STREAM_INITIAL_POSITION;
+import static 
org.apache.flink.connector.kinesis.source.config.SourceConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS;
+import static 
org.apache.flink.connector.kinesis.source.config.SourceConfigConstants.STREAM_INITIAL_POSITION;
+import static 
org.apache.flink.connector.kinesis.source.config.SourceConfigUtil.parseStreamTimestampStartingPosition;
+
+/**
+ * This class is used to discover and assign Kinesis splits to subtasks on the 
Flink cluster. This
+ * runs on the JobManager.
+ */
+@Internal
+public class KinesisStreamsSourceEnumerator
+        implements SplitEnumerator<KinesisShardSplit, 
KinesisStreamsSourceEnumeratorState> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(KinesisStreamsSourceEnumerator.class);
+
+    private final SplitEnumeratorContext<KinesisShardSplit> context;
+    private final String streamArn;
+    private final Properties consumerConfig;
+    private final StreamProxy streamProxy;
+    private final KinesisShardAssigner shardAssigner;
+    private final KinesisShardAssigner.Context shardAssignerContext;
+
+    private final Map<Integer, Set<KinesisShardSplit>> splitAssignment = new 
HashMap<>();
+    private final Set<String> assignedSplitIds = new HashSet<>();
+    private final Set<KinesisShardSplit> unassignedSplits;
+    private final Set<String> completedSplitIds;
+
+    private String lastSeenShardId;
+
+    public KinesisStreamsSourceEnumerator(
+            SplitEnumeratorContext<KinesisShardSplit> context,
+            String streamArn,
+            Properties consumerConfig,
+            StreamProxy streamProxy,
+            KinesisStreamsSourceEnumeratorState state) {
+        this.context = context;
+        this.streamArn = streamArn;
+        this.consumerConfig = consumerConfig;
+        this.streamProxy = streamProxy;
+        this.shardAssigner = ShardAssignerFactory.uniformShardAssigner();
+        this.shardAssignerContext = new ShardAssignerContext(splitAssignment, 
context);
+        if (state == null) {
+            this.completedSplitIds = new HashSet<>();
+            this.lastSeenShardId = null;
+            this.unassignedSplits = new HashSet<>();
+        } else {
+            this.completedSplitIds = state.getCompletedSplitIds();
+            this.lastSeenShardId = state.getLastSeenShardId();
+            this.unassignedSplits = state.getUnassignedSplits();
+        }
+    }
+
+    @Override
+    public void start() {
+        if (lastSeenShardId == null) {
+            context.callAsync(this::initialDiscoverSplits, this::assignSplits);
+        }
+
+        final long shardDiscoveryInterval =
+                Long.parseLong(
+                        consumerConfig.getProperty(
+                                SHARD_DISCOVERY_INTERVAL_MILLIS,
+                                
String.valueOf(DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS)));
+        context.callAsync(
+                this::periodicallyDiscoverSplits,
+                this::assignSplits,
+                shardDiscoveryInterval,
+                shardDiscoveryInterval);
+    }
+
+    @Override
+    public void handleSplitRequest(int subtaskId, @Nullable String 
requesterHostname) {
+        // Do nothing, since we assign splits eagerly
+    }
+
+    @Override
+    public void addSplitsBack(List<KinesisShardSplit> splits, int subtaskId) {
+        if (!splitAssignment.containsKey(subtaskId)) {
+            LOG.warn(
+                    "Unable to add splits back for subtask {} since it is not 
assigned any splits. Splits: {}",
+                    subtaskId,
+                    splits);
+            return;
+        }
+
+        for (KinesisShardSplit split : splits) {
+            splitAssignment.get(subtaskId).remove(split);
+            assignedSplitIds.remove(split.splitId());
+            unassignedSplits.add(split);
+        }
+    }
+
+    @Override
+    public void addReader(int subtaskId) {
+        splitAssignment.putIfAbsent(subtaskId, new HashSet<>());
+    }
+
+    @Override
+    public KinesisStreamsSourceEnumeratorState snapshotState(long 
checkpointId) throws Exception {
+        return new KinesisStreamsSourceEnumeratorState(
+                completedSplitIds, unassignedSplits, lastSeenShardId);
+    }
+
+    @Override
+    public void close() throws IOException {}
+
+    @Override
+    public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
+        if (sourceEvent instanceof CompletedShardsEvent) {
+            Set<String> newlyCompletedSplitIds =
+                    ((CompletedShardsEvent) 
sourceEvent).getCompletedSplitIds();
+            LOG.info(
+                    "Received CompletedShardsEvent from subtask {}. Marking 
the following splits as complete: {}",
+                    subtaskId,
+                    newlyCompletedSplitIds);
+            completedSplitIds.addAll(newlyCompletedSplitIds);
+        } else {
+            SplitEnumerator.super.handleSourceEvent(subtaskId, sourceEvent);
+        }
+    }
+
+    private List<KinesisShardSplit> initialDiscoverSplits() {
+        List<Shard> shards = streamProxy.listShards(streamArn, 
lastSeenShardId);
+        return mapToSplits(shards, false);
+    }
+
+    /**
+     * This method is used to discover Kinesis splits the job can subscribe 
to. It can be run in
+     * parallel, is important to not mutate any shared state.
+     *
+     * @return list of discovered splits
+     */
+    private List<KinesisShardSplit> periodicallyDiscoverSplits() {
+        List<Shard> shards = streamProxy.listShards(streamArn, 
lastSeenShardId);
+        // Any shard discovered after the initial startup should be read from 
the start, since they
+        // come from resharding
+        return mapToSplits(shards, true);
+    }
+
+    private List<KinesisShardSplit> mapToSplits(List<Shard> shards, boolean 
shouldReadFromStart) {
+        InitialPosition initialPositionFromConfig =
+                shouldReadFromStart
+                        ? InitialPosition.TRIM_HORIZON
+                        : InitialPosition.valueOf(
+                                consumerConfig
+                                        .getOrDefault(
+                                                STREAM_INITIAL_POSITION,
+                                                
DEFAULT_STREAM_INITIAL_POSITION)
+                                        .toString());
+        StartingPosition startingPosition;
+        switch (initialPositionFromConfig) {
+            case LATEST:
+                // If LATEST is requested, we still set the starting position 
to the time of
+                // startup. This way, the job starts reading from a 
deterministic timestamp
+                // (i.e. time of job submission), even if it enters a restart 
loop immediately
+                // after submission.
+                startingPosition = 
StartingPosition.fromTimestamp(Instant.now());
+                break;
+            case AT_TIMESTAMP:
+                startingPosition =
+                        StartingPosition.fromTimestamp(
+                                
parseStreamTimestampStartingPosition(consumerConfig).toInstant());
+                break;
+            case TRIM_HORIZON:
+            default:
+                startingPosition = StartingPosition.fromStart();
+        }
+
+        List<KinesisShardSplit> splits = new ArrayList<>();
+        for (Shard shard : shards) {
+            splits.add(new KinesisShardSplit(streamArn, shard.shardId(), 
startingPosition));
+        }
+
+        return splits;
+    }
+
+    /**
+     * This method assigns a given set of Kinesis splits to the readers 
currently registered on the
+     * cluster. This assignment is done via a side-effect on the {@link 
SplitEnumeratorContext}
+     * object.
+     *
+     * @param discoveredSplits list of discovered splits
+     * @param t throwable thrown when discovering splits. Will be null if no 
throwable thrown.
+     */
+    private void assignSplits(List<KinesisShardSplit> discoveredSplits, 
Throwable t) {
+        if (t != null) {
+            throw new KinesisStreamsSourceException("Failed to list shards.", 
t);
+        }
+
+        if (context.registeredReaders().isEmpty()) {
+            LOG.info("No registered readers, skipping assignment of discovered 
splits.");
+            unassignedSplits.addAll(discoveredSplits);
+            return;
+        }
+
+        Map<Integer, List<KinesisShardSplit>> newSplitAssignments = new 
HashMap<>();
+        for (KinesisShardSplit split : unassignedSplits) {
+            assignSplitToSubtask(split, newSplitAssignments);
+        }
+        unassignedSplits.clear();
+        for (KinesisShardSplit split : discoveredSplits) {
+            assignSplitToSubtask(split, newSplitAssignments);
+        }
+
+        updateLastSeenShardId(discoveredSplits);
+        updateSplitAssignment(newSplitAssignments);
+        context.assignSplits(new SplitsAssignment<>(newSplitAssignments));

Review Comment:
   Is this call synchronous/atomic? Is it possible that a checkpoint could be 
triggered/completed, resulting in a race condition here:
   1. Discover split X
   2. Start assigning split X to subtask
   3. Trigger checkpoint N
   4. Complete checkpoint N
   5. Finish assigning split X to subtask
   6. Job fails, restore from checkpoint N
   7. Split X is now lost (I suppose it would be rediscovered here though)
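
   A toy model of the sequence above, to show what I am worried about. It 
   assumes the checkpoint can land between steps 2 and 5 and that discovery 
   only lists shards after `lastSeenShardId`; neither assumption is verified 
   against Flink or the PR, and all names are hypothetical.

```java
import java.util.HashSet;
import java.util.Set;

/** Toy model of steps 1-7; not the PR's enumerator or Flink's checkpointing. */
class AssignmentRaceSketch {

    /** What checkpoint N would capture from the enumerator in this model. */
    static final class Snapshot {
        final Set<String> unassigned;
        final String lastSeenShardId;

        Snapshot(Set<String> unassigned, String lastSeenShardId) {
            this.unassigned = new HashSet<>(unassigned);
            this.lastSeenShardId = lastSeenShardId;
        }
    }

    final Set<String> unassigned = new HashSet<>();
    String lastSeenShardId;

    /** Steps 1-2: the split leaves enumerator state before any reader has stored it. */
    void beginAssign(String shardId) {
        lastSeenShardId = shardId;
        unassigned.remove(shardId);
    }

    /** Steps 3-4: snapshot of the enumerator state only. */
    Snapshot snapshot() {
        return new Snapshot(unassigned, lastSeenShardId);
    }

    /** Step 7: can a job restored from the snapshot still reach the shard? */
    static boolean recoverable(Snapshot snapshot, String shardId) {
        boolean buffered = snapshot.unassigned.contains(shardId);
        // Assumption: discovery resumes strictly after lastSeenShardId
        boolean rediscoverable =
                snapshot.lastSeenShardId == null
                        || snapshot.lastSeenShardId.compareTo(shardId) < 0;
        return buffered || rediscoverable;
    }
}
```

   In this model a snapshot taken after `beginAssign` holds the split in 
   neither place, so it would only survive if the reader's own state in 
   checkpoint N already contains it.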



##########
flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/KinesisStreamsSource.java:
##########
@@ -0,0 +1,167 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.source;
+
+import org.apache.flink.annotation.Experimental;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.connector.source.Boundedness;
+import org.apache.flink.api.connector.source.Source;
+import org.apache.flink.api.connector.source.SourceReader;
+import org.apache.flink.api.connector.source.SourceReaderContext;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.connector.aws.util.AWSClientUtil;
+import org.apache.flink.connector.aws.util.AWSGeneralUtil;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import 
org.apache.flink.connector.base.source.reader.fetcher.SingleThreadFetcherManager;
+import 
org.apache.flink.connector.base.source.reader.synchronization.FutureCompletingBlockingQueue;
+import org.apache.flink.connector.kinesis.sink.KinesisStreamsConfigConstants;
+import 
org.apache.flink.connector.kinesis.source.enumerator.KinesisStreamsSourceEnumerator;
+import 
org.apache.flink.connector.kinesis.source.enumerator.KinesisStreamsSourceEnumeratorState;
+import 
org.apache.flink.connector.kinesis.source.enumerator.KinesisStreamsSourceEnumeratorStateSerializer;
+import org.apache.flink.connector.kinesis.source.proxy.KinesisStreamProxy;
+import 
org.apache.flink.connector.kinesis.source.reader.KinesisStreamsRecordEmitter;
+import 
org.apache.flink.connector.kinesis.source.reader.KinesisStreamsSourceReader;
+import 
org.apache.flink.connector.kinesis.source.reader.PollingKinesisShardSplitReader;
+import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
+import 
org.apache.flink.connector.kinesis.source.split.KinesisShardSplitSerializer;
+import org.apache.flink.core.io.SimpleVersionedSerializer;
+
+import software.amazon.awssdk.http.SdkHttpClient;
+import software.amazon.awssdk.http.apache.ApacheHttpClient;
+import software.amazon.awssdk.services.kinesis.KinesisClient;
+import software.amazon.awssdk.services.kinesis.model.Record;
+import software.amazon.awssdk.utils.AttributeMap;
+
+import java.util.Properties;
+import java.util.function.Supplier;
+
+/**
+ * The {@link KinesisStreamsSource} is an exactly-once parallel streaming data 
source that
+ * subscribes to a single AWS Kinesis data stream. It is able to handle 
resharding of streams, and
+ * stores its current progress in Flink checkpoints. The source will read in 
data from the Kinesis
+ * Data stream, deserialize it using the provided {@link 
DeserializationSchema}, and emit the record
+ * into the Flink job graph.
+ *
+ * <p>Exactly-once semantics. To leverage Flink's checkpointing mechanics for 
exactly-once stream
+ * processing, the Kinesis Source is implemented with the AWS Java SDK, 
instead of the officially
+ * recommended AWS Kinesis Client Library. The source will store its current 
progress in Flink
+ * checkpoint/savepoint, and will pick up from where it left off upon restore 
from the
+ * checkpoint/savepoint.
+ *
+ * <p>Initial starting points. The Kinesis Streams Source supports reads 
starting from TRIM_HORIZON,
+ * LATEST, and AT_TIMESTAMP.
+ *
+ * @param <T> the data type emitted by the source
+ */
+@Experimental
+public class KinesisStreamsSource<T>
+        implements Source<T, KinesisShardSplit, 
KinesisStreamsSourceEnumeratorState> {
+
+    private final String streamArn;
+    private final Properties consumerConfig;
+    private final DeserializationSchema<T> deserializationSchema;
+
+    public KinesisStreamsSource(
+            String streamArn,
+            Properties consumerConfig,
+            DeserializationSchema<T> deserializationSchema) {
+        this.streamArn = streamArn;
+        this.consumerConfig = consumerConfig;
+        this.deserializationSchema = deserializationSchema;
+    }
+
+    @Override
+    public Boundedness getBoundedness() {
+        return Boundedness.CONTINUOUS_UNBOUNDED;
+    }
+
+    @Override
+    public SourceReader<T, KinesisShardSplit> createReader(SourceReaderContext 
readerContext)
+            throws Exception {
+        FutureCompletingBlockingQueue<RecordsWithSplitIds<Record>> 
elementsQueue =
+                new FutureCompletingBlockingQueue<>();
+        KinesisStreamProxy kinesisStreamProxy = 
createKinesisStreamProxy(consumerConfig);
+        Supplier<PollingKinesisShardSplitReader> splitReaderSupplier =
+                () -> new PollingKinesisShardSplitReader(kinesisStreamProxy);
+        KinesisStreamsRecordEmitter<T> recordEmitter =

Review Comment:
   I believe we should invoke `deserializationSchema.open(..)` here, or
   somewhere else before the first record is deserialized. For reference, see
   KafkaSource:
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/KafkaSource.java#L144
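   
   To illustrate the lifecycle concern, here is a minimal sketch that uses
   only the JDK. `OpenableSchema`, `Utf8Schema` and `ReaderFactorySketch` are
   hypothetical stand-ins for this comment, not Flink classes, and the real
   `open(..)` takes an initialization context that is left out here. The
   point is only that a schema which sets up state in `open` breaks if the
   source never calls it.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical stand-in for a deserialization schema; not the Flink interface. */
interface OpenableSchema<T> {
    /** Called once before the first deserialize call, e.g. to build parsers. */
    void open();

    T deserialize(byte[] message);
}

/** Example schema that fails fast when it is used before open(). */
final class Utf8Schema implements OpenableSchema<String> {
    private boolean opened;

    @Override
    public void open() {
        opened = true;
    }

    @Override
    public String deserialize(byte[] message) {
        if (!opened) {
            throw new IllegalStateException("deserialize() called before open()");
        }
        return new String(message, StandardCharsets.UTF_8);
    }
}

/** Hypothetical stand-in for the source: opens the schema while creating the reader. */
final class ReaderFactorySketch {
    static <T> OpenableSchema<T> createReader(OpenableSchema<T> schema) {
        schema.open(); // the call this comment asks for
        return schema;
    }
}
```

   With this shape, a schema handed out by `createReader` is always usable,
   while one that skipped `open()` fails on the first record.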



##########
flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/proxy/KinesisStreamProxy.java:
##########
@@ -0,0 +1,144 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.source.proxy;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.kinesis.source.split.StartingPosition;
+
+import software.amazon.awssdk.services.kinesis.KinesisClient;
+import software.amazon.awssdk.services.kinesis.model.ExpiredIteratorException;
+import software.amazon.awssdk.services.kinesis.model.GetRecordsRequest;
+import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
+import software.amazon.awssdk.services.kinesis.model.GetShardIteratorRequest;
+import software.amazon.awssdk.services.kinesis.model.ListShardsRequest;
+import software.amazon.awssdk.services.kinesis.model.ListShardsResponse;
+import software.amazon.awssdk.services.kinesis.model.Shard;
+
+import javax.annotation.Nullable;
+
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/** Implementation of the {@link StreamProxy} for Kinesis data streams. */
+@Internal
+public class KinesisStreamProxy implements StreamProxy {
+
+    private final KinesisClient kinesisClient;
+    private final Map<String, String> shardIdToIteratorStore;
+
+    public KinesisStreamProxy(KinesisClient kinesisClient) {
+        this.kinesisClient = kinesisClient;
+        this.shardIdToIteratorStore = new ConcurrentHashMap<>();
+    }
+
+    @Override
+    public List<Shard> listShards(String streamArn, @Nullable String 
lastSeenShardId) {
+        List<Shard> shards = new ArrayList<>();
+
+        ListShardsResponse listShardsResponse;
+        String nextToken = null;
+        do {
+            listShardsResponse =
+                    kinesisClient.listShards(
+                            ListShardsRequest.builder()
+                                    .streamARN(streamArn)
+                                    .exclusiveStartShardId(
+                                            nextToken == null ? 
lastSeenShardId : null)
+                                    .nextToken(nextToken)
+                                    .build());
+
+            shards.addAll(listShardsResponse.shards());
+            nextToken = listShardsResponse.nextToken();
+        } while (nextToken != null);

Review Comment:
   Are we relying on the SDK client's retry and backoff policy here?
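   
   If the SDK defaults turn out not to be enough, an explicit wrapper around
   each `listShards` page could look roughly like the sketch below. It is
   JDK-only; `RetrySketch` and `callWithBackoff` are hypothetical names, and
   a real version would probably retry only on throttling or transient
   errors rather than on every `RuntimeException`.

```java
import java.util.function.Supplier;

final class RetrySketch {

    /**
     * Calls the action until it succeeds or maxAttempts is reached,
     * doubling the delay between consecutive attempts.
     */
    static <T> T callWithBackoff(Supplier<T> action, int maxAttempts, long initialDelayMillis) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        long delayMillis = initialDelayMillis;
        for (int attempt = 1; ; attempt++) {
            try {
                return action.get();
            } catch (RuntimeException e) {
                if (attempt >= maxAttempts) {
                    throw e; // retries exhausted, surface the last failure
                }
            }
            sleep(delayMillis);
            delayMillis *= 2; // exponential backoff
        }
    }

    private static void sleep(long millis) {
        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while backing off", e);
        }
    }
}
```

   The do/while loop would then call something like
   `callWithBackoff(() -> kinesisClient.listShards(request), 5, 100L)`, where
   the attempt count and delay are placeholder values.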



##########
flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/KinesisStreamsRecordEmitter.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.source.reader;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.common.serialization.DeserializationSchema;
+import org.apache.flink.api.connector.source.SourceOutput;
+import org.apache.flink.connector.base.source.reader.RecordEmitter;
+import org.apache.flink.connector.kinesis.source.split.KinesisShardSplitState;
+import org.apache.flink.connector.kinesis.source.split.StartingPosition;
+
+import software.amazon.awssdk.services.kinesis.model.Record;
+
+/**
+ * Emits record from the source into the Flink job graph. This serves as the 
interface between the
+ * source and the Flink job.
+ *
+ * @param <T> the data type being emitted into the Flink job graph
+ */
+@Internal
+public class KinesisStreamsRecordEmitter<T>
+        implements RecordEmitter<Record, T, KinesisShardSplitState> {
+
+    private final DeserializationSchema<T> deserializationSchema;
+
+    public KinesisStreamsRecordEmitter(DeserializationSchema<T> 
deserializationSchema) {
+        this.deserializationSchema = deserializationSchema;
+    }
+
+    @Override
+    public void emitRecord(
+            Record element, SourceOutput<T> output, KinesisShardSplitState 
splitState)
+            throws Exception {
+        output.collect(
+                
deserializationSchema.deserialize(element.data().asByteArray()),

Review Comment:
   In the old source we have the option to use `KinesisDeserializationSchema`,
   which supplies the sequence number. Dropping it here is a feature
   regression.
   
   Additionally, we should add support for the collector variant of
   `deserialize`:
https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/source/reader/KafkaRecordEmitter.java#L60
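   
   A rough, JDK-only sketch of what I mean by both points. All names here
   (`SequenceAwareSchema`, `CsvSchema`, `EmitterSketch`) are hypothetical
   and the parameters are simplified; `Consumer<T>` stands in for a
   collector. The schema receives the sequence number alongside the payload
   and may emit zero, one, or many elements per Kinesis record.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

/** Hypothetical Kinesis-aware schema; names and parameters are illustrative. */
interface SequenceAwareSchema<T> {
    /** Emits zero, one, or many elements for a single Kinesis record. */
    void deserialize(byte[] data, String sequenceNumber, Consumer<T> out);
}

/** Splits a comma-separated payload and tags each field with the sequence number. */
final class CsvSchema implements SequenceAwareSchema<String> {
    @Override
    public void deserialize(byte[] data, String sequenceNumber, Consumer<String> out) {
        String payload = new String(data, StandardCharsets.UTF_8);
        for (String field : payload.split(",")) {
            if (!field.isEmpty()) {
                out.accept(sequenceNumber + ":" + field);
            }
        }
    }
}

final class EmitterSketch {
    /** Plays the role of emitRecord: forwards whatever the schema emits. */
    static List<String> emit(SequenceAwareSchema<String> schema, byte[] data, String seq) {
        List<String> output = new ArrayList<>();
        schema.deserialize(data, seq, output::add);
        return output;
    }
}
```

   The single-value `output.collect(deserialize(..))` call in this PR cannot
   express the empty or multi-element cases, which is why the collector form
   matters.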



##########
flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/proxy/StreamProxy.java:
##########
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.source.proxy;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.kinesis.source.split.StartingPosition;
+
+import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
+import software.amazon.awssdk.services.kinesis.model.Shard;
+
+import javax.annotation.Nullable;
+
+import java.util.List;
+
+/** Interface for a StreamProxy to interact with Streams service in a given 
region. */
+@Internal
+public interface StreamProxy {

Review Comment:
   nit: The name is quite generic. Consider renaming the interface to
   `KinesisStreamsProxy` and the implementation to
   `AwsV2KinesisStreamsProxy`.



##########
flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplitState.java:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.source.split;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Stores the metadata around a given {@link KinesisShardSplit}. This class is 
stored in state, and
+ * any changes should be backwards compatible.
+ */
+@Internal
+public class KinesisShardSplitState {
+    private final KinesisShardSplit kinesisShardSplit;
+    private StartingPosition nextStartingPosition;
+    private String nextShardIterator;
+
+    public KinesisShardSplitState(KinesisShardSplit kinesisShardSplit) {
+        this.kinesisShardSplit = kinesisShardSplit;
+        this.nextStartingPosition = kinesisShardSplit.getStartingPosition();
+    }
+
+    public KinesisShardSplit getKinesisShardSplit() {
+        return new KinesisShardSplit(
+                kinesisShardSplit.getStreamArn(),
+                kinesisShardSplit.getShardId(),
+                nextStartingPosition);
+    }
+
+    public String splitId() {

Review Comment:
   Missing `get` prefix, i.e. `getSplitId()`.



##########
flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/reader/PollingKinesisShardSplitReader.java:
##########
@@ -0,0 +1,152 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.source.reader;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.connector.base.source.reader.RecordsWithSplitIds;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitReader;
+import org.apache.flink.connector.base.source.reader.splitreader.SplitsChange;
+import org.apache.flink.connector.kinesis.source.proxy.StreamProxy;
+import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
+import org.apache.flink.connector.kinesis.source.split.KinesisShardSplitState;
+import org.apache.flink.connector.kinesis.source.split.StartingPosition;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableSet;
+
+import software.amazon.awssdk.services.kinesis.model.GetRecordsResponse;
+import software.amazon.awssdk.services.kinesis.model.Record;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.util.ArrayDeque;
+import java.util.Collections;
+import java.util.Deque;
+import java.util.Iterator;
+import java.util.Set;
+
+/**
+ * An implementation of the SplitReader that periodically polls the Kinesis 
stream to retrieve
+ * records.
+ */
+@Internal
+public class PollingKinesisShardSplitReader implements SplitReader<Record, 
KinesisShardSplit> {
+
+    private static final RecordsWithSplitIds<Record> 
INCOMPLETE_SHARD_EMPTY_RECORDS =
+            new KinesisRecordsWithSplitIds(Collections.emptyIterator(), null, 
false);
+
+    private final StreamProxy kinesis;
+    private final Deque<KinesisShardSplitState> assignedSplits = new 
ArrayDeque<>();
+
+    public PollingKinesisShardSplitReader(StreamProxy kinesisProxy) {
+        this.kinesis = kinesisProxy;
+    }
+
+    @Override
+    public RecordsWithSplitIds<Record> fetch() throws IOException {
+        KinesisShardSplitState splitState = assignedSplits.poll();
+        if (splitState == null) {
+            return INCOMPLETE_SHARD_EMPTY_RECORDS;
+        }
+
+        GetRecordsResponse getRecordsResponse =
+                kinesis.getRecords(
+                        splitState.getStreamArn(),
+                        splitState.getShardId(),
+                        splitState.getNextStartingPosition());
+        boolean isComplete = getRecordsResponse.nextShardIterator() == null;
+
+        if (hasNoRecords(getRecordsResponse)) {
+            if (isComplete) {
+                return new KinesisRecordsWithSplitIds(
+                        Collections.emptyIterator(), splitState.splitId(), 
true);
+            } else {
+                assignedSplits.add(splitState);
+                return INCOMPLETE_SHARD_EMPTY_RECORDS;
+            }
+        }
+
+        splitState.setNextStartingPosition(
+                StartingPosition.continueFromSequenceNumber(
+                        getRecordsResponse
+                                .records()
+                                .get(getRecordsResponse.records().size() - 1)
+                                .sequenceNumber()));
+
+        assignedSplits.add(splitState);
+        return new KinesisRecordsWithSplitIds(
+                getRecordsResponse.records().iterator(), splitState.splitId(), 
isComplete);
+    }
+
+    private boolean hasNoRecords(GetRecordsResponse getRecordsResponse) {
+        return !getRecordsResponse.hasRecords() || 
getRecordsResponse.records().isEmpty();
+    }
+
+    @Override
+    public void handleSplitsChanges(SplitsChange<KinesisShardSplit> 
splitsChanges) {
+        for (KinesisShardSplit split : splitsChanges.splits()) {
+            assignedSplits.add(new KinesisShardSplitState(split));
+        }
+    }
+
+    @Override
+    public void wakeUp() {
+        // Do nothing because we don't have any sleep mechanism
+    }
+
+    @Override
+    public void close() throws Exception {}
+
+    private static class KinesisRecordsWithSplitIds implements 
RecordsWithSplitIds<Record> {
+
+        private final Iterator<Record> recordsIterator;

Review Comment:
   It looks like we are not supporting de-aggregation. We need to keep this
   feature, as dropping it might be a blocker for many users.



##########
flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/split/StartingPosition.java:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.source.split;
+
+import org.apache.flink.annotation.Internal;
+
+import software.amazon.awssdk.services.kinesis.model.ShardIteratorType;
+
+import javax.annotation.Nullable;
+
+import java.time.Instant;
+import java.util.Objects;
+
+import static 
software.amazon.awssdk.services.kinesis.model.ShardIteratorType.AFTER_SEQUENCE_NUMBER;
+import static 
software.amazon.awssdk.services.kinesis.model.ShardIteratorType.AT_TIMESTAMP;
+import static 
software.amazon.awssdk.services.kinesis.model.ShardIteratorType.TRIM_HORIZON;
+
+/** Data class indicating the starting position for reading a given shard. */
+@Internal
+public final class StartingPosition {
+
+    private final ShardIteratorType shardIteratorType;
+    private final Object startingMarker;
+
+    StartingPosition(ShardIteratorType shardIteratorType, Object 
startingMarker) {
+        this.shardIteratorType = shardIteratorType;
+        this.startingMarker = startingMarker;
+    }
+
+    public ShardIteratorType getShardIteratorType() {
+        return shardIteratorType;
+    }
+
+    @Nullable
+    public Object getStartingMarker() {
+        return startingMarker;
+    }
+
+    public static StartingPosition fromTimestamp(final Instant timestamp) {
+        return new StartingPosition(AT_TIMESTAMP, timestamp);
+    }
+
+    public static StartingPosition continueFromSequenceNumber(final String 
sequenceNumber) {

Review Comment:
   FYI, I spent some time looking through this, so I thought I would share my
   findings. I questioned whether we need to support
   `restartFromSequenceNumber` like the legacy source does, but I believe we
   do not, so this is good.
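   
   For anyone following along, the distinction that matters is between the
   Kinesis iterator types `AT_SEQUENCE_NUMBER` and `AFTER_SEQUENCE_NUMBER`.
   The sketch below models a shard as a plain list of sequence numbers;
   `IteratorSemanticsSketch` and its `read` method are hypothetical and only
   illustrate the two semantics. Since `fetch()` stores the sequence number
   of the last record it returned, resuming with `AFTER_SEQUENCE_NUMBER`
   avoids re-reading that record.

```java
import java.math.BigInteger;
import java.util.List;
import java.util.stream.Collectors;

final class IteratorSemanticsSketch {

    enum IteratorType {
        AT_SEQUENCE_NUMBER,
        AFTER_SEQUENCE_NUMBER
    }

    /** Returns the sequence numbers a reader would see when starting at the marker. */
    static List<String> read(List<String> shardSequenceNumbers, IteratorType type, String marker) {
        BigInteger markerValue = new BigInteger(marker);
        return shardSequenceNumbers.stream()
                .filter(seq -> {
                    int cmp = new BigInteger(seq).compareTo(markerValue);
                    // AT includes the marker record itself, AFTER skips it
                    return type == IteratorType.AT_SEQUENCE_NUMBER ? cmp >= 0 : cmp > 0;
                })
                .collect(Collectors.toList());
    }
}
```

   Starting `AT` the checkpointed marker would replay one record that was
   already emitted, whereas `AFTER` continues with the next one.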



##########
flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumerator.java:
##########
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.source.enumerator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import 
org.apache.flink.connector.kinesis.source.config.SourceConfigConstants.InitialPosition;
+import 
org.apache.flink.connector.kinesis.source.enumerator.assigner.ShardAssignerFactory;
+import 
org.apache.flink.connector.kinesis.source.exception.KinesisStreamsSourceException;
+import org.apache.flink.connector.kinesis.source.model.CompletedShardsEvent;
+import org.apache.flink.connector.kinesis.source.proxy.StreamProxy;
+import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
+import org.apache.flink.connector.kinesis.source.split.StartingPosition;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.kinesis.model.Shard;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import static 
org.apache.flink.connector.kinesis.source.config.SourceConfigConstants.DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS;
+import static 
org.apache.flink.connector.kinesis.source.config.SourceConfigConstants.DEFAULT_STREAM_INITIAL_POSITION;
+import static 
org.apache.flink.connector.kinesis.source.config.SourceConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS;
+import static 
org.apache.flink.connector.kinesis.source.config.SourceConfigConstants.STREAM_INITIAL_POSITION;
+import static 
org.apache.flink.connector.kinesis.source.config.SourceConfigUtil.parseStreamTimestampStartingPosition;
+
+/**
+ * This class is used to discover and assign Kinesis splits to subtasks on the 
Flink cluster. This
+ * runs on the JobManager.
+ */
+@Internal
+public class KinesisStreamsSourceEnumerator
+        implements SplitEnumerator<KinesisShardSplit, 
KinesisStreamsSourceEnumeratorState> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(KinesisStreamsSourceEnumerator.class);
+
+    private final SplitEnumeratorContext<KinesisShardSplit> context;
+    private final String streamArn;
+    private final Properties consumerConfig;
+    private final StreamProxy streamProxy;
+    private final KinesisShardAssigner shardAssigner;
+    private final KinesisShardAssigner.Context shardAssignerContext;
+
+    private final Map<Integer, Set<KinesisShardSplit>> splitAssignment = new 
HashMap<>();
+    private final Set<String> assignedSplitIds = new HashSet<>();
+    private final Set<KinesisShardSplit> unassignedSplits;
+    private final Set<String> completedSplitIds;

Review Comment:
   I see we query from the last seen shard ID. Do we therefore really need
   `completedSplitIds` at all? Can we just track the last seen shard ID and
   assume all the others are either assigned to a subtask or done?
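   
   Roughly what I have in mind, as a JDK-only sketch. `LastSeenShardTracker`
   is a hypothetical name, and the list passed in stands in for the result
   of `listShards`. It assumes shard IDs sort lexicographically in creation
   order and that the input is sorted ascending, which we should confirm
   before relying on it.

```java
import java.util.ArrayList;
import java.util.List;

final class LastSeenShardTracker {

    private String lastSeenShardId; // null until the first discovery

    /**
     * Returns only the shards that sort after the last seen ID and advances
     * the marker. allShardIds must be sorted ascending.
     */
    List<String> discoverNewShards(List<String> allShardIds) {
        List<String> discovered = new ArrayList<>();
        for (String shardId : allShardIds) {
            if (lastSeenShardId == null || shardId.compareTo(lastSeenShardId) > 0) {
                discovered.add(shardId);
            }
        }
        if (!discovered.isEmpty()) {
            // everything at or before this ID is treated as assigned or done
            lastSeenShardId = discovered.get(discovered.size() - 1);
        }
        return discovered;
    }
}
```

   The enumerator state would then shrink to the marker plus the unassigned
   splits, instead of a set that grows with every completed shard.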



##########
flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumerator.java:
##########
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.source.enumerator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import 
org.apache.flink.connector.kinesis.source.config.SourceConfigConstants.InitialPosition;
+import 
org.apache.flink.connector.kinesis.source.enumerator.assigner.ShardAssignerFactory;
+import 
org.apache.flink.connector.kinesis.source.exception.KinesisStreamsSourceException;
+import org.apache.flink.connector.kinesis.source.model.CompletedShardsEvent;
+import org.apache.flink.connector.kinesis.source.proxy.StreamProxy;
+import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
+import org.apache.flink.connector.kinesis.source.split.StartingPosition;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.kinesis.model.Shard;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import static 
org.apache.flink.connector.kinesis.source.config.SourceConfigConstants.DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS;
+import static 
org.apache.flink.connector.kinesis.source.config.SourceConfigConstants.DEFAULT_STREAM_INITIAL_POSITION;
+import static 
org.apache.flink.connector.kinesis.source.config.SourceConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS;
+import static 
org.apache.flink.connector.kinesis.source.config.SourceConfigConstants.STREAM_INITIAL_POSITION;
+import static 
org.apache.flink.connector.kinesis.source.config.SourceConfigUtil.parseStreamTimestampStartingPosition;
+
+/**
+ * This class is used to discover and assign Kinesis splits to subtasks on the 
Flink cluster. This
+ * runs on the JobManager.
+ */
+@Internal
+public class KinesisStreamsSourceEnumerator
+        implements SplitEnumerator<KinesisShardSplit, 
KinesisStreamsSourceEnumeratorState> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(KinesisStreamsSourceEnumerator.class);
+
+    private final SplitEnumeratorContext<KinesisShardSplit> context;
+    private final String streamArn;
+    private final Properties consumerConfig;
+    private final StreamProxy streamProxy;
+    private final KinesisShardAssigner shardAssigner;
+    private final KinesisShardAssigner.Context shardAssignerContext;
+
+    private final Map<Integer, Set<KinesisShardSplit>> splitAssignment = new 
HashMap<>();
+    private final Set<String> assignedSplitIds = new HashSet<>();
+    private final Set<KinesisShardSplit> unassignedSplits;
+    private final Set<String> completedSplitIds;

Review Comment:
   Let's sync with the Kinesis team to see if there is any chance that shards
   might reappear once they expire.



##########
flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplitState.java:
##########
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.source.split;
+
+import org.apache.flink.annotation.Internal;
+
+/**
+ * Stores the metadata around a given {@link KinesisShardSplit}. This class is 
stored in state, and
+ * any changes should be backwards compatible.
+ */
+@Internal
+public class KinesisShardSplitState {
+    private final KinesisShardSplit kinesisShardSplit;
+    private StartingPosition nextStartingPosition;
+    private String nextShardIterator;
+
+    public KinesisShardSplitState(KinesisShardSplit kinesisShardSplit) {
+        this.kinesisShardSplit = kinesisShardSplit;
+        this.nextStartingPosition = kinesisShardSplit.getStartingPosition();
+    }
+
+    public KinesisShardSplit getKinesisShardSplit() {
+        return new KinesisShardSplit(
+                kinesisShardSplit.getStreamArn(),
+                kinesisShardSplit.getShardId(),
+                nextStartingPosition);
+    }
+
+    public String splitId() {
+        return kinesisShardSplit.splitId();
+    }
+
+    public String getStreamArn() {
+        return kinesisShardSplit.getStreamArn();
+    }
+
+    public String getShardId() {
+        return kinesisShardSplit.getShardId();
+    }
+
+    public StartingPosition getNextStartingPosition() {
+        return nextStartingPosition;
+    }
+
+    public void setNextStartingPosition(StartingPosition nextStartingPosition) 
{

Review Comment:
   Does this class need to be mutable?
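   
   For comparison, an immutable variant could look like the sketch below.
   `ImmutableSplitState` is a hypothetical name and the position is
   simplified to a plain sequence number string. Whether this fits depends
   on how the reader framework expects split state to be updated while
   records are emitted, so treat it as a question rather than a request.

```java
/** Hypothetical immutable split state; the position is simplified to a String. */
final class ImmutableSplitState {

    private final String shardId;
    private final String nextSequenceNumber; // null means "use the split's initial position"

    ImmutableSplitState(String shardId, String nextSequenceNumber) {
        this.shardId = shardId;
        this.nextSequenceNumber = nextSequenceNumber;
    }

    String getShardId() {
        return shardId;
    }

    String getNextSequenceNumber() {
        return nextSequenceNumber;
    }

    /** Returns a copy with the new position; this instance is left unchanged. */
    ImmutableSplitState withNextSequenceNumber(String sequenceNumber) {
        return new ImmutableSplitState(shardId, sequenceNumber);
    }
}
```

   In `fetch()` the reader would then enqueue the instance returned by
   `withNextSequenceNumber(..)` instead of mutating the polled one.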



##########
flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/enumerator/KinesisStreamsSourceEnumerator.java:
##########
@@ -0,0 +1,338 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.connector.kinesis.source.enumerator;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.connector.source.ReaderInfo;
+import org.apache.flink.api.connector.source.SourceEvent;
+import org.apache.flink.api.connector.source.SplitEnumerator;
+import org.apache.flink.api.connector.source.SplitEnumeratorContext;
+import org.apache.flink.api.connector.source.SplitsAssignment;
+import 
org.apache.flink.connector.kinesis.source.config.SourceConfigConstants.InitialPosition;
+import 
org.apache.flink.connector.kinesis.source.enumerator.assigner.ShardAssignerFactory;
+import 
org.apache.flink.connector.kinesis.source.exception.KinesisStreamsSourceException;
+import org.apache.flink.connector.kinesis.source.model.CompletedShardsEvent;
+import org.apache.flink.connector.kinesis.source.proxy.StreamProxy;
+import org.apache.flink.connector.kinesis.source.split.KinesisShardSplit;
+import org.apache.flink.connector.kinesis.source.split.StartingPosition;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import software.amazon.awssdk.services.kinesis.model.Shard;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import static 
org.apache.flink.connector.kinesis.source.config.SourceConfigConstants.DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS;
+import static 
org.apache.flink.connector.kinesis.source.config.SourceConfigConstants.DEFAULT_STREAM_INITIAL_POSITION;
+import static 
org.apache.flink.connector.kinesis.source.config.SourceConfigConstants.SHARD_DISCOVERY_INTERVAL_MILLIS;
+import static 
org.apache.flink.connector.kinesis.source.config.SourceConfigConstants.STREAM_INITIAL_POSITION;
+import static 
org.apache.flink.connector.kinesis.source.config.SourceConfigUtil.parseStreamTimestampStartingPosition;
+
+/**
+ * This class is used to discover and assign Kinesis splits to subtasks on the 
Flink cluster. This
+ * runs on the JobManager.
+ */
+@Internal
+public class KinesisStreamsSourceEnumerator
+        implements SplitEnumerator<KinesisShardSplit, 
KinesisStreamsSourceEnumeratorState> {
+
+    private static final Logger LOG = 
LoggerFactory.getLogger(KinesisStreamsSourceEnumerator.class);
+
+    private final SplitEnumeratorContext<KinesisShardSplit> context;
+    private final String streamArn;
+    private final Properties consumerConfig;
+    private final StreamProxy streamProxy;
+    private final KinesisShardAssigner shardAssigner;
+    private final KinesisShardAssigner.Context shardAssignerContext;
+
+    private final Map<Integer, Set<KinesisShardSplit>> splitAssignment = new 
HashMap<>();
+    private final Set<String> assignedSplitIds = new HashSet<>();
+    private final Set<KinesisShardSplit> unassignedSplits;
+    private final Set<String> completedSplitIds;
+
+    private String lastSeenShardId;
+
+    public KinesisStreamsSourceEnumerator(
+            SplitEnumeratorContext<KinesisShardSplit> context,
+            String streamArn,
+            Properties consumerConfig,
+            StreamProxy streamProxy,
+            KinesisStreamsSourceEnumeratorState state) {
+        this.context = context;
+        this.streamArn = streamArn;
+        this.consumerConfig = consumerConfig;
+        this.streamProxy = streamProxy;
+        this.shardAssigner = ShardAssignerFactory.uniformShardAssigner();
+        this.shardAssignerContext = new ShardAssignerContext(splitAssignment, 
context);
+        if (state == null) {
+            this.completedSplitIds = new HashSet<>();
+            this.lastSeenShardId = null;
+            this.unassignedSplits = new HashSet<>();
+        } else {
+            this.completedSplitIds = state.getCompletedSplitIds();
+            this.lastSeenShardId = state.getLastSeenShardId();
+            this.unassignedSplits = state.getUnassignedSplits();
+        }
+    }
+
+    @Override
+    public void start() {
+        if (lastSeenShardId == null) {
+            context.callAsync(this::initialDiscoverSplits, this::assignSplits);
+        }
+
+        final long shardDiscoveryInterval =
+                Long.parseLong(
+                        consumerConfig.getProperty(
+                                SHARD_DISCOVERY_INTERVAL_MILLIS,
+                                
String.valueOf(DEFAULT_SHARD_DISCOVERY_INTERVAL_MILLIS)));
+        context.callAsync(
+                this::periodicallyDiscoverSplits,
+                this::assignSplits,
+                shardDiscoveryInterval,
+                shardDiscoveryInterval);
+    }
+
+    @Override
+    public void handleSplitRequest(int subtaskId, @Nullable String 
requesterHostname) {
+        // Do nothing, since we assign splits eagerly
+    }
+
+    @Override
+    public void addSplitsBack(List<KinesisShardSplit> splits, int subtaskId) {
+        if (!splitAssignment.containsKey(subtaskId)) {
+            LOG.warn(
+                    "Unable to add splits back for subtask {} since it is not 
assigned any splits. Splits: {}",
+                    subtaskId,
+                    splits);
+            return;
+        }
+
+        for (KinesisShardSplit split : splits) {
+            splitAssignment.get(subtaskId).remove(split);
+            assignedSplitIds.remove(split.splitId());
+            unassignedSplits.add(split);
+        }
+    }
+
+    @Override
+    public void addReader(int subtaskId) {
+        splitAssignment.putIfAbsent(subtaskId, new HashSet<>());
+    }
+
+    @Override
+    public KinesisStreamsSourceEnumeratorState snapshotState(long 
checkpointId) throws Exception {
+        return new KinesisStreamsSourceEnumeratorState(
+                completedSplitIds, unassignedSplits, lastSeenShardId);
+    }
+
+    @Override
+    public void close() throws IOException {}
+
+    @Override
+    public void handleSourceEvent(int subtaskId, SourceEvent sourceEvent) {
+        if (sourceEvent instanceof CompletedShardsEvent) {
+            Set<String> newlyCompletedSplitIds =
+                    ((CompletedShardsEvent) 
sourceEvent).getCompletedSplitIds();
+            LOG.info(
+                    "Received CompletedShardsEvent from subtask {}. Marking 
the following splits as complete: {}",
+                    subtaskId,
+                    newlyCompletedSplitIds);
+            completedSplitIds.addAll(newlyCompletedSplitIds);
+        } else {
+            SplitEnumerator.super.handleSourceEvent(subtaskId, sourceEvent);
+        }
+    }
+
+    private List<KinesisShardSplit> initialDiscoverSplits() {
+        List<Shard> shards = streamProxy.listShards(streamArn, 
lastSeenShardId);
+        return mapToSplits(shards, false);
+    }
+
+    /**
+     * This method is used to discover Kinesis splits the job can subscribe 
to. It can be run in
+     * parallel, so it is important not to mutate any shared state.
+     *
+     * @return list of discovered splits
+     */
+    private List<KinesisShardSplit> periodicallyDiscoverSplits() {
+        List<Shard> shards = streamProxy.listShards(streamArn, 
lastSeenShardId);
+        // Any shard discovered after the initial startup should be read from 
the start, since they
+        // come from resharding
+        return mapToSplits(shards, true);
+    }
+
+    private List<KinesisShardSplit> mapToSplits(List<Shard> shards, boolean 
shouldReadFromStart) {
+        InitialPosition initialPositionFromConfig =
+                shouldReadFromStart
+                        ? InitialPosition.TRIM_HORIZON
+                        : InitialPosition.valueOf(
+                                consumerConfig
+                                        .getOrDefault(
+                                                STREAM_INITIAL_POSITION,
+                                                
DEFAULT_STREAM_INITIAL_POSITION)
+                                        .toString());
+        StartingPosition startingPosition;
+        switch (initialPositionFromConfig) {
+            case LATEST:
+                // If LATEST is requested, we still set the starting position 
to the time of
+                // startup. This way, the job starts reading from a 
deterministic timestamp
+                // (i.e. time of job submission), even if it enters a restart 
loop immediately
+                // after submission.
+                startingPosition = 
StartingPosition.fromTimestamp(Instant.now());
+                break;
+            case AT_TIMESTAMP:
+                startingPosition =
+                        StartingPosition.fromTimestamp(
+                                
parseStreamTimestampStartingPosition(consumerConfig).toInstant());
+                break;
+            case TRIM_HORIZON:
+            default:
+                startingPosition = StartingPosition.fromStart();
+        }
+
+        List<KinesisShardSplit> splits = new ArrayList<>();
+        for (Shard shard : shards) {
+            splits.add(new KinesisShardSplit(streamArn, shard.shardId(), 
startingPosition));
+        }
+
+        return splits;
+    }
+
+    /**
+     * This method assigns a given set of Kinesis splits to the readers 
currently registered on the
+     * cluster. This assignment is done via a side-effect on the {@link 
SplitEnumeratorContext}
+     * object.
+     *
+     * @param discoveredSplits list of discovered splits
+     * @param t throwable thrown when discovering splits. Will be null if no 
throwable thrown.
+     */
+    private void assignSplits(List<KinesisShardSplit> discoveredSplits, 
Throwable t) {
+        if (t != null) {
+            throw new KinesisStreamsSourceException("Failed to list shards.", 
t);
+        }
+
+        if (context.registeredReaders().isEmpty()) {

Review Comment:
   Or is it during startup while subTasks are initialising? 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscr...@flink.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
