ferenc-csaky commented on code in PR #198:
URL:
https://github.com/apache/flink-connector-aws/pull/198#discussion_r2614531231
##########
flink-connector-aws/flink-connector-aws-kinesis-streams/src/main/java/org/apache/flink/connector/kinesis/source/split/KinesisShardSplitSerializer.java:
##########
Review Comment:
I would suggest not duplicating the whole previous serializer code for each
new version. We can do this more elegantly. To keep the current structure and
keep test changes minimal, I'd do something like this:
```java
@Override
public byte[] serialize(KinesisShardSplit split) throws IOException {
    return serialize(split, this::serializeV2);
}

@VisibleForTesting
byte[] serialize(
        KinesisShardSplit split,
        BiConsumerWithException<KinesisShardSplit, DataOutputStream, IOException> serializer)
        throws IOException {
    try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
            DataOutputStream out = new DataOutputStream(baos)) {
        serializer.accept(split, out);
        out.flush();
        return baos.toByteArray();
    }
}

@VisibleForTesting
void serializeV0(KinesisShardSplit split, DataOutputStream out) throws IOException {
    out.writeUTF(split.getStreamArn());
    out.writeUTF(split.getShardId());
    out.writeUTF(split.getStartingPosition().getShardIteratorType().toString());
    if (split.getStartingPosition().getStartingMarker() == null) {
        out.writeBoolean(false);
    } else {
        out.writeBoolean(true);
        Object startingMarker = split.getStartingPosition().getStartingMarker();
        out.writeBoolean(startingMarker instanceof Instant);
        if (startingMarker instanceof Instant) {
            out.writeLong(((Instant) startingMarker).toEpochMilli());
        }
        out.writeBoolean(startingMarker instanceof String);
        if (startingMarker instanceof String) {
            out.writeUTF((String) startingMarker);
        }
    }
}

@VisibleForTesting
void serializeV1(KinesisShardSplit split, DataOutputStream out) throws IOException {
    serializeV0(split, out);
    out.writeInt(split.getParentShardIds().size());
    for (String parentShardId : split.getParentShardIds()) {
        out.writeUTF(parentShardId);
    }
    out.writeUTF(split.getStartingHashKey());
    out.writeUTF(split.getEndingHashKey());
}

@VisibleForTesting
void serializeV2(KinesisShardSplit split, DataOutputStream out) throws IOException {
    serializeV1(split, out);
    out.writeBoolean(split.isFinished());
}
```
And then when we want to unit test an older version, we can simply do:
```java
byte[] oldSerializedState =
        serializer.serialize(initialSplit, serializer::serializeV0);
```
```
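
For anyone who wants to try the idea outside the connector, here is a minimal, self-contained sketch of the same layering. Everything in it is hypothetical and simplified: `LayeredSerializerSketch`, `Split`, `IoBiConsumer` (a stand-in for Flink's `BiConsumerWithException`), `writeV0`/`writeV1`, and the `deserialize(int, byte[])` helper are illustration names, not the PR's code. Defaulting `finished` to `false` for V0 state is also just an assumption of the sketch.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;

final class LayeredSerializerSketch {

    /** Hypothetical stand-in for Flink's BiConsumerWithException, fixed to IOException. */
    @FunctionalInterface
    interface IoBiConsumer<T, U> {
        void accept(T t, U u) throws IOException;
    }

    /** Hypothetical, heavily simplified split: one V0 field and one field added in V1. */
    static final class Split {
        final String shardId;
        final boolean finished;

        Split(String shardId, boolean finished) {
            this.shardId = shardId;
            this.finished = finished;
        }
    }

    /** Shared stream plumbing; the writer argument selects the wire format version. */
    static byte[] serialize(Split split, IoBiConsumer<Split, DataOutputStream> writer)
            throws IOException {
        try (ByteArrayOutputStream baos = new ByteArrayOutputStream();
                DataOutputStream out = new DataOutputStream(baos)) {
            writer.accept(split, out);
            out.flush();
            return baos.toByteArray();
        }
    }

    static void writeV0(Split split, DataOutputStream out) throws IOException {
        out.writeUTF(split.shardId);
    }

    /** V1 is the V0 bytes followed by the new field, so V0 output is a prefix of V1 output. */
    static void writeV1(Split split, DataOutputStream out) throws IOException {
        writeV0(split, out);
        out.writeBoolean(split.finished);
    }

    /** Reads either version; the field missing from V0 falls back to false (an assumption). */
    static Split deserialize(int version, byte[] bytes) throws IOException {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(bytes))) {
            String shardId = in.readUTF();
            boolean finished = version >= 1 && in.readBoolean();
            return new Split(shardId, finished);
        }
    }

    public static void main(String[] args) throws IOException {
        Split split = new Split("shardId-000000000000", true);
        byte[] v0 = serialize(split, LayeredSerializerSketch::writeV0);
        byte[] v1 = serialize(split, LayeredSerializerSketch::writeV1);
        System.out.println(v1.length - v0.length); // 1: the single boolean added in V1
        System.out.println(deserialize(0, v0).finished); // false: default for V0 state
        System.out.println(deserialize(1, v1).finished); // true
    }
}
```

Because each version only appends to the previous one, a backward-compatibility test can produce old-format bytes by passing the older writer, with no copy of the old serializer class needed.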