AHeise commented on code in PR #154:
URL:
https://github.com/apache/flink-connector-kafka/pull/154#discussion_r2014507776
##########
flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/ExactlyOnceKafkaWriter.java:
##########
@@ -264,11 +282,51 @@ private TransactionAbortStrategyContextImpl
getTransactionAbortStrategyContext(
producerPool.recycle(producer);
return epoch;
};
+ Set<String> ongoingTransactionIds =
+ recoveredStates.stream()
+ .flatMap(
+ s ->
+ s.getOngoingTransactions().stream()
+
.map(CheckpointTransaction::getTransactionalId))
+ .collect(Collectors.toSet());
return new TransactionAbortStrategyContextImpl(
+ this::getTopicNames,
kafkaSinkContext.getParallelInstanceId(),
kafkaSinkContext.getNumberOfParallelInstances(),
prefixesToAbort,
startCheckpointId,
- aborter);
+ aborter,
+ this::getAdminClient,
+ ongoingTransactionIds);
+ }
+
+ private Collection<String> getTopicNames() {
+ KafkaDatasetIdentifier identifier =
+ getDatasetIdentifier()
+ .orElseThrow(
+ () ->
+ new IllegalStateException(
+ "The record serializer does
not expose a static list of target topics."));
+ if (identifier.getTopics() != null) {
+ return identifier.getTopics();
+ }
+ return AdminUtils.getTopicsByPattern(getAdminClient(),
identifier.getTopicPattern());
+ }
Review Comment:
In that case, the user has implement the interfaces.
```
if (transactionNamingStrategy.requiresKnownTopics()) {
checkState(
recordSerializer instanceof
KafkaDatasetFacetProvider,
"For %s naming strategy, the recordSerializer needs
to expose the target topics though implementing KafkaDatasetFacetProvider.",
transactionNamingStrategy);
}
```
Note that Artem planned to extend ListTransaction API to look by transaction
id prefix (which is what we actually want) which makes this requirement
obsolete.
For all internal serializers, especially SQL and going through your builder,
the interface is already implemented.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]