shanthoosh commented on a change in pull request #951: SAMZA-2127: Upgrade to Kafka 2.0 URL: https://github.com/apache/samza/pull/951#discussion_r268384606
########## File path: samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java ########## @@ -628,10 +589,12 @@ public void validateStream(StreamSpec streamSpec) throws StreamValidationExcepti @Override public void deleteMessages(Map<SystemStreamPartition, String> offsets) { if (deleteCommittedMessages) { - if (adminClientForDelete == null) { - adminClientForDelete = kafka.admin.AdminClient.create(createAdminClientProperties()); - } - KafkaSystemAdminUtilsScala.deleteMessages(adminClientForDelete, offsets); + Map<TopicPartition, RecordsToDelete> recordsToDelete = offsets.entrySet() + .stream() + .collect(Collectors.toMap(entry -> + new TopicPartition(entry.getKey().getStream(), entry.getKey().getPartition().getPartitionId()), + entry -> RecordsToDelete.beforeOffset(Long.parseLong(entry.getValue()) + 1))); + adminClient.deleteRecords(recordsToDelete); Review comment: 1. Github does not allow me to comment on `createAdminClientProperties` API in this class(Line number: 629 in this file). Since we're moving to new [AdminClient](https://kafka.apache.org/20/javadoc/index.html?org/apache/kafka/clients/admin/AdminClient.html), we can remove some configurations like zookeeper.servers which are not supported by new admin-client. Here's the list of valid [configurations](https://kafka.apache.org/20/javadoc/index.html?org/apache/kafka/clients/admin/AdminClientConfig.html) accepted by new AdminClient. 2. Minor: It would be better to rely upon static constants exposed by `AdminClientConfig` rather than `ConsumerConfig` to create admin-client. Ex: Switch from `ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG` to `AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG`. 3. There're some configurations like request.backoff.ms, retry.backoff.ms configurable for the AdminClient, would be better to find if we want to allow the user to override them(or we want to increase/decrease the defaults). ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: us...@infra.apache.org With regards, Apache Git Services