zabetak commented on code in PR #6110:
URL: https://github.com/apache/hive/pull/6110#discussion_r2517482583


##########
itests/qtest-druid/pom.xml:
##########
@@ -226,6 +226,11 @@
       <artifactId>kafka-clients</artifactId>
       <version>${kafka.test.version}</version>
     </dependency>
+    <dependency>
+      <groupId>org.apache.kafka</groupId>
+      <artifactId>kafka-server</artifactId>
+      <version>${kafka.test.version}</version>
+    </dependency>

Review Comment:
   Why do we need this new dependency? I don't see this defined in the 
kafka-handler module and there I see that the `KafkaServer` is used.



##########
itests/qtest-druid/pom.xml:
##########
@@ -36,7 +36,7 @@
     <druid.derby.version>10.11.1.1</druid.derby.version>
     <druid.guava.version>16.0.1</druid.guava.version>
     <druid.guice.version>4.1.0</druid.guice.version>
-    <kafka.test.version>2.5.0</kafka.test.version>
+    <kafka.test.version>3.9.1</kafka.test.version>

Review Comment:
   We can drop this version definition here and keep only the one in the root 
pom.xml file.



##########
kafka-handler/src/java/org/apache/hadoop/hive/kafka/HiveKafkaProducer.java:
##########
@@ -181,10 +188,15 @@ short getEpoch() {
    */
   private void flushNewPartitions() {
     LOG.info("Flushing new partitions");
-    TransactionalRequestResult result = enqueueNewPartitions();
-    Object sender = getValue(kafkaProducer, "sender");
-    invoke(sender, "wakeup");
-    result.await();
+    Object transactionManager = getValue(kafkaProducer, "transactionManager");
+    Set<TopicPartition> newPartitionsInTransaction =
+            (Set<TopicPartition>) getValue(transactionManager, 
"newPartitionsInTransaction");
+    if (!newPartitionsInTransaction.isEmpty()) {

Review Comment:
   This class was originated from Apache Flink and looking at the [latest 
version](https://github.com/apache/flink-connector-kafka/blob/95e38963a327ec522cc0cd28c083fb08f93638bd/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/FlinkKafkaInternalProducer.java#L219)
 I see that the handling of new partitions is inside `enqueueNewPartitions` and 
is slightly different. Possibly it would be easier to port more changes in the 
future if the two versions are more aligned.



##########
kafka-handler/src/java/org/apache/hadoop/hive/kafka/VectorizedKafkaRecordReader.java:
##########
@@ -150,7 +151,7 @@ private void cleanRowBoat() {
     LOG.trace("total read bytes [{}]", readBytes);
     if (consumer != null) {
       consumer.wakeup();
-      consumer.close();
+      consumer.close(Duration.ZERO);

Review Comment:
   Why are we now passing a zero timeout? The default is not sufficient?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to