[ 
https://issues.apache.org/jira/browse/FLINK-4821?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15974155#comment-15974155
 ] 

ASF GitHub Bot commented on FLINK-4821:
---------------------------------------

Github user tzulitai commented on a diff in the pull request:

    https://github.com/apache/flink/pull/3001#discussion_r112123170
  
    --- Diff: 
flink-connectors/flink-connector-kinesis/src/test/java/org/apache/flink/streaming/connectors/kinesis/FlinkKinesisConsumerTest.java
 ---
    @@ -559,48 +699,298 @@ public void 
testFetcherShouldNotBeRestoringFromFailureIfNotRestoringFromCheckpoi
     
        @Test
        @SuppressWarnings("unchecked")
    +   public void 
testFetcherShouldBeCorrectlySeededIfRestoringFromLegacyCheckpoint() throws 
Exception {
    +           HashMap<KinesisStreamShard, SequenceNumber> fakeRestoredState = 
getFakeRestoredStore("all");
    +
    +           KinesisDataFetcher mockedFetcher = 
Mockito.mock(KinesisDataFetcher.class);
    +           List<KinesisStreamShard> shards = new ArrayList<>();
    +           shards.addAll(fakeRestoredState.keySet());
    +           
when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(shards);
    +           
PowerMockito.whenNew(KinesisDataFetcher.class).withAnyArguments().thenReturn(mockedFetcher);
    +
    +           // assume the given config is correct
    +           PowerMockito.mockStatic(KinesisConfigUtil.class);
    +           PowerMockito.doNothing().when(KinesisConfigUtil.class);
    +
    +           TestableFlinkKinesisConsumer consumer = new 
TestableFlinkKinesisConsumer(
    +                   "fakeStream", new Properties(), 10, 2);
    +           consumer.restoreState(fakeRestoredState);
    +           consumer.open(new Configuration());
    +           consumer.run(Mockito.mock(SourceFunction.SourceContext.class));
    +
    +           Mockito.verify(mockedFetcher).setIsRestoringFromFailure(true);
    +           for (Map.Entry<KinesisStreamShard, SequenceNumber> 
restoredShard : fakeRestoredState.entrySet()) {
    +                   
Mockito.verify(mockedFetcher).advanceLastDiscoveredShardOfStream(
    +                           restoredShard.getKey().getStreamName(), 
restoredShard.getKey().getShard().getShardId());
    +                   
Mockito.verify(mockedFetcher).registerNewSubscribedShardState(
    +                           new 
KinesisStreamShardState(restoredShard.getKey(), restoredShard.getValue()));
    +           }
    +   }
    +
    +   @Test
    +   @SuppressWarnings("unchecked")
        public void 
testFetcherShouldBeCorrectlySeededIfRestoringFromCheckpoint() throws Exception {
    +           // 
----------------------------------------------------------------------
    +           // setting initial state
    +           // 
----------------------------------------------------------------------
    +           HashMap<KinesisStreamShard, SequenceNumber> fakeRestoredState = 
getFakeRestoredStore("all");
    +
    +           // 
----------------------------------------------------------------------
    +           // mock operator state backend and initial state for 
initializeState()
    +           // 
----------------------------------------------------------------------
    +           TestingListState<Serializable> listState = new 
TestingListState<>();
    +           for (Map.Entry<KinesisStreamShard, SequenceNumber> state: 
fakeRestoredState.entrySet()) {
    +                   listState.add(Tuple2.of(state.getKey(), 
state.getValue()));
    +           }
    +
    +           OperatorStateStore operatorStateStore = 
mock(OperatorStateStore.class);
    +           
when(operatorStateStore.getUnionListState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
    +
    +           StateInitializationContext initializationContext = 
mock(StateInitializationContext.class);
    +           
when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore);
    +           when(initializationContext.isRestored()).thenReturn(true);
    +
    +           // 
----------------------------------------------------------------------
    +           // mock fetcher
    +           // 
----------------------------------------------------------------------
                KinesisDataFetcher mockedFetcher = 
Mockito.mock(KinesisDataFetcher.class);
    +           List<KinesisStreamShard> shards = new ArrayList<>();
    +           shards.addAll(fakeRestoredState.keySet());
    +           
when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(shards);
                
PowerMockito.whenNew(KinesisDataFetcher.class).withAnyArguments().thenReturn(mockedFetcher);
     
                // assume the given config is correct
                PowerMockito.mockStatic(KinesisConfigUtil.class);
                PowerMockito.doNothing().when(KinesisConfigUtil.class);
     
    -           HashMap<KinesisStreamShard, SequenceNumber> fakeRestoredState = 
new HashMap<>();
    -           fakeRestoredState.put(
    -                   new KinesisStreamShard("fakeStream1",
    -                           new 
Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))),
    -                   new SequenceNumber(UUID.randomUUID().toString()));
    -           fakeRestoredState.put(
    -                   new KinesisStreamShard("fakeStream1",
    -                           new 
Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))),
    -                   new SequenceNumber(UUID.randomUUID().toString()));
    -           fakeRestoredState.put(
    -                   new KinesisStreamShard("fakeStream1",
    -                           new 
Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(2))),
    -                   new SequenceNumber(UUID.randomUUID().toString()));
    -           fakeRestoredState.put(
    -                   new KinesisStreamShard("fakeStream2",
    -                           new 
Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(0))),
    -                   new SequenceNumber(UUID.randomUUID().toString()));
    -           fakeRestoredState.put(
    -                   new KinesisStreamShard("fakeStream2",
    -                           new 
Shard().withShardId(KinesisShardIdGenerator.generateFromShardOrder(1))),
    -                   new SequenceNumber(UUID.randomUUID().toString()));
    +           // 
----------------------------------------------------------------------
    +           // start to test seed initial state to fetcher
    +           // 
----------------------------------------------------------------------
    +           TestableFlinkKinesisConsumer consumer = new 
TestableFlinkKinesisConsumer(
    +                   "fakeStream", new Properties(), 10, 2);
    +           consumer.initializeState(initializationContext);
    +           consumer.open(new Configuration());
    +           consumer.run(Mockito.mock(SourceFunction.SourceContext.class));
    +
    +           Mockito.verify(mockedFetcher).setIsRestoringFromFailure(true);
    +           for (Map.Entry<KinesisStreamShard, SequenceNumber> 
restoredShard : fakeRestoredState.entrySet()) {
    +                   
Mockito.verify(mockedFetcher).advanceLastDiscoveredShardOfStream(
    +                           restoredShard.getKey().getStreamName(), 
restoredShard.getKey().getShard().getShardId());
    +                   
Mockito.verify(mockedFetcher).registerNewSubscribedShardState(
    +                           new 
KinesisStreamShardState(restoredShard.getKey(), restoredShard.getValue()));
    +           }
    +   }
    +
    +   @Test
    +   @SuppressWarnings("unchecked")
    +   public void testFetcherShouldBeCorrectlySeededOnlyItsOwnStates() throws 
Exception {
    +           // 
----------------------------------------------------------------------
    +           // setting initial state
    +           // 
----------------------------------------------------------------------
    +           HashMap<KinesisStreamShard, SequenceNumber> fakeRestoredState = 
getFakeRestoredStore("fakeStream1");
    +
    +           HashMap<KinesisStreamShard, SequenceNumber> 
fakeRestoredStateForOthers = getFakeRestoredStore("fakeStream2");
    +
    +           // 
----------------------------------------------------------------------
    +           // mock operator state backend and initial state for 
initializeState()
    +           // 
----------------------------------------------------------------------
    +           TestingListState<Serializable> listState = new 
TestingListState<>();
    +           for (Map.Entry<KinesisStreamShard, SequenceNumber> state: 
fakeRestoredState.entrySet()) {
    +                   listState.add(Tuple2.of(state.getKey(), 
state.getValue()));
    +           }
    +           for (Map.Entry<KinesisStreamShard, SequenceNumber> state: 
fakeRestoredStateForOthers.entrySet()) {
    +                   listState.add(Tuple2.of(state.getKey(), 
state.getValue()));
    +           }
     
    +           OperatorStateStore operatorStateStore = 
mock(OperatorStateStore.class);
    +           
when(operatorStateStore.getUnionListState(Matchers.any(ListStateDescriptor.class))).thenReturn(listState);
    +
    +           StateInitializationContext initializationContext = 
mock(StateInitializationContext.class);
    +           
when(initializationContext.getOperatorStateStore()).thenReturn(operatorStateStore);
    +           when(initializationContext.isRestored()).thenReturn(true);
    +
    +           // 
----------------------------------------------------------------------
    +           // mock fetcher
    +           // 
----------------------------------------------------------------------
    +           KinesisDataFetcher mockedFetcher = 
Mockito.mock(KinesisDataFetcher.class);
    +           List<KinesisStreamShard> shards = new ArrayList<>();
    +           shards.addAll(fakeRestoredState.keySet());
    +           
when(mockedFetcher.discoverNewShardsToSubscribe()).thenReturn(shards);
    +           
PowerMockito.whenNew(KinesisDataFetcher.class).withAnyArguments().thenReturn(mockedFetcher);
    +
    +           // assume the given config is correct
    +           PowerMockito.mockStatic(KinesisConfigUtil.class);
    +           PowerMockito.doNothing().when(KinesisConfigUtil.class);
    +
    +           // 
----------------------------------------------------------------------
    +           // start to test seed initial state to fetcher
    +           // 
----------------------------------------------------------------------
                TestableFlinkKinesisConsumer consumer = new 
TestableFlinkKinesisConsumer(
                        "fakeStream", new Properties(), 10, 2);
    -           consumer.restoreState(fakeRestoredState);
    +           consumer.initializeState(initializationContext);
                consumer.open(new Configuration());
                consumer.run(Mockito.mock(SourceFunction.SourceContext.class));
     
                Mockito.verify(mockedFetcher).setIsRestoringFromFailure(true);
    +           for (Map.Entry<KinesisStreamShard, SequenceNumber> 
restoredShard : fakeRestoredStateForOthers.entrySet()) {
    +                   // should never get restored state not belonging to 
itself
    +                   Mockito.verify(mockedFetcher, 
never()).advanceLastDiscoveredShardOfStream(
    +                           restoredShard.getKey().getStreamName(), 
restoredShard.getKey().getShard().getShardId());
    +                   Mockito.verify(mockedFetcher, 
never()).registerNewSubscribedShardState(
    +                           new 
KinesisStreamShardState(restoredShard.getKey(), restoredShard.getValue()));
    +           }
                for (Map.Entry<KinesisStreamShard, SequenceNumber> 
restoredShard : fakeRestoredState.entrySet()) {
    +                   // should get restored state belonging to itself
                        
Mockito.verify(mockedFetcher).advanceLastDiscoveredShardOfStream(
                                restoredShard.getKey().getStreamName(), 
restoredShard.getKey().getShard().getShardId());
                        
Mockito.verify(mockedFetcher).registerNewSubscribedShardState(
                                new 
KinesisStreamShardState(restoredShard.getKey(), restoredShard.getValue()));
                }
        }
    +
    +   @Test
    +   @SuppressWarnings("unchecked")
    +   /*
    +    * If the original parallelism is 2 and states is:
    +    *   Consumer subtask 1:
    +    *     stream1, shard1, SequentialNumber(xxx)
    +    *   Consumer subtask 2:
    +    *     stream1, shard2, SequentialNumber(yyy)
    +    * After discoverNewShardsToSubscribe() if there are two shards 
(shard3, shard4) been created:
    +    *   Consumer subtask 1 (late for discoverNewShardsToSubscribe()):
    +    *     stream1, shard1, SequentialNumber(xxx)
    +    *   Consumer subtask 2:
    +    *     stream1, shard2, SequentialNumber(yyy)
    +    *     stream1, shard4, SequentialNumber(zzz)
    +    *  If snapshotState() occur and parallelism is changed to 1:
    +    *    Broadcast state will be:
    --- End diff --
    
    This comment should say "Union" state rather than "Broadcast" state.


> Implement rescalable non-partitioned state for Kinesis Connector
> ----------------------------------------------------------------
>
>                 Key: FLINK-4821
>                 URL: https://issues.apache.org/jira/browse/FLINK-4821
>             Project: Flink
>          Issue Type: New Feature
>          Components: Kinesis Connector
>            Reporter: Tzu-Li (Gordon) Tai
>            Assignee: Wei-Che Wei
>
> FLINK-4379 added the rescalable non-partitioned state feature, along with the 
> implementation for the Kafka connector.
> The AWS Kinesis connector will benefit from the feature and should implement 
> it too. This ticket tracks progress for this.



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)

Reply via email to