shanthoosh commented on a change in pull request #951: SAMZA-2127: Upgrade to 
Kafka 2.0
URL: https://github.com/apache/samza/pull/951#discussion_r268384606
 
 

 ##########
 File path: 
samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
 ##########
 @@ -628,10 +589,12 @@ public void validateStream(StreamSpec streamSpec) throws 
StreamValidationExcepti
   @Override
   public void deleteMessages(Map<SystemStreamPartition, String> offsets) {
     if (deleteCommittedMessages) {
-      if (adminClientForDelete == null) {
-        adminClientForDelete = 
kafka.admin.AdminClient.create(createAdminClientProperties());
-      }
-      KafkaSystemAdminUtilsScala.deleteMessages(adminClientForDelete, offsets);
+      Map<TopicPartition, RecordsToDelete> recordsToDelete = offsets.entrySet()
+          .stream()
+          .collect(Collectors.toMap(entry ->
+              new TopicPartition(entry.getKey().getStream(), 
entry.getKey().getPartition().getPartitionId()),
+              entry -> 
RecordsToDelete.beforeOffset(Long.parseLong(entry.getValue()) + 1)));
+      adminClient.deleteRecords(recordsToDelete);
 
 Review comment:
   1. Github does not allow me to comment on `createAdminClientProperties` API 
in this class(Line number: 629 in this file).  Since we're moving to new 
[AdminClient](https://kafka.apache.org/20/javadoc/index.html?org/apache/kafka/clients/admin/AdminClient.html),
 we can remove some configurations like zookeeper.servers which are not 
supported by new admin-client.  Here's the list of valid 
[configurations](https://kafka.apache.org/20/javadoc/index.html?org/apache/kafka/clients/admin/AdminClientConfig.html)
 accepted by new AdminClient.
   2. Minor: It would be better to rely upon static constants exposed by 
`AdminClientConfig` rather than `ConsumerConfig` to create admin-client. Ex: 
Switch from `ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG` to 
`AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG`.
   3. There're some configurations like request.backoff.ms, retry.backoff.ms 
configurable for the AdminClient, would be better to find if we want to allow 
the user to override them(or we want to increase/decrease the defaults).

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to