ahmedabu98 commented on code in PR #35572:
URL: https://github.com/apache/beam/pull/35572#discussion_r2205741637


##########
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datagen/DataGeneratorRowFn.java:
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.datagen;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Random;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.beam.vendor.calcite.v1_28_0.org.apache.commons.lang.RandomStringUtils;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/** A stateful DoFn that converts a sequence of Longs into structured Rows. */
+public class DataGeneratorRowFn extends DoFn<Long, Row> {
+  private final Schema schema;
+  private final ObjectNode properties;
+  private final @Nullable String primaryTimestampField;
+
+  private transient Map<String, FieldGenerator> fieldGenerators;
+  private transient Random random;
+
+  @SuppressWarnings("initialization")
+  public DataGeneratorRowFn(
+      Schema schema, ObjectNode properties, @Nullable String primaryTimestampField) {
+    this.schema = schema;
+    this.properties = properties;
+    this.primaryTimestampField = primaryTimestampField;
+  }
+
+  @Setup
+  public void setup() {
+    this.random = new Random();
+    this.fieldGenerators = new HashMap<>();
+
+    for (Schema.Field field : schema.getFields()) {
+      fieldGenerators.put(field.getName(), createGeneratorForField(field));
+    }
+  }
+
+  @ProcessElement
+  public void processElement(
+      @Element Long index, @Timestamp Instant timestamp, OutputReceiver<Row> out) {
+    Row.Builder rowBuilder = Row.withSchema(schema);
+    for (Schema.Field field : schema.getFields()) {
+      Object value;
+      if (field.getName().equals(this.primaryTimestampField)) {
+        value = timestamp.toDateTime();
+      } else {
+        FieldGenerator generator = fieldGenerators.get(field.getName());
+        if (generator == null) {
+          throw new IllegalStateException("No generator found for field: " + field.getName());
+        }
+        value = generator.generate(index);
+      }
+      rowBuilder.addValue(value);
+    }
+    out.output(rowBuilder.build());
+  }
+
+  @FunctionalInterface
+  private interface FieldGenerator extends Serializable {
+    @Nullable
+    Object generate(long index);
+  }
+
+  private FieldGenerator createGeneratorForField(Schema.Field field) {
+    String fieldName = field.getName();
+    FieldGenerator valueGenerator = createValueGeneratorForField(field);
+    double nullRate = properties.path("fields." + fieldName + ".null-rate").asDouble(0.0);
+
+    if (nullRate > 0) {
+      return (index) ->
+          Objects.requireNonNull(random).nextDouble() < nullRate

Review Comment:
   Consider using `ThreadLocalRandom.current()` instead of `new Random()`.
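   
   For illustration, a minimal sketch of what the null-rate branch could look like with `ThreadLocalRandom` (assuming the surrounding method stays as-is):
   
   ```java
   // java.util.concurrent.ThreadLocalRandom: no transient Random field or @Setup
   // seeding needed; safe to call from whichever worker thread runs the bundle.
   if (nullRate > 0) {
     return (index) ->
         ThreadLocalRandom.current().nextDouble() < nullRate
             ? null
             : valueGenerator.generate(index);
   }
   return valueGenerator;
   ```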



##########
website/www/site/content/en/documentation/dsls/sql/extensions/create-external-table.md:
##########
@@ -644,6 +644,151 @@ TYPE text
 LOCATION '/home/admin/orders'
 ```
 
+## DataGen
+
+The **DataGen** connector allows for creating tables based on in-memory data generation. This is useful for developing and testing queries locally without requiring access to external systems. The DataGen connector is built-in; no additional dependencies are required.
+
+Tables can be either **bounded** (generating a fixed number of rows) or **unbounded** (generating a stream of rows at a specific rate). The connector provides fine-grained controls to customize the generated values for each field, including support for event-time windowing.
+
+### Syntax
+
+```sql
+CREATE EXTERNAL TABLE [ IF NOT EXISTS ] tableName (tableElement [, tableElement ]*)
+TYPE datagen
+[TBLPROPERTIES tblProperties]
+```
+
+### Table Properties (`TBLPROPERTIES`)
+
+The `TBLPROPERTIES` JSON object is used to configure the generator's behavior.
+
+
+#### General Options
+
+| Key | Required | Description |
+| :--- | :--- | :--- |
+| `number-of-rows` | **Yes** (or `rows-per-second`) | Creates a **bounded** table with a specified total number of rows. |
+| `rows-per-second`| **Yes** (or `number-of-rows`) | Creates an **unbounded** table that generates rows at the specified rate. |
+
+#### Event-Time and Watermark Configuration
+
+| Key | Required | Description |
+| :--- | :--- | :--- |
+| `timestamp.behavior` | No | Specifies the time handling. Can be `'processing_time'` (default) or `'event_time'`. |
+| `event_time.timestamp_column` | **Yes**, if `timestamp.behavior` is `event_time` | The name of the column that will be used to drive the event-time watermark for the stream. |
+| `event_time.max_out_of_orderness` | No | When using `event_time`, this sets the maximum out-of-orderness in **milliseconds** for generated timestamps (e.g., `'5000'` for 5 seconds). Defaults to `0`. |

Review Comment:
   nit: can we use one consistent naming format for these new properties? Some are using kebab-case (hyphen) format and others use snake-case (underscore) format.
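   
   For example, one consistent kebab-case scheme could look like this (the property names here are illustrative, not the ones currently in the PR):
   
   ```sql
   CREATE EXTERNAL TABLE events (event_ts TIMESTAMP, id BIGINT)
   TYPE datagen
   TBLPROPERTIES '{
     "rows-per-second": "10",
     "timestamp-behavior": "event-time",
     "event-time.timestamp-column": "event_ts",
     "event-time.max-out-of-orderness": "5000"
   }'
   ```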



##########
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datagen/DataGeneratorPTransform.java:
##########
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.datagen;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.io.GenerateSequence;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PBegin;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.Row;
+import org.joda.time.Duration;
+
+/** The main PTransform that encapsulates the data generation logic. */
+public class DataGeneratorPTransform extends PTransform<PBegin, PCollection<Row>> {
+  private final Schema schema;
+  private final ObjectNode properties;
+
+  public DataGeneratorPTransform(Schema schema, ObjectNode properties) {
+    this.schema = schema;
+    this.properties = properties;
+  }
+
+  @Override
+  public PCollection<Row> expand(PBegin input) {
+    GenerateSequence generator;
+    JsonNode rpsNode = properties.path("rows-per-second");
+    JsonNode numRowsNode = properties.path("number-of-rows");
+
+    if (!rpsNode.isMissingNode()) {
+      generator = GenerateSequence.from(0).withRate(rpsNode.asLong(), Duration.standardSeconds(1));
+    } else if (!numRowsNode.isMissingNode()) {
+      generator = GenerateSequence.from(0).to(numRowsNode.asLong());
+    } else {
+      throw new IllegalArgumentException(
+          "A 'datagen' table requires either 'rows-per-second' (for unbounded) 
or "
+              + "'number-of-rows' (for bounded) in TBLPROPERTIES.");
+    }
+
+    String behavior = properties.path("timestamp.behavior").asText("processing_time");
+    @Nullable String eventTimeColumn = null;
+
+    if ("event_time".equalsIgnoreCase(behavior)) {
+      JsonNode columnNode = properties.path("event_time.timestamp_column");
+
+      if (columnNode.isMissingNode() || columnNode.isNull()) {
+        throw new IllegalArgumentException(
+            "For 'event_time' behavior, 'event_time.timestamp_column' must be 
specified.");
+      }
+      eventTimeColumn = columnNode.asText();

Review Comment:
   Can we add early validation that the Schema field corresponding to `eventTimeColumn` is the correct type?
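   
   e.g., a rough sketch of that check in `expand()` right after resolving the column name (error wording is just a suggestion):
   
   ```java
   // Fail fast at pipeline construction time instead of inside the DoFn.
   Schema.Field tsField = schema.getField(eventTimeColumn); // throws if the column doesn't exist
   if (tsField.getType().getTypeName() != Schema.TypeName.DATETIME) {
     throw new IllegalArgumentException(
         String.format(
             "'event_time.timestamp_column' must reference a TIMESTAMP column, but '%s' has type %s.",
             eventTimeColumn, tsField.getType()));
   }
   ```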



##########
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datagen/DataGeneratorRowFn.java:
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.datagen;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Random;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.beam.vendor.calcite.v1_28_0.org.apache.commons.lang.RandomStringUtils;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/** A stateful DoFn that converts a sequence of Longs into structured Rows. */
+public class DataGeneratorRowFn extends DoFn<Long, Row> {
+  private final Schema schema;
+  private final ObjectNode properties;
+  private final @Nullable String primaryTimestampField;
+
+  private transient Map<String, FieldGenerator> fieldGenerators;
+  private transient Random random;
+
+  @SuppressWarnings("initialization")
+  public DataGeneratorRowFn(
+      Schema schema, ObjectNode properties, @Nullable String primaryTimestampField) {
+    this.schema = schema;
+    this.properties = properties;
+    this.primaryTimestampField = primaryTimestampField;
+  }
+
+  @Setup
+  public void setup() {
+    this.random = new Random();
+    this.fieldGenerators = new HashMap<>();
+
+    for (Schema.Field field : schema.getFields()) {
+      fieldGenerators.put(field.getName(), createGeneratorForField(field));
+    }
+  }
+
+  @ProcessElement
+  public void processElement(
+      @Element Long index, @Timestamp Instant timestamp, OutputReceiver<Row> out) {
+    Row.Builder rowBuilder = Row.withSchema(schema);
+    for (Schema.Field field : schema.getFields()) {
+      Object value;
+      if (field.getName().equals(this.primaryTimestampField)) {
+        value = timestamp.toDateTime();
+      } else {
+        FieldGenerator generator = fieldGenerators.get(field.getName());
+        if (generator == null) {
+          throw new IllegalStateException("No generator found for field: " + field.getName());
+        }
+        value = generator.generate(index);
+      }
+      rowBuilder.addValue(value);
+    }
+    out.output(rowBuilder.build());
+  }
+
+  @FunctionalInterface
+  private interface FieldGenerator extends Serializable {
+    @Nullable
+    Object generate(long index);
+  }
+
+  private FieldGenerator createGeneratorForField(Schema.Field field) {
+    String fieldName = field.getName();
+    FieldGenerator valueGenerator = createValueGeneratorForField(field);
+    double nullRate = properties.path("fields." + fieldName + ".null-rate").asDouble(0.0);
+
+    if (nullRate > 0) {
+      return (index) ->
+          Objects.requireNonNull(random).nextDouble() < nullRate

Review Comment:
   Same for other uses of `random` in the `createValueGeneratorForField()` method below



##########
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/datagen/DataGeneratorTableProviderTest.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.datagen;
+
+import java.math.BigDecimal;
+import org.apache.beam.sdk.extensions.sql.SqlTransform;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
+import org.apache.beam.sdk.values.Row;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+/** Unit tests for the {@link DataGeneratorTableProvider}. */
+public class DataGeneratorTableProviderTest {
+
+  @Rule public TestPipeline pipeline = TestPipeline.create();
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  @Test
+  public void testBoundedGeneration() {
+    String createDdl =
+        "CREATE EXTERNAL TABLE bounded_table (\n"
+            + "  id BIGINT,\n"
+            + "  name VARCHAR\n"
+            + ") TYPE 'datagen' TBLPROPERTIES '{\n"
+            + "  \"number-of-rows\": \"100\"\n"
+            + "}'";
+
+    PCollection<Row> result =
+        pipeline.apply(
+            "testBoundedGeneration",
+            SqlTransform.query("SELECT * FROM 
bounded_table").withDdlString(createDdl));
+
+    Assert.assertEquals(IsBounded.BOUNDED, result.isBounded());
+    PAssert.that(result.apply(Count.globally())).containsInAnyOrder(100L);
+
+    pipeline.run().waitUntilFinish();
+  }
+
+  private static class ValidateFieldsFn extends DoFn<Row, Void> {
+    private final long startId;
+    private final long endId;
+    private final int nameLength;
+    private final double minScore;
+    private final double maxScore;
+
+    ValidateFieldsFn(long startId, long endId, int nameLength, double minScore, double maxScore) {
+      this.startId = startId;
+      this.endId = endId;
+      this.nameLength = nameLength;
+      this.minScore = minScore;
+      this.maxScore = maxScore;
+    }
+
+    @ProcessElement
+    public void processElement(@Element Row row) {
+      Long eventId = row.getInt64("event_id");
+      Assert.assertTrue("Event ID should be within range", eventId >= startId 
&& eventId <= endId);
+
+      String eventName = row.getString("event_name");
+      Assert.assertEquals("Event name should have correct length", nameLength, 
eventName.length());
+
+      Double score = row.getDouble("score");
+      Assert.assertTrue("Score should be within range", score >= minScore && 
score <= maxScore);
+    }
+  }
+
+  @Test
+  public void testFieldGenerators() {
+    String createDdl =
+        "CREATE EXTERNAL TABLE complex_table (\n"
+            + "  event_id BIGINT,\n"
+            + "  event_name VARCHAR,\n"
+            + "  score DOUBLE\n"
+            + ") TYPE 'datagen' TBLPROPERTIES '{\n"
+            + "  \"number-of-rows\": \"10\",\n"
+            + "  \"fields.event_id.kind\": \"sequence\",\n"
+            + "  \"fields.event_id.start\": \"100\",\n"
+            + "  \"fields.event_id.end\": \"109\",\n"
+            + "  \"fields.event_name.kind\": \"random\",\n"
+            + "  \"fields.event_name.length\": \"15\",\n"
+            + "  \"fields.score.kind\": \"random\",\n"
+            + "  \"fields.score.min\": \"50.0\",\n"
+            + "  \"fields.score.max\": \"100.0\"\n"
+            + "}'";
+
+    PCollection<Row> result =
+        pipeline.apply(
+            "testFieldGenerators",
+            SqlTransform.query("SELECT * FROM 
complex_table").withDdlString(createDdl));
+
+    result.apply("ValidateFields", ParDo.of(new ValidateFieldsFn(100, 109, 15, 
50.0, 100.0)));
+    pipeline.run().waitUntilFinish();
+  }
+
+  private static class ValidateNullsFn extends DoFn<Row, Void> {
+    @ProcessElement
+    public void processElement(@Element Row row) {
+      Assert.assertNull("Field should be null", 
row.getValue("nullable_field"));
+      Assert.assertNotNull("Field should not be null", 
row.getInt64("non_nullable_field"));
+    }
+  }
+
+  @Test
+  public void testNullRate() {
+    String createDdl =
+        "CREATE EXTERNAL TABLE null_rate_table (\n"
+            + "  nullable_field VARCHAR,\n"
+            + "  non_nullable_field BIGINT\n"
+            + ") TYPE 'datagen' TBLPROPERTIES '{\n"
+            + "  \"number-of-rows\": \"50\",\n"
+            + "  \"fields.nullable_field.null-rate\": \"1.0\",\n"
+            + "  \"fields.non_nullable_field.kind\": \"sequence\"\n"
+            + "}'";
+
+    PCollection<Row> result =
+        pipeline.apply(
+            "testNullRate",
+            SqlTransform.query("SELECT * FROM 
null_rate_table").withDdlString(createDdl));
+
+    result.apply("ValidateNulls", ParDo.of(new ValidateNullsFn()));
+
+    pipeline.run().waitUntilFinish();
+  }
+
+  private static class ValidateAllTypesFn extends DoFn<Row, Void> {
+    @ProcessElement
+    public void processElement(@Element Row row) {
+      Assert.assertNotNull(row.getBoolean("is_active"));
+
+      BigDecimal cost = row.getDecimal("cost");
+      Assert.assertTrue("Cost should be >= 10.50", 
cost.compareTo(BigDecimal.valueOf(10.50)) >= 0);
+      Assert.assertTrue("Cost should be <= 99.99", 
cost.compareTo(BigDecimal.valueOf(99.99)) <= 0);
+
+      Instant now = Instant.now();
+      Instant pastTimestamp = row.getDateTime("past_timestamp").toInstant();
+      Instant nowTimestamp = row.getDateTime("now_timestamp").toInstant();
+
+      Assert.assertTrue("past_timestamp should be in the past", 
pastTimestamp.isBefore(now));
+      Assert.assertTrue(
+          "now_timestamp should be very recent",
+          now.plus(Duration.millis(100)).isAfter(nowTimestamp));

Review Comment:
   Wouldn't this always be true because `now` is always freshly created? Maybe something like this works:
   
   ```suggestion
             now.minus(Duration.millis(100)).isBefore(nowTimestamp));
   ```



##########
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datagen/AdvancingTimestampFn.java:
##########
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.datagen;
+
+import java.util.Random;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+class AdvancingTimestampFn implements SerializableFunction<Long, Instant> {
+  private static final Random RANDOM = new Random();
+  private final long maxOutOfOrdernessMs;
+  private final Instant baseTime = Instant.now();

Review Comment:
   But +1 to @Abacn's suggestion, PeriodicImpulse can create an "unbounded" source that gracefully stops/drains after some time.
   
   The current GenerateSequence only supports an ongoing streaming source (the user would have to manually terminate the pipeline).
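   
   For reference, a rough sketch of what a self-terminating source could look like with `PeriodicImpulse` (the ten-minute window and 100 ms interval are just placeholders; the impulses would still need to be mapped to rows downstream):
   
   ```java
   // Emits one element per interval between startAt and stopAt, then finishes,
   // so a streaming datagen table can drain on its own.
   Instant start = Instant.now();
   PCollection<Instant> impulses =
       input.apply(
           PeriodicImpulse.create()
               .startAt(start)
               .stopAt(start.plus(Duration.standardMinutes(10)))
               .withInterval(Duration.millis(100)));
   ```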



##########
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datagen/DataGeneratorRowFn.java:
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.datagen;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Random;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.beam.vendor.calcite.v1_28_0.org.apache.commons.lang.RandomStringUtils;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/** A stateful DoFn that converts a sequence of Longs into structured Rows. */
+public class DataGeneratorRowFn extends DoFn<Long, Row> {
+  private final Schema schema;
+  private final ObjectNode properties;
+  private final @Nullable String primaryTimestampField;
+
+  private transient Map<String, FieldGenerator> fieldGenerators;
+  private transient Random random;
+
+  @SuppressWarnings("initialization")
+  public DataGeneratorRowFn(
+      Schema schema, ObjectNode properties, @Nullable String primaryTimestampField) {
+    this.schema = schema;
+    this.properties = properties;
+    this.primaryTimestampField = primaryTimestampField;
+  }
+
+  @Setup
+  public void setup() {
+    this.random = new Random();
+    this.fieldGenerators = new HashMap<>();
+
+    for (Schema.Field field : schema.getFields()) {
+      fieldGenerators.put(field.getName(), createGeneratorForField(field));
+    }
+  }
+
+  @ProcessElement
+  public void processElement(
+      @Element Long index, @Timestamp Instant timestamp, OutputReceiver<Row> out) {
+    Row.Builder rowBuilder = Row.withSchema(schema);
+    for (Schema.Field field : schema.getFields()) {
+      Object value;
+      if (field.getName().equals(this.primaryTimestampField)) {
+        value = timestamp.toDateTime();
+      } else {
+        FieldGenerator generator = fieldGenerators.get(field.getName());
+        if (generator == null) {
+          throw new IllegalStateException("No generator found for field: " + field.getName());
+        }
+        value = generator.generate(index);
+      }
+      rowBuilder.addValue(value);
+    }
+    out.output(rowBuilder.build());
+  }
+
+  @FunctionalInterface
+  private interface FieldGenerator extends Serializable {
+    @Nullable
+    Object generate(long index);
+  }
+
+  private FieldGenerator createGeneratorForField(Schema.Field field) {
+    String fieldName = field.getName();
+    FieldGenerator valueGenerator = createValueGeneratorForField(field);
+    double nullRate = properties.path("fields." + fieldName + ".null-rate").asDouble(0.0);
+
+    if (nullRate > 0) {
+      return (index) ->
+          Objects.requireNonNull(random).nextDouble() < nullRate
+              ? null
+              : valueGenerator.generate(index);
+    }
+    return valueGenerator;
+  }
+
+  private FieldGenerator createValueGeneratorForField(Schema.Field field) {
+    String fieldName = field.getName();
+    String kind = properties.path("fields." + fieldName + ".kind").asText("random");
+
+    if ("sequence".equalsIgnoreCase(kind)) {
+      JsonNode startNode = properties.path("fields." + fieldName + ".start");
+      JsonNode endNode = properties.path("fields." + fieldName + ".end");
+
+      if (startNode.isMissingNode() && endNode.isMissingNode()) {
+        return (index) -> index; // Simple, non-cycling sequence
+      }
+
+      if (startNode.isMissingNode() || endNode.isMissingNode()) {
+        throw new IllegalArgumentException(
+            "For a cycling sequence generator, both 'start' and 'end' must be 
specified.");
+      }
+
+      long start = startNode.asLong();
+      long end = endNode.asLong();
+
+      if (start > end) {
+        throw new IllegalArgumentException(
+            String.format(
+                "For sequence generator, 'start' (%d) cannot be greater than 
'end' (%d).",
+                start, end));
+      }
+      long cycleLength = end - start + 1;
+      return (index) -> start + (index % cycleLength);

Review Comment:
   Does this mean `sequence` only supports the INT64 schema field type? If so, can we validate this ahead of time?
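   
   e.g., a small sketch of what that validation could look like when the generator is built (message wording up to you):
   
   ```java
   if ("sequence".equalsIgnoreCase(kind)
       && field.getType().getTypeName() != Schema.TypeName.INT64) {
     throw new IllegalArgumentException(
         String.format(
             "Field '%s' uses a 'sequence' generator, which currently only supports BIGINT, but the field type is %s.",
             fieldName, field.getType()));
   }
   ```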



##########
sdks/java/extensions/sql/src/test/java/org/apache/beam/sdk/extensions/sql/meta/provider/datagen/DataGeneratorTableProviderTest.java:
##########
@@ -0,0 +1,296 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.datagen;
+
+import java.math.BigDecimal;
+import org.apache.beam.sdk.extensions.sql.SqlTransform;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.PCollection.IsBounded;
+import org.apache.beam.sdk.values.Row;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.ExpectedException;
+
+/** Unit tests for the {@link DataGeneratorTableProvider}. */
+public class DataGeneratorTableProviderTest {
+
+  @Rule public TestPipeline pipeline = TestPipeline.create();
+  @Rule public ExpectedException thrown = ExpectedException.none();
+
+  @Test
+  public void testBoundedGeneration() {
+    String createDdl =
+        "CREATE EXTERNAL TABLE bounded_table (\n"
+            + "  id BIGINT,\n"
+            + "  name VARCHAR\n"
+            + ") TYPE 'datagen' TBLPROPERTIES '{\n"
+            + "  \"number-of-rows\": \"100\"\n"
+            + "}'";
+
+    PCollection<Row> result =
+        pipeline.apply(
+            "testBoundedGeneration",
+            SqlTransform.query("SELECT * FROM 
bounded_table").withDdlString(createDdl));
+
+    Assert.assertEquals(IsBounded.BOUNDED, result.isBounded());
+    PAssert.that(result.apply(Count.globally())).containsInAnyOrder(100L);
+
+    pipeline.run().waitUntilFinish();
+  }
+
+  private static class ValidateFieldsFn extends DoFn<Row, Void> {
+    private final long startId;
+    private final long endId;
+    private final int nameLength;
+    private final double minScore;
+    private final double maxScore;
+
+    ValidateFieldsFn(long startId, long endId, int nameLength, double minScore, double maxScore) {
+      this.startId = startId;
+      this.endId = endId;
+      this.nameLength = nameLength;
+      this.minScore = minScore;
+      this.maxScore = maxScore;
+    }
+
+    @ProcessElement
+    public void processElement(@Element Row row) {
+      Long eventId = row.getInt64("event_id");
+      Assert.assertTrue("Event ID should be within range", eventId >= startId 
&& eventId <= endId);
+
+      String eventName = row.getString("event_name");
+      Assert.assertEquals("Event name should have correct length", nameLength, 
eventName.length());
+
+      Double score = row.getDouble("score");
+      Assert.assertTrue("Score should be within range", score >= minScore && 
score <= maxScore);
+    }
+  }
+
+  @Test
+  public void testFieldGenerators() {
+    String createDdl =
+        "CREATE EXTERNAL TABLE complex_table (\n"
+            + "  event_id BIGINT,\n"
+            + "  event_name VARCHAR,\n"
+            + "  score DOUBLE\n"
+            + ") TYPE 'datagen' TBLPROPERTIES '{\n"
+            + "  \"number-of-rows\": \"10\",\n"
+            + "  \"fields.event_id.kind\": \"sequence\",\n"
+            + "  \"fields.event_id.start\": \"100\",\n"
+            + "  \"fields.event_id.end\": \"109\",\n"
+            + "  \"fields.event_name.kind\": \"random\",\n"
+            + "  \"fields.event_name.length\": \"15\",\n"
+            + "  \"fields.score.kind\": \"random\",\n"
+            + "  \"fields.score.min\": \"50.0\",\n"
+            + "  \"fields.score.max\": \"100.0\"\n"
+            + "}'";
+
+    PCollection<Row> result =
+        pipeline.apply(
+            "testFieldGenerators",
+            SqlTransform.query("SELECT * FROM 
complex_table").withDdlString(createDdl));
+
+    result.apply("ValidateFields", ParDo.of(new ValidateFieldsFn(100, 109, 15, 
50.0, 100.0)));
+    pipeline.run().waitUntilFinish();
+  }
+
+  private static class ValidateNullsFn extends DoFn<Row, Void> {
+    @ProcessElement
+    public void processElement(@Element Row row) {
+      Assert.assertNull("Field should be null", 
row.getValue("nullable_field"));
+      Assert.assertNotNull("Field should not be null", 
row.getInt64("non_nullable_field"));
+    }
+  }
+
+  @Test
+  public void testNullRate() {
+    String createDdl =
+        "CREATE EXTERNAL TABLE null_rate_table (\n"
+            + "  nullable_field VARCHAR,\n"
+            + "  non_nullable_field BIGINT\n"
+            + ") TYPE 'datagen' TBLPROPERTIES '{\n"
+            + "  \"number-of-rows\": \"50\",\n"
+            + "  \"fields.nullable_field.null-rate\": \"1.0\",\n"
+            + "  \"fields.non_nullable_field.kind\": \"sequence\"\n"
+            + "}'";
+
+    PCollection<Row> result =
+        pipeline.apply(
+            "testNullRate",
+            SqlTransform.query("SELECT * FROM 
null_rate_table").withDdlString(createDdl));
+
+    result.apply("ValidateNulls", ParDo.of(new ValidateNullsFn()));
+
+    pipeline.run().waitUntilFinish();
+  }
+
+  private static class ValidateAllTypesFn extends DoFn<Row, Void> {
+    @ProcessElement
+    public void processElement(@Element Row row) {
+      Assert.assertNotNull(row.getBoolean("is_active"));
+
+      BigDecimal cost = row.getDecimal("cost");
+      Assert.assertTrue("Cost should be >= 10.50", 
cost.compareTo(BigDecimal.valueOf(10.50)) >= 0);
+      Assert.assertTrue("Cost should be <= 99.99", 
cost.compareTo(BigDecimal.valueOf(99.99)) <= 0);
+
+      Instant now = Instant.now();
+      Instant pastTimestamp = row.getDateTime("past_timestamp").toInstant();
+      Instant nowTimestamp = row.getDateTime("now_timestamp").toInstant();
+
+      Assert.assertTrue("past_timestamp should be in the past", 
pastTimestamp.isBefore(now));
+      Assert.assertTrue(
+          "now_timestamp should be very recent",
+          now.plus(Duration.millis(100)).isAfter(nowTimestamp));
+    }
+  }
+
+  @Test
+  public void testAllDataTypes() {
+    String createDdl =
+        "CREATE EXTERNAL TABLE all_types_table (\n"
+            + "  is_active BOOLEAN,\n"
+            + "  cost DECIMAL,\n"
+            + "  past_timestamp TIMESTAMP,\n"
+            + "  now_timestamp TIMESTAMP\n"
+            + ") TYPE 'datagen' TBLPROPERTIES '{\n"
+            + "  \"number-of-rows\": \"10\",\n"
+            + "  \"fields.cost.min\": \"10.50\",\n"
+            + "  \"fields.cost.max\": \"99.99\",\n"
+            + "  \"fields.past_timestamp.max-past\": \"3600000\"\n"
+            + "}'";
+
+    pipeline
+        .apply(
+            "testAllDataTypes",
+            SqlTransform.query("SELECT * FROM 
all_types_table").withDdlString(createDdl))
+        .apply("ValidateAllTypes", ParDo.of(new ValidateAllTypesFn()));
+
+    pipeline.run().waitUntilFinish();
+  }
+
+  @Test
+  public void testMissingRequiredPropertyThrowsException() {
+    String createDdl = "CREATE EXTERNAL TABLE bad_table (id INT) TYPE 
'datagen' TBLPROPERTIES '{}'";
+
+    thrown.expect(IllegalArgumentException.class);
+    thrown.expectMessage(
+        "A 'datagen' table requires either 'rows-per-second' (for unbounded) 
or 'number-of-rows' (for bounded) in TBLPROPERTIES.");
+
+    pipeline.apply(
+        "testMissingRequiredProperty",
+        SqlTransform.query("SELECT * FROM 
bad_table").withDdlString(createDdl));
+    pipeline.run().waitUntilFinish();
+  }
+
+  /**
+   * Tests the default processing-time behavior to ensure it still works correctly and that
+   * event-time configuration is not required.
+   */
+  @Test
+  public void testProcessingTimeBehavior() {
+    String createDdl =
+        "CREATE EXTERNAL TABLE processing_time_table (\n"
+            + "  id BIGINT\n"
+            + ") TYPE 'datagen' TBLPROPERTIES '{\n"
+            + "  \"number-of-rows\": \"20\"\n"
+            + "}'";
+
+    PCollection<Row> result =
+        pipeline.apply(
+            "testProcessingTimeBehavior",
+            SqlTransform.query("SELECT * FROM 
processing_time_table").withDdlString(createDdl));
+
+    PAssert.that(result.apply("CountRows", 
Count.globally())).containsInAnyOrder(20L);
+
+    pipeline.run().waitUntilFinish();
+  }
+
+  private static class ValidateMultiTimestampFn extends DoFn<Row, Void> {
+    @ProcessElement
+    public void processElement(@Element Row row) {
+      Instant mainEventTime = row.getDateTime("main_event_time").toInstant();
+      Instant secondaryTime = row.getDateTime("secondary_time").toInstant();
+
+      Assert.assertTrue(
+          "Secondary timestamp should be recent",
+          Instant.now().plus(Duration.millis(200)).isAfter(secondaryTime));

Review Comment:
   I think we can also check that the element timestamp is indeed equal to `main_event_time`
   ```suggestion
       public void processElement(@Element Row row, @Timestamp Instant rowTs) {
         Instant mainEventTime = row.getDateTime("main_event_time").toInstant();
         Instant secondaryTime = row.getDateTime("secondary_time").toInstant();
   
         Assert.assertEquals(mainEventTime, rowTs);
   ```



##########
sdks/java/extensions/sql/src/main/java/org/apache/beam/sdk/extensions/sql/meta/provider/datagen/DataGeneratorRowFn.java:
##########
@@ -0,0 +1,189 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.extensions.sql.meta.provider.datagen;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.node.ObjectNode;
+import java.io.Serializable;
+import java.math.BigDecimal;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Random;
+import javax.annotation.Nullable;
+import org.apache.beam.sdk.extensions.sql.impl.utils.CalciteUtils;
+import org.apache.beam.sdk.schemas.Schema;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.values.Row;
+import org.apache.beam.vendor.calcite.v1_28_0.org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.beam.vendor.calcite.v1_28_0.org.apache.commons.lang.RandomStringUtils;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+
+/** A stateful DoFn that converts a sequence of Longs into structured Rows. */
+public class DataGeneratorRowFn extends DoFn<Long, Row> {
+  private final Schema schema;
+  private final ObjectNode properties;
+  private final @Nullable String primaryTimestampField;
+
+  private transient Map<String, FieldGenerator> fieldGenerators;
+  private transient Random random;
+
+  @SuppressWarnings("initialization")
+  public DataGeneratorRowFn(
+      Schema schema, ObjectNode properties, @Nullable String primaryTimestampField) {
+    this.schema = schema;
+    this.properties = properties;
+    this.primaryTimestampField = primaryTimestampField;
+  }
+
+  @Setup
+  public void setup() {
+    this.random = new Random();
+    this.fieldGenerators = new HashMap<>();
+
+    for (Schema.Field field : schema.getFields()) {
+      fieldGenerators.put(field.getName(), createGeneratorForField(field));
+    }
+  }
+
+  @ProcessElement
+  public void processElement(
+      @Element Long index, @Timestamp Instant timestamp, OutputReceiver<Row> out) {
+    Row.Builder rowBuilder = Row.withSchema(schema);
+    for (Schema.Field field : schema.getFields()) {
+      Object value;
+      if (field.getName().equals(this.primaryTimestampField)) {
+        value = timestamp.toDateTime();
+      } else {
+        FieldGenerator generator = fieldGenerators.get(field.getName());
+        if (generator == null) {
+          throw new IllegalStateException("No generator found for field: " + field.getName());
+        }
+        value = generator.generate(index);
+      }
+      rowBuilder.addValue(value);
+    }
+    out.output(rowBuilder.build());
+  }
+
+  @FunctionalInterface
+  private interface FieldGenerator extends Serializable {
+    @Nullable
+    Object generate(long index);
+  }
+
+  private FieldGenerator createGeneratorForField(Schema.Field field) {
+    String fieldName = field.getName();
+    FieldGenerator valueGenerator = createValueGeneratorForField(field);
+    double nullRate = properties.path("fields." + fieldName + ".null-rate").asDouble(0.0);
+
+    if (nullRate > 0) {
+      return (index) ->
+          Objects.requireNonNull(random).nextDouble() < nullRate
+              ? null
+              : valueGenerator.generate(index);
+    }
+    return valueGenerator;
+  }
+
+  private FieldGenerator createValueGeneratorForField(Schema.Field field) {
+    String fieldName = field.getName();
+    String kind = properties.path("fields." + fieldName + ".kind").asText("random");
+
+    if ("sequence".equalsIgnoreCase(kind)) {
+      JsonNode startNode = properties.path("fields." + fieldName + ".start");
+      JsonNode endNode = properties.path("fields." + fieldName + ".end");
+
+      if (startNode.isMissingNode() && endNode.isMissingNode()) {
+        return (index) -> index; // Simple, non-cycling sequence
+      }
+
+      if (startNode.isMissingNode() || endNode.isMissingNode()) {
+        throw new IllegalArgumentException(
+            "For a cycling sequence generator, both 'start' and 'end' must be 
specified.");
+      }
+
+      long start = startNode.asLong();
+      long end = endNode.asLong();
+
+      if (start > end) {
+        throw new IllegalArgumentException(
+            String.format(
+                "For sequence generator, 'start' (%d) cannot be greater than 
'end' (%d).",
+                start, end));
+      }
+      long cycleLength = end - start + 1;
+      return (index) -> start + (index % cycleLength);

Review Comment:
   (I think we should be able to support other types as well, but that doesn't have to be in this PR)
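   
   For a possible follow-up, a rough sketch of how the cycling value could be adapted to a few more integral field types (illustrative only):
   
   ```java
   long cycleLength = end - start + 1;
   switch (field.getType().getTypeName()) {
     case INT32:
       return (index) -> (int) (start + (index % cycleLength));
     case INT16:
       return (index) -> (short) (start + (index % cycleLength));
     case INT64:
     default:
       return (index) -> start + (index % cycleLength);
   }
   ```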



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: github-unsubscr...@beam.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

