zabetak commented on code in PR #6110:
URL: https://github.com/apache/hive/pull/6110#discussion_r2517482583
##########
itests/qtest-druid/pom.xml:
##########
@@ -226,6 +226,11 @@
<artifactId>kafka-clients</artifactId>
<version>${kafka.test.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.kafka</groupId>
+ <artifactId>kafka-server</artifactId>
+ <version>${kafka.test.version}</version>
+ </dependency>
Review Comment:
Why do we need this new dependency? I don't see it declared in the kafka-handler
module, even though that's where `KafkaServer` is used.
##########
itests/qtest-druid/pom.xml:
##########
@@ -36,7 +36,7 @@
<druid.derby.version>10.11.1.1</druid.derby.version>
<druid.guava.version>16.0.1</druid.guava.version>
<druid.guice.version>4.1.0</druid.guice.version>
- <kafka.test.version>2.5.0</kafka.test.version>
+ <kafka.test.version>3.9.1</kafka.test.version>
Review Comment:
We can drop this version definition here and keep only the one in the root
pom.xml file.
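For illustration, a minimal sketch of how the dependency could then pick up the
version managed in the root pom.xml (the `kafka.version` property name is an
assumption, not taken from the PR; use whatever the root pom actually defines):

```xml
<!-- Hypothetical sketch: inherit the Kafka version from the root pom.xml
     instead of redefining kafka.test.version in this module. -->
<dependency>
  <groupId>org.apache.kafka</groupId>
  <artifactId>kafka-clients</artifactId>
  <version>${kafka.version}</version>
</dependency>
```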
##########
kafka-handler/src/java/org/apache/hadoop/hive/kafka/HiveKafkaProducer.java:
##########
@@ -181,10 +188,15 @@ short getEpoch() {
*/
private void flushNewPartitions() {
LOG.info("Flushing new partitions");
- TransactionalRequestResult result = enqueueNewPartitions();
- Object sender = getValue(kafkaProducer, "sender");
- invoke(sender, "wakeup");
- result.await();
+    Object transactionManager = getValue(kafkaProducer, "transactionManager");
+    Set<TopicPartition> newPartitionsInTransaction =
+        (Set<TopicPartition>) getValue(transactionManager, "newPartitionsInTransaction");
+    if (!newPartitionsInTransaction.isEmpty()) {
Review Comment:
This class originated from Apache Flink, and looking at the [latest
version](https://github.com/apache/flink-connector-kafka/blob/95e38963a327ec522cc0cd28c083fb08f93638bd/flink-connector-kafka/src/main/java/org/apache/flink/connector/kafka/sink/internal/FlinkKafkaInternalProducer.java#L219)
I see that the handling of new partitions lives inside `enqueueNewPartitions` and
is slightly different. Keeping the two versions aligned would likely make it
easier to port future changes.
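For reference, a rough sketch of the Flink-style structure (not the PR's code: it
reuses the `getValue`/`invoke` reflection helpers already present in this class,
and the reflection details around `enqueueRequest` may need per-Kafka-version
adjustment, as they do in Flink):

```java
// Sketch only: mirrors Flink's FlinkKafkaInternalProducer, where the
// new-partition check lives inside enqueueNewPartitions, not in the caller.
private void flushNewPartitions() {
  LOG.info("Flushing new partitions");
  TransactionalRequestResult result = enqueueNewPartitions();
  Object sender = getValue(kafkaProducer, "sender");
  invoke(sender, "wakeup");
  result.await();
}

@SuppressWarnings("unchecked")
private TransactionalRequestResult enqueueNewPartitions() {
  Object transactionManager = getValue(kafkaProducer, "transactionManager");
  synchronized (transactionManager) {
    Set<TopicPartition> newPartitions =
        (Set<TopicPartition>) getValue(transactionManager, "newPartitionsInTransaction");
    if (!newPartitions.isEmpty()) {
      // Have the transaction manager build and enqueue the AddPartitionsToTxn
      // request, then surface its result so the caller can await the round trip.
      Object handler = invoke(transactionManager, "addPartitionsToTransactionHandler");
      invoke(transactionManager, "enqueueRequest", handler);
      return (TransactionalRequestResult) getValue(handler, "result");
    }
    // Nothing pending: hand back an already-completed result so await() returns.
    TransactionalRequestResult result = new TransactionalRequestResult("AddPartitionsToTxn");
    result.done();
    return result;
  }
}
```

This keeps `flushNewPartitions` itself identical to Flink's, so future changes on
the Flink side could be ported with a smaller diff.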
##########
kafka-handler/src/java/org/apache/hadoop/hive/kafka/VectorizedKafkaRecordReader.java:
##########
@@ -150,7 +151,7 @@ private void cleanRowBoat() {
LOG.trace("total read bytes [{}]", readBytes);
if (consumer != null) {
consumer.wakeup();
- consumer.close();
+ consumer.close(Duration.ZERO);
Review Comment:
Why are we now passing a zero timeout? Is the default not sufficient?
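For context, a minimal illustration of what the zero timeout changes (hypothetical
helper, not from the PR):

```java
import java.time.Duration;
import org.apache.kafka.clients.consumer.KafkaConsumer;

class CloseTimeoutDemo {
  // Hypothetical helper contrasting the two shutdown behaviors.
  static void shutdown(KafkaConsumer<byte[], byte[]> consumer) {
    consumer.wakeup();              // interrupt any in-flight poll()
    consumer.close(Duration.ZERO);  // return immediately, abandoning pending work
    // vs. consumer.close(): blocks for up to a default timeout (30 seconds in
    // recent clients) to finish offset commits and leave the group cleanly.
  }
}
```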
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]