codope commented on code in PR #8445: URL: https://github.com/apache/hudi/pull/8445#discussion_r1202564227
########## hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java: ########## @@ -198,10 +200,10 @@ public void testWriteReadHFileWithMetaFields(boolean populateMetaFields, boolean } } + @Disabled("Disable the test with evolved schema for HFile since it's not supported") + @ParameterizedTest @Override - @Test - public void testWriteReadWithEvolvedSchema() throws Exception { - // Disable the test with evolved schema for HFile since it's not supported + public void testWriteReadWithEvolvedSchema(String evolvedSchemaPath) throws Exception { Review Comment: should we just remove it for now? there's already a tracking jira. ########## hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/index/hbase/TestSparkHoodieHBaseIndex.java: ########## @@ -114,6 +114,9 @@ public class TestSparkHoodieHBaseIndex extends SparkClientFunctionalTestHarness @BeforeAll public static void init() throws Exception { // Initialize HbaseMiniCluster + System.setProperty("zookeeper.preAllocSize", "100"); + System.setProperty("zookeeper.maxCnxns", "60"); + System.setProperty("zookeeper.4lw.commands.whitelist", "*"); Review Comment: why do we need these configs in this PR? ########## hudi-spark-datasource/hudi-spark3-common/src/test/java/org/apache/hudi/spark3/internal/TestHoodieDataSourceInternalBatchWrite.java: ########## @@ -70,7 +71,7 @@ private static Stream<Arguments> bulkInsertTypeParams() { @ParameterizedTest @MethodSource("bulkInsertTypeParams") public void testDataSourceWriter(boolean populateMetaFields) throws Exception { - testDataSourceWriterInternal(Collections.EMPTY_MAP, Collections.EMPTY_MAP, populateMetaFields); + testDataSourceWriterInternal(Collections.emptyMap(), Collections.emptyMap(), populateMetaFields); Review Comment: just asking, does it make any difference? I believe both are the same thing right? 
########## hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java: ########## @@ -159,8 +159,8 @@ public void testAppendKafkaOffsetsSourceFormatAdapter() throws IOException { UtilHelpers.createSchemaProvider(FilebasedSchemaProvider.class.getName(), props, jsc()), props, jsc(), new ArrayList<>()); props.put("hoodie.deltastreamer.source.kafka.value.deserializer.class", ByteArrayDeserializer.class.getName()); - int numPartitions = 3; - int numMessages = 15; + int numPartitions = 2; + int numMessages = 30; Review Comment: Same here — why change it? Is it to reduce the test time (fewer partitions, less I/O)? ########## hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java: ########## @@ -308,30 +309,35 @@ public boolean upsertAndCommit(String baseTableInstantTime, Option commitedInsta @Test public void testAppendKafkaOffset() { final String topic = TEST_TOPIC_PREFIX + "testKafkaOffsetAppend"; - int numPartitions = 3; - int numMessages = 15; + int numPartitions = 2; + int numMessages = 30; testUtils.createTopic(topic, numPartitions); sendMessagesToKafka(topic, numMessages, numPartitions); TypedProperties props = createPropsForKafkaSource(topic, null, "earliest"); Source jsonSource = new JsonKafkaSource(props, jsc(), spark(), schemaProvider, metrics); SourceFormatAdapter kafkaSource = new SourceFormatAdapter(jsonSource); - Dataset<Row> c = kafkaSource.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE).getBatch().get(); - assertEquals(numMessages, c.count()); - List<String> columns = Arrays.stream(c.columns()).collect(Collectors.toList()); + Dataset<Row> dfNoOffsetInfo = kafkaSource.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE).getBatch().get().cache(); + assertEquals(numMessages, dfNoOffsetInfo.count()); + List<String> columns = Arrays.stream(dfNoOffsetInfo.columns()).collect(Collectors.toList()); props.put(HoodieDeltaStreamerConfig.KAFKA_APPEND_OFFSETS.key(), "true"); jsonSource = new 
JsonKafkaSource(props, jsc(), spark(), schemaProvider, metrics); kafkaSource = new SourceFormatAdapter(jsonSource); - Dataset<Row> d = kafkaSource.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE).getBatch().get(); - assertEquals(numMessages, d.count()); + Dataset<Row> dfWithOffsetInfo = kafkaSource.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE).getBatch().get().cache(); + assertEquals(numMessages, dfWithOffsetInfo.count()); for (int i = 0; i < numPartitions; i++) { - assertEquals(numMessages / numPartitions, d.filter("_hoodie_kafka_source_partition=" + i).collectAsList().size()); + assertEquals(numMessages / numPartitions, dfWithOffsetInfo.filter("_hoodie_kafka_source_partition=" + i).count()); } - assertEquals(0, d.drop(KAFKA_SOURCE_OFFSET_COLUMN, KAFKA_SOURCE_PARTITION_COLUMN, KAFKA_SOURCE_TIMESTAMP_COLUMN).except(c).count()); - List<String> withKafkaOffsetColumns = Arrays.stream(d.columns()).collect(Collectors.toList()); + assertEquals(0, dfWithOffsetInfo + .drop(KAFKA_SOURCE_OFFSET_COLUMN, KAFKA_SOURCE_PARTITION_COLUMN, KAFKA_SOURCE_TIMESTAMP_COLUMN) + .except(dfNoOffsetInfo).count()); + List<String> withKafkaOffsetColumns = Arrays.stream(dfWithOffsetInfo.columns()).collect(Collectors.toList()); assertEquals(3, withKafkaOffsetColumns.size() - columns.size()); List<String> appendList = Arrays.asList(KAFKA_SOURCE_OFFSET_COLUMN, KAFKA_SOURCE_PARTITION_COLUMN, KAFKA_SOURCE_TIMESTAMP_COLUMN); assertEquals(appendList, withKafkaOffsetColumns.subList(withKafkaOffsetColumns.size() - 3, withKafkaOffsetColumns.size())); + + dfNoOffsetInfo.unpersist(); + dfWithOffsetInfo.unpersist(); Review Comment: +1 ########## hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java: ########## @@ -1946,8 +1947,8 @@ public void testJsonKafkaDFSSource() throws Exception { @Test public void testJsonKafkaDFSSourceWithOffsets() throws Exception { topicName = "topic" + testNum; - int numRecords = 15; - int numPartitions = 3; + int 
numRecords = 30; + int numPartitions = 2; Review Comment: Why change these values? ########## hudi-common/src/test/java/org/apache/hudi/common/testutils/minicluster/ZookeeperTestService.java: ########## @@ -163,6 +163,8 @@ private static void setupTestEnv() { // resulting in test failure (client timeout on first session). // set env and directly in order to handle static init/gc issues System.setProperty("zookeeper.preAllocSize", "100"); + System.setProperty("zookeeper.maxCnxns", "60"); + System.setProperty("zookeeper.4lw.commands.whitelist", "*"); Review Comment: Same question as above. If it's a test issue, then maybe add it in a separate PR? ########## hudi-integ-test/pom.xml: ########## @@ -100,6 +98,21 @@ <scope>test</scope> </dependency> + <!-- Parquet --> + <dependency> + <groupId>org.apache.parquet</groupId> + <artifactId>parquet-avro</artifactId> + <version>${parquet.version}</version> + <scope>test</scope> + </dependency> + + <dependency> + <groupId>org.apache.parquet</groupId> + <artifactId>parquet-hadoop</artifactId> + <version>${parquet.version}</version> + <scope>test</scope> + </dependency> Review Comment: Why are these dependencies needed here? Also, if we run integ tests with the hudi-spark3.x-bundle, wouldn't they already be present in the classpath? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: commits-unsubscr...@hudi.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org