ferenc-csaky commented on code in PR #198:
URL: 
https://github.com/apache/flink-connector-aws/pull/198#discussion_r2614531231


##########
flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplitSerializer.java:
##########


Review Comment:
   I'd suggest not duplicating the whole previous serializer code. We could do 
this more elegantly by having each version's serializer build on the previous 
one. To keep the current structure and minimize test changes, I'd do something 
like this:
   ```java
   @Override
   public byte[] serialize(KinesisShardSplit split) throws IOException {
       return serialize(split, this::serializeV2);
   }
   
   @VisibleForTesting
   byte[] serialize(
           KinesisShardSplit split,
           BiConsumerWithException<KinesisShardSplit, DataOutputStream, IOException> serializer)
           throws IOException {
   
       try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
               DataOutputStream out = new DataOutputStream(baos)) {
   
           serializer.accept(split, out);
           out.flush();
   
           return baos.toByteArray();
       }
   }
   
   @VisibleForTesting
   void serializeV0(KinesisShardSplit split, DataOutputStream out) throws IOException {
       out.writeUTF(split.getStreamArn());
       out.writeUTF(split.getShardId());
       out.writeUTF(split.getStartingPosition().getShardIteratorType().toString());
       if (split.getStartingPosition().getStartingMarker() == null) {
           out.writeBoolean(false);
       } else {
           out.writeBoolean(true);
           Object startingMarker = split.getStartingPosition().getStartingMarker();
           out.writeBoolean(startingMarker instanceof Instant);
           if (startingMarker instanceof Instant) {
               out.writeLong(((Instant) startingMarker).toEpochMilli());
           }
           out.writeBoolean(startingMarker instanceof String);
           if (startingMarker instanceof String) {
               out.writeUTF((String) startingMarker);
           }
       }
   }
   
   @VisibleForTesting
   void serializeV1(KinesisShardSplit split, DataOutputStream out) throws IOException {
       serializeV0(split, out);
   
       out.writeInt(split.getParentShardIds().size());
       for (String parentShardId : split.getParentShardIds()) {
           out.writeUTF(parentShardId);
       }
       out.writeUTF(split.getStartingHashKey());
       out.writeUTF(split.getEndingHashKey());
   }
   
   @VisibleForTesting
   void serializeV2(KinesisShardSplit split, DataOutputStream out) throws IOException {
       serializeV1(split, out);
   
       out.writeBoolean(split.isFinished());
   }
   ```
   And then when we want to unit test an older version, we can simply do:
   ```java
   byte[] oldSerializedState = serializer.serialize(initialSplit, serializer::serializeV0);
   ```
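
To show the idea end to end, here is a minimal, self-contained sketch of the same chaining pattern. Everything in it is a simplified stand-in and not the connector's real code: `VersionedSerializerSketch`, the toy `Split` class, the local `Writer` interface (standing in for `BiConsumerWithException`), and the `deserialize` helper with its default values for missing fields are all assumptions made for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

// Hypothetical, simplified stand-ins; not the connector's real classes.
final class VersionedSerializerSketch {

    /** Local stand-in for BiConsumerWithException so the sketch compiles on its own. */
    @FunctionalInterface
    interface Writer<T> {
        void accept(T value, DataOutputStream out) throws IOException;
    }

    /** Toy split with one field per format version. */
    static final class Split {
        final String shardId;       // written since V0
        final String parentShardId; // added in V1
        final boolean finished;     // added in V2

        Split(String shardId, String parentShardId, boolean finished) {
            this.shardId = shardId;
            this.parentShardId = parentShardId;
            this.finished = finished;
        }
    }

    /** Shared stream handling; the version-specific writer is passed in. */
    static byte[] serialize(Split split, Writer<Split> writer) throws IOException {
        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
                DataOutputStream out = new DataOutputStream(baos)) {
            writer.accept(split, out);
            out.flush();
            return baos.toByteArray();
        }
    }

    static void serializeV0(Split split, DataOutputStream out) throws IOException {
        out.writeUTF(split.shardId);
    }

    /** V1 is V0 plus the fields added in V1. */
    static void serializeV1(Split split, DataOutputStream out) throws IOException {
        serializeV0(split, out);
        out.writeUTF(split.parentShardId);
    }

    /** V2 is V1 plus the fields added in V2. */
    static void serializeV2(Split split, DataOutputStream out) throws IOException {
        serializeV1(split, out);
        out.writeBoolean(split.finished);
    }

    /** Reads only the fields the given version wrote; the defaults are assumed here. */
    static Split deserialize(int version, byte[] bytes) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            String shardId = in.readUTF();
            String parentShardId = version >= 1 ? in.readUTF() : "";
            boolean finished = version >= 2 && in.readBoolean();
            return new Split(shardId, parentShardId, finished);
        }
    }

    public static void main(String[] args) throws IOException {
        Split split = new Split("shardId-000000000001", "shardId-000000000000", true);

        // Produce state in the old V0 layout, then read it back as V0.
        byte[] oldState = serialize(split, VersionedSerializerSketch::serializeV0);
        Split restored = deserialize(0, oldState);

        System.out.println(restored.shardId + " finished=" + restored.finished);
    }
}
```

The point of the sketch is that a test can produce bytes in any older layout by passing a different method reference, without keeping a copy of the old serializer around.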



