This is an automated email from the ASF dual-hosted git repository. yhu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
     new 8454cc95030 Change assertion and timeout for kafka stress test (#31089)
8454cc95030 is described below

commit 8454cc9503008e28f5d3e90d39659352d46af9c2
Author: akashorabek <70029317+akashora...@users.noreply.github.com>
AuthorDate: Wed Apr 24 23:21:18 2024 +0500

    Change assertion and timeout for kafka stress test (#31089)
---
 it/kafka/src/test/java/org/apache/beam/it/kafka/KafkaIOST.java | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/it/kafka/src/test/java/org/apache/beam/it/kafka/KafkaIOST.java b/it/kafka/src/test/java/org/apache/beam/it/kafka/KafkaIOST.java
index 4e7c0d428cb..3812c4ea9fc 100644
--- a/it/kafka/src/test/java/org/apache/beam/it/kafka/KafkaIOST.java
+++ b/it/kafka/src/test/java/org/apache/beam/it/kafka/KafkaIOST.java
@@ -155,7 +155,7 @@ public final class KafkaIOST extends IOStressTestBase {
                   Configuration.class),
               "large",
               Configuration.fromJsonString(
-                  "{\"rowsPerSecond\":50000,\"numRecords\":5000000,\"valueSizeBytes\":1000,\"minutes\":60,\"pipelineTimeout\":120,\"runner\":\"DataflowRunner\"}",
+                  "{\"rowsPerSecond\":50000,\"numRecords\":5000000,\"valueSizeBytes\":1000,\"minutes\":60,\"pipelineTimeout\":240,\"runner\":\"DataflowRunner\"}",
                   Configuration.class));
     } catch (IOException e) {
       throw new RuntimeException(e);
@@ -199,9 +199,9 @@ public final class KafkaIOST extends IOStressTestBase {
               readInfo.jobId(),
               getBeamMetricsName(PipelineMetricsType.COUNTER, READ_ELEMENT_METRIC_NAME));
 
-      // Assert that writeNumRecords equals or greater than readNumRecords since there might be
+      // Assert that readNumRecords equals or greater than writeNumRecords since there might be
       // duplicates when testing big amount of data
-      assertTrue(writeNumRecords >= readNumRecords);
+      assertTrue(readNumRecords >= writeNumRecords);
     } finally {
       // clean up pipelines
       if (pipelineLauncher.getJobStatus(project, region, writeInfo.jobId())