codope commented on code in PR #8445:
URL: https://github.com/apache/hudi/pull/8445#discussion_r1202564227


##########
hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java:
##########
@@ -198,10 +200,10 @@ public void testWriteReadHFileWithMetaFields(boolean populateMetaFields, boolean
     }
   }
 
+  @Disabled("Disable the test with evolved schema for HFile since it's not supported")
+  @ParameterizedTest
   @Override
-  @Test
-  public void testWriteReadWithEvolvedSchema() throws Exception {
-    // Disable the test with evolved schema for HFile since it's not supported
+  public void testWriteReadWithEvolvedSchema(String evolvedSchemaPath) throws Exception {

Review Comment:
   Should we just remove it for now? There's already a tracking JIRA.
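
   (Worth noting when weighing removal against @Disabled: JUnit 5 discovers test methods inherited from a base class, so if the base class declares testWriteReadWithEvolvedSchema as a test, deleting the override outright would let that inherited schema-evolution test run against the HFile reader/writer anyway. Below is a minimal sketch of the disabled-override pattern the diff keeps; the base class name and the @MethodSource are assumptions for illustration, not the PR's actual code.)

   import org.junit.jupiter.api.Disabled;
   import org.junit.jupiter.params.ParameterizedTest;
   import org.junit.jupiter.params.provider.MethodSource;

   // Sketch only: base class name and parameter source are hypothetical.
   public class TestHoodieHFileReaderWriter extends TestHoodieReaderWriterBase {

     @Disabled("Schema evolution is not supported for HFile yet; tracked in a follow-up JIRA")
     @ParameterizedTest
     @MethodSource("evolvedSchemaPaths")   // hypothetical parameter source
     @Override
     public void testWriteReadWithEvolvedSchema(String evolvedSchemaPath) throws Exception {
       // intentionally left empty: the inherited test exercises schema evolution,
       // which the HFile reader/writer does not support
     }
   }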



##########
hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/hbase/TestSparkHoodieHBaseIndex.java:
##########
@@ -114,6 +114,9 @@ public class TestSparkHoodieHBaseIndex extends SparkClientFunctionalTestHarness
   @BeforeAll
   public static void init() throws Exception {
     // Initialize HbaseMiniCluster
+    System.setProperty("zookeeper.preAllocSize", "100");
+    System.setProperty("zookeeper.maxCnxns", "60");
+    System.setProperty("zookeeper.4lw.commands.whitelist", "*");

Review Comment:
   Why do we need these configs in this PR?
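
   (For readers unfamiliar with these knobs: the embedded ZooKeeper started alongside the HBase mini-cluster reads its tuning from JVM system properties, so they have to be set before the cluster comes up. A minimal sketch of where they fit, assuming the usual HBaseTestingUtility wiring, which is not shown in the diff; the property values simply mirror the diff.)

   @BeforeAll
   public static void init() throws Exception {
     // ZooKeeper server knobs, read from system properties at startup:
     //   zookeeper.preAllocSize           - transaction-log pre-allocation size in KB (kept small for tests)
     //   zookeeper.maxCnxns               - cap on the total number of client connections to the server
     //   zookeeper.4lw.commands.whitelist - which four-letter admin commands (ruok, stat, ...) are allowed
     System.setProperty("zookeeper.preAllocSize", "100");
     System.setProperty("zookeeper.maxCnxns", "60");
     System.setProperty("zookeeper.4lw.commands.whitelist", "*");

     HBaseTestingUtility utility = new HBaseTestingUtility();   // assumed setup, not part of the diff
     utility.startMiniCluster();                                // starts embedded ZooKeeper + HBase
   }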



##########
hudi-spark-datasource/hudi-spark3-common/src/test/java/org/apache/hudi/spark3/internal/TestHoodieDataSourceInternalBatchWrite.java:
##########
@@ -70,7 +71,7 @@ private static Stream<Arguments> bulkInsertTypeParams() {
   @ParameterizedTest
   @MethodSource("bulkInsertTypeParams")
   public void testDataSourceWriter(boolean populateMetaFields) throws Exception {
-    testDataSourceWriterInternal(Collections.EMPTY_MAP, Collections.EMPTY_MAP, populateMetaFields);
+    testDataSourceWriterInternal(Collections.emptyMap(), Collections.emptyMap(), populateMetaFields);

Review Comment:
   Just asking: does it make any difference? I believe both are the same thing, right?
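
   (For context on the question: at runtime they are the same shared immutable instance; the difference is only compile-time typing, since Collections.EMPTY_MAP is a raw-typed constant while Collections.emptyMap() is generic. A small stand-alone sketch:)

   import java.util.Collections;
   import java.util.Map;

   class EmptyMapDemo {
     public static void main(String[] args) {
       Map<String, Integer> raw = Collections.EMPTY_MAP;     // raw type: unchecked-conversion warning at compile time
       Map<String, Integer> typed = Collections.emptyMap();  // generic: <String, Integer> inferred, no warning
       System.out.println(raw == typed);                     // prints true; emptyMap() returns the same shared instance
     }
   }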



##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java:
##########
@@ -159,8 +159,8 @@ public void testAppendKafkaOffsetsSourceFormatAdapter() throws IOException {
         UtilHelpers.createSchemaProvider(FilebasedSchemaProvider.class.getName(), props, jsc()), props, jsc(), new ArrayList<>());
 
     props.put("hoodie.deltastreamer.source.kafka.value.deserializer.class", ByteArrayDeserializer.class.getName());
-    int numPartitions = 3;
-    int numMessages = 15;
+    int numPartitions = 2;
+    int numMessages = 30;

Review Comment:
   Same here, why change it? Is it to reduce the test time (fewer partitions, less I/O)?



##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java:
##########
@@ -308,30 +309,35 @@ public boolean upsertAndCommit(String baseTableInstantTime, Option commitedInsta
   @Test
   public void testAppendKafkaOffset() {
     final String topic = TEST_TOPIC_PREFIX + "testKafkaOffsetAppend";
-    int numPartitions = 3;
-    int numMessages = 15;
+    int numPartitions = 2;
+    int numMessages = 30;
     testUtils.createTopic(topic, numPartitions);
     sendMessagesToKafka(topic, numMessages, numPartitions);
 
     TypedProperties props = createPropsForKafkaSource(topic, null, "earliest");
     Source jsonSource = new JsonKafkaSource(props, jsc(), spark(), schemaProvider, metrics);
     SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource);
-    Dataset<Row> c = kafkaSource.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE).getBatch().get();
-    assertEquals(numMessages, c.count());
-    List<String> columns = Arrays.stream(c.columns()).collect(Collectors.toList());
+    Dataset<Row> dfNoOffsetInfo = kafkaSource.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE).getBatch().get().cache();
+    assertEquals(numMessages, dfNoOffsetInfo.count());
+    List<String> columns = Arrays.stream(dfNoOffsetInfo.columns()).collect(Collectors.toList());
 
     props.put(HoodieDeltaStreamerConfig.KAFKA_APPEND_OFFSETS.key(), "true");
     jsonSource = new JsonKafkaSource(props, jsc(), spark(), schemaProvider, metrics);
     kafkaSource = new SourceFormatAdapter(jsonSource);
-    Dataset<Row> d = kafkaSource.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE).getBatch().get();
-    assertEquals(numMessages, d.count());
+    Dataset<Row> dfWithOffsetInfo = kafkaSource.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE).getBatch().get().cache();
+    assertEquals(numMessages, dfWithOffsetInfo.count());
     for (int i = 0; i < numPartitions; i++) {
-      assertEquals(numMessages / numPartitions, d.filter("_hoodie_kafka_source_partition=" + i).collectAsList().size());
+      assertEquals(numMessages / numPartitions, dfWithOffsetInfo.filter("_hoodie_kafka_source_partition=" + i).count());
     }
-    assertEquals(0, d.drop(KAFKA_SOURCE_OFFSET_COLUMN, KAFKA_SOURCE_PARTITION_COLUMN, KAFKA_SOURCE_TIMESTAMP_COLUMN).except(c).count());
-    List<String> withKafkaOffsetColumns = Arrays.stream(d.columns()).collect(Collectors.toList());
+    assertEquals(0, dfWithOffsetInfo
+        .drop(KAFKA_SOURCE_OFFSET_COLUMN, KAFKA_SOURCE_PARTITION_COLUMN, KAFKA_SOURCE_TIMESTAMP_COLUMN)
+        .except(dfNoOffsetInfo).count());
+    List<String> withKafkaOffsetColumns = Arrays.stream(dfWithOffsetInfo.columns()).collect(Collectors.toList());
     assertEquals(3, withKafkaOffsetColumns.size() - columns.size());
     List<String> appendList = Arrays.asList(KAFKA_SOURCE_OFFSET_COLUMN, KAFKA_SOURCE_PARTITION_COLUMN, KAFKA_SOURCE_TIMESTAMP_COLUMN);
     assertEquals(appendList, withKafkaOffsetColumns.subList(withKafkaOffsetColumns.size() - 3, withKafkaOffsetColumns.size()));
+
+    dfNoOffsetInfo.unpersist();
+    dfWithOffsetInfo.unpersist();

Review Comment:
   +1
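
   (The caching is the part worth calling out: without cache(), each subsequent action, i.e. the count, the per-partition filter counts, and the except, would re-fetch and re-deserialize the Kafka batch. A minimal sketch of the pattern the diff adopts, with illustrative variable names rather than the test's actual ones:)

   Dataset<Row> batch = kafkaSource
       .fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE)
       .getBatch().get()
       .cache();                                 // mark for caching; materialized by the first action

   assertEquals(numMessages, batch.count());     // first action populates the cache
   for (int i = 0; i < numPartitions; i++) {
     // later actions reuse the cached partitions instead of re-reading from Kafka
     assertEquals(numMessages / numPartitions,
         batch.filter("_hoodie_kafka_source_partition=" + i).count());
   }
   batch.unpersist();                            // release executor block storage once the assertions are done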



##########
hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java:
##########
@@ -1946,8 +1947,8 @@ public void testJsonKafkaDFSSource() throws Exception {
   @Test
   public void testJsonKafkaDFSSourceWithOffsets() throws Exception {
     topicName = "topic" + testNum;
-    int numRecords = 15;
-    int numPartitions = 3;
+    int numRecords = 30;
+    int numPartitions = 2;

Review Comment:
   Why change these values?



##########
hudi-common/src/test/java/org/apache/hudi/common/testutils/minicluster/ZookeeperTestService.java:
##########
@@ -163,6 +163,8 @@ private static void setupTestEnv() {
     // resulting in test failure (client timeout on first session).
     // set env and directly in order to handle static init/gc issues
     System.setProperty("zookeeper.preAllocSize", "100");
+    System.setProperty("zookeeper.maxCnxns", "60");
+    System.setProperty("zookeeper.4lw.commands.whitelist", "*");

Review Comment:
   Same question as above. If it's a test issue, then maybe add it in a separate PR?



##########
hudi-integ-test/pom.xml:
##########
@@ -100,6 +98,21 @@
       <scope>test</scope>
     </dependency>
 
+    <!-- Parquet -->
+    <dependency>
+      <groupId>org.apache.parquet</groupId>
+      <artifactId>parquet-avro</artifactId>
+      <version>${parquet.version}</version>
+      <scope>test</scope>
+    </dependency>
+
+    <dependency>
+      <groupId>org.apache.parquet</groupId>
+      <artifactId>parquet-hadoop</artifactId>
+      <version>${parquet.version}</version>
+      <scope>test</scope>
+    </dependency>

Review Comment:
   Why are these dependencies needed here? Also, if we run the integ tests with hudi-spark3.x-bundle, wouldn't they already be present in the classpath?
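
   (For context on what these test-scoped artifacts usually provide: parquet-avro contributes AvroParquetWriter/AvroParquetReader and parquet-hadoop the underlying ParquetWriter machinery, e.g. for tests that write fixture data directly as Parquet files. A minimal sketch; the schema and output path are made up:)

   import org.apache.avro.Schema;
   import org.apache.avro.generic.GenericData;
   import org.apache.avro.generic.GenericRecord;
   import org.apache.hadoop.fs.Path;
   import org.apache.parquet.avro.AvroParquetWriter;
   import org.apache.parquet.hadoop.ParquetWriter;

   class ParquetFixtureWriter {
     public static void main(String[] args) throws Exception {
       Schema schema = new Schema.Parser().parse(
           "{\"type\":\"record\",\"name\":\"Row\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"}]}");
       GenericRecord rec = new GenericData.Record(schema);
       rec.put("id", "1");

       // AvroParquetWriter comes from parquet-avro; ParquetWriter and the file I/O from parquet-hadoop
       try (ParquetWriter<GenericRecord> writer = AvroParquetWriter
           .<GenericRecord>builder(new Path("/tmp/fixture.parquet"))   // made-up output path
           .withSchema(schema)
           .build()) {
         writer.write(rec);
       }
     }
   }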



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org
