This is an automated email from the ASF dual-hosted git repository.

yhu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git


The following commit(s) were added to refs/heads/master by this push:
     new 1a5dc1c7fab Replace Source type for BigTable and Kafka stress tests 
(#30999)
1a5dc1c7fab is described below

commit 1a5dc1c7fab798a2ee8f526935432397648028f5
Author: akashorabek <70029317+akashora...@users.noreply.github.com>
AuthorDate: Mon Apr 22 20:53:55 2024 +0500

    Replace Source type for BigTable and Kafka stress tests (#30999)
    
    * replace Periodic to Unbounded for BigTableIOST
    
    * replace PeriodicImpulse for Kafka stress test
    
    * refactor
    
    * correct a number of records and records per second
    
    * specify correct resources for kafka servers
    
    * refactor
---
 .../base/v0.33.2/kafka-persistent.yaml             | 14 ++++++--
 .../gke-internal-load-balanced/kustomization.yaml  |  8 +++--
 .../it/gcp/bigtable/BigtableResourceManager.java   |  2 +-
 .../apache/beam/it/gcp/bigtable/BigTableIOST.java  | 41 +++++++++-------------
 .../java/org/apache/beam/it/kafka/KafkaIOST.java   | 30 ++++++----------
 5 files changed, 45 insertions(+), 50 deletions(-)

diff --git 
a/.test-infra/kafka/strimzi/02-kafka-persistent/base/v0.33.2/kafka-persistent.yaml
 
b/.test-infra/kafka/strimzi/02-kafka-persistent/base/v0.33.2/kafka-persistent.yaml
index 7ccc1bf7aa3..294c9a2e515 100644
--- 
a/.test-infra/kafka/strimzi/02-kafka-persistent/base/v0.33.2/kafka-persistent.yaml
+++ 
b/.test-infra/kafka/strimzi/02-kafka-persistent/base/v0.33.2/kafka-persistent.yaml
@@ -26,7 +26,11 @@ metadata:
   name: beam-testing-cluster
 spec:
   kafka:
-    version: 3.4.0
+    resources:
+      requests:
+        cpu: 8
+        memory: 64Gi
+    version: 3.6.0
     replicas: 3
     config:
       offsets.topic.replication.factor: 3
@@ -40,9 +44,13 @@ spec:
       volumes:
         - id: 0
           type: persistent-claim
-          size: 100Gi
+          size: 500Gi
           deleteClaim: false
   zookeeper:
+    resources:
+      requests:
+        cpu: 1
+        memory: 2Gi
     replicas: 3
     storage:
       type: persistent-claim
@@ -50,4 +58,4 @@ spec:
       deleteClaim: false
   entityOperator:
     topicOperator: {}
-    userOperator: {}
+    userOperator: {}
\ No newline at end of file
diff --git 
a/.test-infra/kafka/strimzi/02-kafka-persistent/overlays/gke-internal-load-balanced/kustomization.yaml
 
b/.test-infra/kafka/strimzi/02-kafka-persistent/overlays/gke-internal-load-balanced/kustomization.yaml
index 84df24b26a8..556bdf32b27 100644
--- 
a/.test-infra/kafka/strimzi/02-kafka-persistent/overlays/gke-internal-load-balanced/kustomization.yaml
+++ 
b/.test-infra/kafka/strimzi/02-kafka-persistent/overlays/gke-internal-load-balanced/kustomization.yaml
@@ -18,9 +18,11 @@
 # This overlay patches the base strimzi kafka cluster with a GKE TCP
 # internal load balancer ingress
 
-namespace: strimzi
 
 resources:
 - ../../base/v0.33.2
-patchesStrategicMerge:
-- listeners.yaml
\ No newline at end of file
+apiVersion: kustomize.config.k8s.io/v1beta1
+kind: Kustomization
+
+patches:
+  - path: listeners.yaml
\ No newline at end of file
diff --git 
a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/bigtable/BigtableResourceManager.java
 
b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/bigtable/BigtableResourceManager.java
index 311ce9575c2..772f81062a8 100644
--- 
a/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/bigtable/BigtableResourceManager.java
+++ 
b/it/google-cloud-platform/src/main/java/org/apache/beam/it/gcp/bigtable/BigtableResourceManager.java
@@ -81,7 +81,7 @@ public class BigtableResourceManager implements 
ResourceManager {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(BigtableResourceManager.class);
   private static final String DEFAULT_CLUSTER_ZONE = "us-central1-b";
-  private static final int DEFAULT_CLUSTER_NUM_NODES = 1;
+  private static final int DEFAULT_CLUSTER_NUM_NODES = 10;
   private static final StorageType DEFAULT_CLUSTER_STORAGE_TYPE = 
StorageType.SSD;
 
   private final String projectId;
diff --git 
a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigtable/BigTableIOST.java
 
b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigtable/BigTableIOST.java
index 4abcee8e6d5..f8cf2511aae 100644
--- 
a/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigtable/BigTableIOST.java
+++ 
b/it/google-cloud-platform/src/test/java/org/apache/beam/it/gcp/bigtable/BigTableIOST.java
@@ -26,6 +26,8 @@ import com.google.bigtable.v2.Mutation;
 import com.google.protobuf.ByteString;
 import java.io.IOException;
 import java.io.Serializable;
+import java.nio.ByteBuffer;
+import java.text.ParseException;
 import java.time.Duration;
 import java.util.List;
 import java.util.Map;
@@ -38,15 +40,16 @@ import org.apache.beam.it.common.TestProperties;
 import org.apache.beam.it.common.utils.ResourceManagerUtils;
 import org.apache.beam.it.gcp.IOStressTestBase;
 import 
org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
+import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.gcp.bigtable.BigtableIO;
 import org.apache.beam.sdk.io.synthetic.SyntheticSourceOptions;
+import org.apache.beam.sdk.io.synthetic.SyntheticUnboundedSource;
 import org.apache.beam.sdk.options.StreamingOptions;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.TestPipelineOptions;
 import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
 import org.apache.beam.sdk.transforms.DoFn;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.PeriodicImpulse;
 import org.apache.beam.sdk.values.KV;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
@@ -144,11 +147,11 @@ public final class BigTableIOST extends IOStressTestBase {
           ImmutableMap.of(
               "medium",
               Configuration.fromJsonString(
-                  
"{\"rowsPerSecond\":25000,\"minutes\":40,\"pipelineTimeout\":120,\"valueSizeBytes\":100,\"runner\":\"DataflowRunner\"}",
+                  
"{\"rowsPerSecond\":10000,\"minutes\":40,\"pipelineTimeout\":80,\"numRecords\":1000000,\"valueSizeBytes\":100,\"runner\":\"DataflowRunner\"}",
                   Configuration.class),
               "large",
               Configuration.fromJsonString(
-                  
"{\"rowsPerSecond\":25000,\"minutes\":130,\"pipelineTimeout\":200,\"valueSizeBytes\":1000,\"runner\":\"DataflowRunner\"}",
+                  
"{\"rowsPerSecond\":50000,\"minutes\":60,\"pipelineTimeout\":120,\"numRecords\":5000000,\"valueSizeBytes\":1000,\"runner\":\"DataflowRunner\"}",
                   Configuration.class));
     } catch (IOException e) {
       throw new RuntimeException(e);
@@ -157,7 +160,7 @@ public final class BigTableIOST extends IOStressTestBase {
 
   /** Run stress test with configurations specified by TestProperties. */
   @Test
-  public void runTest() throws IOException {
+  public void runTest() throws IOException, ParseException, 
InterruptedException {
     if (configuration.exportMetricsToInfluxDB) {
       influxDBSettings =
           InfluxDBSettings.builder()
@@ -194,9 +197,9 @@ public final class BigTableIOST extends IOStressTestBase {
               readInfo.jobId(),
               getBeamMetricsName(PipelineMetricsType.COUNTER, 
READ_ELEMENT_METRIC_NAME));
 
-      // Assert that writeNumRecords equals or greater than readNumRecords 
since there might be
+      // Assert that readNumRecords equals or greater than writeNumRecords 
since there might be
       // duplicates when testing big amount of data
-      assertTrue(writeNumRecords >= readNumRecords);
+      assertTrue(readNumRecords >= writeNumRecords);
     } finally {
       // clean up write streaming pipeline
       if (pipelineLauncher.getJobStatus(project, region, writeInfo.jobId())
@@ -230,22 +233,14 @@ public final class BigTableIOST extends IOStressTestBase {
    * dynamically over time, with options to use configurable parameters.
    */
   private PipelineLauncher.LaunchInfo generateDataAndWrite() throws 
IOException {
-    // The PeriodicImpulse source will generate an element every this many 
millis:
-    int fireInterval = 1;
-    // Each element from PeriodicImpulse will fan out to this many elements:
     int startMultiplier =
         Math.max(configuration.rowsPerSecond, DEFAULT_ROWS_PER_SECOND) / 
DEFAULT_ROWS_PER_SECOND;
-    long stopAfterMillis =
-        
org.joda.time.Duration.standardMinutes(configuration.minutes).getMillis();
-    long totalRows = startMultiplier * stopAfterMillis / fireInterval;
     List<LoadPeriod> loadPeriods =
         getLoadPeriods(configuration.minutes, DEFAULT_LOAD_INCREASE_ARRAY);
 
-    PCollection<org.joda.time.Instant> source =
-        writePipeline.apply(
-            PeriodicImpulse.create()
-                .stopAfter(org.joda.time.Duration.millis(stopAfterMillis - 1))
-                .withInterval(org.joda.time.Duration.millis(fireInterval)));
+    PCollection<KV<byte[], byte[]>> source =
+        writePipeline.apply(Read.from(new 
SyntheticUnboundedSource(configuration)));
+
     if (startMultiplier > 1) {
       source =
           source
@@ -257,7 +252,7 @@ public final class BigTableIOST extends IOStressTestBase {
     source
         .apply(
             "Map records to BigTable format",
-            ParDo.of(new MapToBigTableFormat((int) 
configuration.valueSizeBytes, (int) totalRows)))
+            ParDo.of(new MapToBigTableFormat((int) 
configuration.valueSizeBytes)))
         .apply(
             "Write to BigTable",
             BigtableIO.write()
@@ -353,20 +348,18 @@ public final class BigTableIOST extends IOStressTestBase {
 
   /** Maps Instant to the BigTable format record. */
   private static class MapToBigTableFormat
-      extends DoFn<org.joda.time.Instant, KV<ByteString, Iterable<Mutation>>>
-      implements Serializable {
+      extends DoFn<KV<byte[], byte[]>, KV<ByteString, Iterable<Mutation>>> 
implements Serializable {
 
     private final int valueSizeBytes;
-    private final int totalRows;
 
-    public MapToBigTableFormat(int valueSizeBytes, int totalRows) {
+    public MapToBigTableFormat(int valueSizeBytes) {
       this.valueSizeBytes = valueSizeBytes;
-      this.totalRows = totalRows;
     }
 
     @ProcessElement
     public void processElement(ProcessContext c) {
-      long index = Objects.requireNonNull(c.element()).getMillis() % totalRows;
+      ByteBuffer byteBuffer = 
ByteBuffer.wrap(Objects.requireNonNull(c.element()).getValue());
+      int index = byteBuffer.getInt();
 
       ByteString key =
           ByteString.copyFromUtf8(
diff --git a/it/kafka/src/test/java/org/apache/beam/it/kafka/KafkaIOST.java 
b/it/kafka/src/test/java/org/apache/beam/it/kafka/KafkaIOST.java
index 505b51cec04..4e7c0d428cb 100644
--- a/it/kafka/src/test/java/org/apache/beam/it/kafka/KafkaIOST.java
+++ b/it/kafka/src/test/java/org/apache/beam/it/kafka/KafkaIOST.java
@@ -22,33 +22,34 @@ import static org.junit.Assert.assertTrue;
 
 import com.fasterxml.jackson.annotation.JsonProperty;
 import java.io.IOException;
+import java.text.ParseException;
 import java.time.Duration;
 import java.time.ZoneId;
 import java.time.format.DateTimeFormatter;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.Objects;
 import java.util.UUID;
 import org.apache.beam.it.common.PipelineLauncher;
 import org.apache.beam.it.common.PipelineOperator;
 import org.apache.beam.it.common.TestProperties;
 import org.apache.beam.it.gcp.IOStressTestBase;
 import 
org.apache.beam.runners.dataflow.options.DataflowPipelineWorkerPoolOptions;
+import org.apache.beam.sdk.io.Read;
 import org.apache.beam.sdk.io.kafka.KafkaIO;
 import org.apache.beam.sdk.io.synthetic.SyntheticSourceOptions;
+import org.apache.beam.sdk.io.synthetic.SyntheticUnboundedSource;
 import org.apache.beam.sdk.options.StreamingOptions;
 import org.apache.beam.sdk.testing.TestPipeline;
 import org.apache.beam.sdk.testing.TestPipelineOptions;
 import org.apache.beam.sdk.testutils.publishing.InfluxDBSettings;
 import org.apache.beam.sdk.transforms.MapElements;
 import org.apache.beam.sdk.transforms.ParDo;
-import org.apache.beam.sdk.transforms.PeriodicImpulse;
-import org.apache.beam.sdk.transforms.Reshuffle;
 import org.apache.beam.sdk.values.PCollection;
 import org.apache.beam.sdk.values.TypeDescriptor;
 import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Strings;
 import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
-import 
org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.primitives.Longs;
 import org.apache.kafka.clients.admin.AdminClient;
 import org.apache.kafka.clients.admin.NewTopic;
 import org.apache.kafka.clients.producer.ProducerConfig;
@@ -150,11 +151,11 @@ public final class KafkaIOST extends IOStressTestBase {
           ImmutableMap.of(
               "medium",
               Configuration.fromJsonString(
-                  
"{\"rowsPerSecond\":25000,\"minutes\":30,\"pipelineTimeout\":60,\"runner\":\"DataflowRunner\"}",
+                  
"{\"rowsPerSecond\":10000,\"numRecords\":1000000,\"valueSizeBytes\":1000,\"minutes\":20,\"pipelineTimeout\":60,\"runner\":\"DataflowRunner\"}",
                   Configuration.class),
               "large",
               Configuration.fromJsonString(
-                  
"{\"rowsPerSecond\":25000,\"minutes\":130,\"pipelineTimeout\":300,\"runner\":\"DataflowRunner\"}",
+                  
"{\"rowsPerSecond\":50000,\"numRecords\":5000000,\"valueSizeBytes\":1000,\"minutes\":60,\"pipelineTimeout\":120,\"runner\":\"DataflowRunner\"}",
                   Configuration.class));
     } catch (IOException e) {
       throw new RuntimeException(e);
@@ -163,7 +164,7 @@ public final class KafkaIOST extends IOStressTestBase {
 
   /** Run stress test with configurations specified by TestProperties. */
   @Test
-  public void testWriteAndRead() throws IOException {
+  public void testWriteAndRead() throws IOException, ParseException, 
InterruptedException {
     if (configuration.exportMetricsToInfluxDB) {
       influxDBSettings =
           InfluxDBSettings.builder()
@@ -240,34 +241,25 @@ public final class KafkaIOST extends IOStressTestBase {
    * dynamically over time, with options to use configurable parameters.
    */
   private PipelineLauncher.LaunchInfo generateDataAndWrite() throws 
IOException {
-    // The PeriodicImpulse source will generate an element every this many 
millis:
-    int fireInterval = 1;
-    // Each element from PeriodicImpulse will fan out to this many elements:
     int startMultiplier =
         Math.max(configuration.rowsPerSecond, DEFAULT_ROWS_PER_SECOND) / 
DEFAULT_ROWS_PER_SECOND;
-    long stopAfterMillis =
-        
org.joda.time.Duration.standardMinutes(configuration.minutes).getMillis();
-    long totalRows = startMultiplier * stopAfterMillis / fireInterval;
     List<LoadPeriod> loadPeriods =
         getLoadPeriods(configuration.minutes, DEFAULT_LOAD_INCREASE_ARRAY);
 
     PCollection<byte[]> source =
         writePipeline
-            .apply(
-                PeriodicImpulse.create()
-                    .stopAfter(org.joda.time.Duration.millis(stopAfterMillis - 
1))
-                    .withInterval(org.joda.time.Duration.millis(fireInterval)))
+            .apply(Read.from(new SyntheticUnboundedSource(configuration)))
             .apply(
                 "Extract values",
                 MapElements.into(TypeDescriptor.of(byte[].class))
-                    .via(instant -> Longs.toByteArray(instant.getMillis() % 
totalRows)));
+                    .via(kv -> Objects.requireNonNull(kv).getValue()));
+
     if (startMultiplier > 1) {
       source =
           source
               .apply(
                   "One input to multiple outputs",
                   ParDo.of(new MultiplierDoFn<>(startMultiplier, loadPeriods)))
-              .apply("Reshuffle fanout", Reshuffle.viaRandomKey())
               .apply("Counting element", ParDo.of(new 
CountingFn<>(WRITE_ELEMENT_METRIC_NAME)));
     }
     source.apply(
@@ -336,7 +328,7 @@ public final class KafkaIOST extends IOStressTestBase {
      * Determines whether to use Dataflow runner v2. If set to true, it uses 
SDF mode for reading
      * from Kafka. Otherwise, Unbounded mode will be used.
      */
-    @JsonProperty public boolean useDataflowRunnerV2 = true;
+    @JsonProperty public boolean useDataflowRunnerV2 = false;
 
     /** Number of workers for the pipeline. */
     @JsonProperty public int numWorkers = 20;

Reply via email to