gemini-code-assist[bot] commented on code in PR #36668:
URL: https://github.com/apache/beam/pull/36668#discussion_r2490533252


##########
sdks/java/io/google-cloud-platform/src/test/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIODynamicQueryIT.java:
##########
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.sdk.io.gcp.bigquery;
+
+import static org.apache.beam.sdk.io.gcp.bigquery.TestBigQueryOptions.BIGQUERY_EARLY_ROLLOUT_REGION;
+
+import com.google.api.services.bigquery.model.TableRow;
+import java.util.Map;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.extensions.gcp.options.GcpOptions;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TableRowParser;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO.TypedRead.Method;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.ExperimentalOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.testing.PAssert;
+import org.apache.beam.sdk.testing.TestPipeline;
+import org.apache.beam.sdk.testing.TestPipelineOptions;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.Create;
+import org.apache.beam.sdk.transforms.SerializableFunction;
+import org.apache.beam.sdk.transforms.errorhandling.BadRecord;
+import org.apache.beam.sdk.transforms.errorhandling.ErrorHandler;
+import org.apache.beam.sdk.transforms.errorhandling.ErrorHandlingTestUtils.ErrorSinkTransform;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.collect.ImmutableMap;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+/**
+ * Integration tests for {@link BigQueryIO#read(SerializableFunction)} using {@link
+ * Method#DIRECT_READ} to read query results. This test runs a simple "SELECT *" query over a
+ * pre-defined table and asserts that the number of records read is equal to the expected count.
+ */
+@RunWith(JUnit4.class)
+public class BigQueryIODynamicQueryIT {
+
+  private static final Map<String, Long> EXPECTED_NUM_RECORDS =
+      ImmutableMap.of(
+          "empty", 0L,
+          "1M", 10592L,
+          "1G", 11110839L,
+          "1T", 11110839000L);
+
+  private static final String DATASET_ID =
+      TestPipeline.testingPipelineOptions()
+              .as(TestBigQueryOptions.class)
+              .getBigQueryLocation()
+              .equals(BIGQUERY_EARLY_ROLLOUT_REGION)
+          ? "big_query_storage_day0"
+          : "big_query_storage";
+  private static final String TABLE_PREFIX = "storage_read_";
+
+  private BigQueryIOQueryOptions options;
+
+  /** Customized {@link TestPipelineOptions} for BigQueryIOStorageQuery pipelines. */
+  public interface BigQueryIOQueryOptions extends TestPipelineOptions, ExperimentalOptions {
+    @Description("The table to be queried")
+    @Validation.Required
+    String getInputTable();
+
+    void setInputTable(String table);
+
+    @Description("The expected number of records")
+    @Validation.Required
+    long getNumRecords();
+
+    void setNumRecords(long numRecords);
+  }
+
+  private void setUpTestEnvironment(String tableSize) {
+    PipelineOptionsFactory.register(BigQueryIOQueryOptions.class);
+    options = TestPipeline.testingPipelineOptions().as(BigQueryIOQueryOptions.class);
+    options.setNumRecords(EXPECTED_NUM_RECORDS.get(tableSize));
+    String project = TestPipeline.testingPipelineOptions().as(GcpOptions.class).getProject();
+    options.setInputTable(project + '.' + DATASET_ID + '.' + TABLE_PREFIX + tableSize);
+  }
+
+  private void runBigQueryIODynamicQueryPipeline() {
+    Pipeline p = Pipeline.create(options);
+    PCollection<Long> count =
+        p.apply(
+                Create.of(
+                    BigQueryDynamicReadDescriptor.create(
+                        "SELECT * FROM `" + options.getInputTable() + "`",
+                        null,
+                        false,
+                        false,
+                        null,
+                        null)))
+            .apply(
+                "DynamicRead",
+                BigQueryIO.readDynamically(TableRowParser.INSTANCE, TableRowJsonCoder.of())
+                    .withQueryTempProject("radoslaws-playground-pso"))

Review Comment:
   ![critical](https://www.gstatic.com/codereviewagent/critical.svg)
   
   The test uses a hardcoded project ID (`radoslaws-playground-pso`), which 
makes it non-portable and dependent on a personal project. Please use the 
project ID from the pipeline options, which is standard practice for 
integration tests in this repository. You can retrieve it using 
`options.as(GcpOptions.class).getProject()`.
   
   ```suggestion
                    .withQueryTempProject(options.as(GcpOptions.class).getProject()))
   ```



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryStorageStreamSource.java:
##########
@@ -79,6 +80,26 @@ public static <T> BigQueryStorageStreamSource<T> create(
         bqServices);
   }
 
+  @Override
+  public boolean equals(@Nullable Object obj) {
+    if (this == obj) {
+      return true;
+    }
+    if (obj == null || getClass() != obj.getClass()) {
+      return false;
+    }
+    BigQueryStorageStreamSource<?> other = (BigQueryStorageStreamSource<?>) obj;
+    return readSession.equals(other.readSession)
+        && readStream.equals(other.readStream)
+        && jsonTableSchema.equals(other.jsonTableSchema)
+        && outputCoder.equals(other.outputCoder);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hashCode(readSession, readStream, jsonTableSchema, parseFn, outputCoder);

Review Comment:
   ![high](https://www.gstatic.com/codereviewagent/high-priority.svg)
   
   The `hashCode()` method includes `parseFn`, but the `equals()` method does 
not. This violates the `Object.hashCode()` contract, which requires that equal 
objects have equal hash codes. Since `SerializableFunction`'s `equals` method 
relies on reference equality, it's best to exclude `parseFn` from both methods 
for consistent behavior. Please remove `parseFn` from the `hashCode` 
calculation.
   
   ```java
       return Objects.hashCode(readSession, readStream, jsonTableSchema, outputCoder);
   ```
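
   For reference, a minimal sketch of the consistent pair, mirroring only the fields already shown in this diff, with `parseFn` excluded from both methods:

   ```java
   // Sketch only: equals() and hashCode() must agree on the same set of fields.
   // parseFn is left out of both because SerializableFunction compares by reference.
   @Override
   public boolean equals(@Nullable Object obj) {
     if (this == obj) {
       return true;
     }
     if (obj == null || getClass() != obj.getClass()) {
       return false;
     }
     BigQueryStorageStreamSource<?> other = (BigQueryStorageStreamSource<?>) obj;
     return readSession.equals(other.readSession)
         && readStream.equals(other.readStream)
         && jsonTableSchema.equals(other.jsonTableSchema)
         && outputCoder.equals(other.outputCoder);
   }

   @Override
   public int hashCode() {
     return Objects.hashCode(readSession, readStream, jsonTableSchema, outputCoder);
   }
   ```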



##########
examples/java/src/main/java/org/apache/beam/examples/cookbook/BigQueryStreamingTornadoes.java:
##########
@@ -0,0 +1,209 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.beam.examples.cookbook;
+
+import com.google.api.services.bigquery.model.TableFieldSchema;
+import com.google.api.services.bigquery.model.TableRow;
+import com.google.api.services.bigquery.model.TableSchema;
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.beam.sdk.Pipeline;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryDynamicReadDescriptor;
+import org.apache.beam.sdk.io.gcp.bigquery.BigQueryIO;
+import org.apache.beam.sdk.options.Default;
+import org.apache.beam.sdk.options.Description;
+import org.apache.beam.sdk.options.PipelineOptions;
+import org.apache.beam.sdk.options.PipelineOptionsFactory;
+import org.apache.beam.sdk.options.Validation;
+import org.apache.beam.sdk.transforms.Count;
+import org.apache.beam.sdk.transforms.DoFn;
+import org.apache.beam.sdk.transforms.MapElements;
+import org.apache.beam.sdk.transforms.PTransform;
+import org.apache.beam.sdk.transforms.ParDo;
+import org.apache.beam.sdk.transforms.PeriodicImpulse;
+import org.apache.beam.sdk.transforms.windowing.FixedWindows;
+import org.apache.beam.sdk.transforms.windowing.Window;
+import org.apache.beam.sdk.values.KV;
+import org.apache.beam.sdk.values.PCollection;
+import org.apache.beam.sdk.values.TypeDescriptor;
+import org.joda.time.Duration;
+import org.joda.time.Instant;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * An example that reads periodically the public samples of weather data from BigQuery, counts the
+ * number of tornadoes that occur in each month, and writes the results to BigQuery.
+ *
+ * <p>Concepts: Reading/writing BigQuery; counting a PCollection; user-defined PTransforms
+ *
+ * <p>Note: Before running this example, you must create a BigQuery dataset to contain your output
+ * table.
+ *
+ * <p>To execute this pipeline locally, specify the BigQuery table for the output with the form:
+ *
+ * <pre>{@code
+ * --output=YOUR_PROJECT_ID:DATASET_ID.TABLE_ID
+ * }</pre>
+ *
+ * <p>To change the runner, specify:
+ *
+ * <pre>{@code
+ * --runner=YOUR_SELECTED_RUNNER
+ * }</pre>
+ *
+ * See examples/java/README.md for instructions about how to configure different runners.
+ *
+ * <p>The BigQuery input table defaults to {@code apache-beam-testing.samples.weather_stations} and
+ * can be overridden with {@code --input}.
+ */
+public class BigQueryStreamingTornadoes {
+  private static final Logger LOG = LoggerFactory.getLogger(BigQueryStreamingTornadoes.class);
+
+  // Default to using a 1000 row subset of the public weather station table publicdata:samples.gsod.
+  private static final String WEATHER_SAMPLES_TABLE =
+      "apache-beam-testing.samples.weather_stations";
+
+  /**
+   * Examines each row in the input table. If a tornado was recorded in that sample, the month in
+   * which it occurred is output.
+   */
+  static class ExtractTornadoesFn extends DoFn<TableRow, Integer> {
+    @ProcessElement
+    public void processElement(ProcessContext c) {
+      TableRow row = c.element();
+      if ((Boolean) row.get("tornado")) {

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The direct cast `(Boolean) row.get("tornado")` is unsafe and will cause a 
`NullPointerException` if the 'tornado' field is `null`. It's safer to use 
`Boolean.TRUE.equals(row.get("tornado"))` to correctly handle `null` and 
`false` values without risking an exception.
   
   ```suggestion
         if (Boolean.TRUE.equals(row.get("tornado"))) {
   ```



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -804,6 +834,253 @@ public TableRow apply(SchemaAndRecord schemaAndRecord) {
       return BigQueryAvroUtils.convertGenericRecordToTableRow(schemaAndRecord.getRecord());
     }
   }
+  /** @deprecated this class may have breaking changes introduced, use with caution */
+  @Deprecated
+  @AutoValue
+  public abstract static class DynamicRead<T>
+      extends PTransform<PCollection<BigQueryDynamicReadDescriptor>, PCollection<T>> {
+
+    abstract BigQueryServices getBigQueryServices();
+
+    abstract DataFormat getFormat();
+
+    abstract @Nullable SerializableFunction<SchemaAndRecord, T> getParseFn();
+
+    abstract @Nullable Coder<T> getOutputCoder();
+
+    abstract boolean getProjectionPushdownApplied();
+
+    abstract BadRecordRouter getBadRecordRouter();
+
+    abstract ErrorHandler<BadRecord, ?> getBadRecordErrorHandler();
+
+    abstract @Nullable String getQueryLocation();
+
+    abstract @Nullable String getQueryTempDataset();
+
+    abstract @Nullable String getQueryTempProject();
+
+    abstract @Nullable String getKmsKey();
+
+    abstract DynamicRead.Builder<T> toBuilder();
+
+    public DynamicRead<T> withQueryLocation(String location) {
+      return toBuilder().setQueryLocation(location).build();
+    }
+
+    public DynamicRead<T> withQueryTempProject(String tempProject) {
+      return toBuilder().setQueryTempProject(tempProject).build();
+    }
+
+    public DynamicRead<T> withQueryTempDataset(String tempDataset) {
+      return toBuilder().setQueryTempDataset(tempDataset).build();
+    }
+
+    public DynamicRead<T> withKmsKey(String kmsKey) {
+      return toBuilder().setKmsKey(kmsKey).build();
+    }
+
+    public DynamicRead<T> withFormat(DataFormat format) {
+      return toBuilder().setFormat(format).build();
+    }
+
+    public DynamicRead<T> withBadRecordErrorHandler(
+        ErrorHandler<BadRecord, ?> badRecordErrorHandler) {
+      return toBuilder()
+          .setBadRecordRouter(RECORDING_ROUTER)
+          .setBadRecordErrorHandler(badRecordErrorHandler)
+          .build();
+    }
+
+    @VisibleForTesting
+    public DynamicRead<T> withTestServices(BigQueryServices testServices) {
+      return toBuilder().setBigQueryServices(testServices).build();
+    }
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+
+      abstract Builder<T> setFormat(DataFormat format);
+
+      abstract Builder<T> setBigQueryServices(BigQueryServices bigQueryServices);
+
+      abstract Builder<T> setParseFn(SerializableFunction<SchemaAndRecord, T> parseFn);
+
+      abstract Builder<T> setOutputCoder(Coder<T> coder);
+
+      abstract Builder<T> setProjectionPushdownApplied(boolean projectionPushdownApplied);
+
+      abstract Builder<T> setBadRecordErrorHandler(
+          ErrorHandler<BadRecord, ?> badRecordErrorHandler);
+
+      abstract Builder<T> setBadRecordRouter(BadRecordRouter badRecordRouter);
+
+      abstract DynamicRead<T> build();
+
+      abstract Builder<T> setKmsKey(String kmsKey);
+
+      abstract Builder<T> setQueryLocation(String queryLocation);
+
+      abstract Builder<T> setQueryTempDataset(String queryTempDataset);
+
+      abstract Builder<T> setQueryTempProject(String queryTempProject);
+    }
+
+    DynamicRead() {}
+
+    public static <T> void readSource(
+        PipelineOptions options,
+        TupleTag<T> rowTag,
+        MultiOutputReceiver outputReceiver,
+        BoundedSource<T> streamSource,
+        TypedRead.ErrorHandlingParseFn<T> errorHandlingParseFn,
+        BadRecordRouter badRecordRouter)
+        throws Exception {
+      // Read all the data from the stream. In the event that this work
+      // item fails and is rescheduled, the same rows will be returned in
+      // the same order.
+      BoundedSource.BoundedReader<T> reader = streamSource.createReader(options);
+
+      try {
+        if (reader.start()) {
+          outputReceiver.get(rowTag).output(reader.getCurrent());
+        } else {
+          return;
+        }
+      } catch (TypedRead.ParseException e) {
+        GenericRecord record = errorHandlingParseFn.getSchemaAndRecord().getRecord();
+        badRecordRouter.route(
+            outputReceiver,
+            record,
+            AvroCoder.of(record.getSchema()),
+            (Exception) e.getCause(),
+            "Unable to parse record reading from BigQuery");
+      }
+
+      while (true) {
+        try {
+          if (reader.advance()) {
+            outputReceiver.get(rowTag).output(reader.getCurrent());
+          } else {
+            return;
+          }
+        } catch (TypedRead.ParseException e) {
+          GenericRecord record = errorHandlingParseFn.getSchemaAndRecord().getRecord();
+          badRecordRouter.route(
+              outputReceiver,
+              record,
+              AvroCoder.of(record.getSchema()),
+              (Exception) e.getCause(),
+              "Unable to parse record reading from BigQuery");
+        }
+      }
+    }
+
+    class CreateBoundedSourceForTable
+        extends DoFn<KV<String, BigQueryDynamicReadDescriptor>, BigQueryStorageStreamSource<T>> {
+
+      @ProcessElement
+      public void processElement(
+          OutputReceiver<BigQueryStorageStreamSource<T>> receiver,
+          @Element KV<String, BigQueryDynamicReadDescriptor> kv,
+          PipelineOptions options)
+          throws Exception {
+
+        BigQueryDynamicReadDescriptor descriptor = kv.getValue();
+        if (descriptor.getTable() != null) {
+          BigQueryStorageTableSource<T> output =
+              BigQueryStorageTableSource.create(
+                  StaticValueProvider.of(BigQueryHelpers.parseTableSpec(descriptor.getTable())),
+                  getFormat(),
+                  descriptor.getSelectedFields() != null
+                      ? StaticValueProvider.of(descriptor.getSelectedFields())
+                      : null,
+                  descriptor.getRowRestriction() != null
+                      ? StaticValueProvider.of(descriptor.getRowRestriction())
+                      : null,
+                  getParseFn(),
+                  getOutputCoder(),
+                  getBigQueryServices(),
+                  getProjectionPushdownApplied());
+          // 1mb --> 1 shard; 1gb --> 32 shards; 1tb --> 1000 shards, 1pb --> 32k
+          // shards
+          long desiredChunkSize =
+              Math.max(1 << 20, (long) (1000 * Math.sqrt(output.getEstimatedSizeBytes(options))));

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   The logic for calculating `desiredChunkSize` is duplicated for query-based 
reads on lines 1042-1043. To improve maintainability and reduce code 
duplication, consider extracting this logic into a private helper method within 
the `CreateBoundedSourceForTable` class.
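
   As an illustration, a minimal sketch of such a helper (the method name is only a suggestion; it reuses the existing heuristic and `BoundedSource#getEstimatedSizeBytes` exactly as in this hunk):

   ```java
   // Hypothetical helper: derives the desired chunk size from the estimated source size,
   // following the existing heuristic (1mb --> 1 shard; 1gb --> 32 shards; 1tb --> 1000 shards, 1pb --> 32k shards).
   private long desiredChunkSize(BoundedSource<T> source, PipelineOptions options) throws Exception {
     return Math.max(1L << 20, (long) (1000 * Math.sqrt(source.getEstimatedSizeBytes(options))));
   }
   ```

   Both the table-based and query-based branches could then call this helper instead of repeating the formula.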



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -804,6 +834,253 @@ public TableRow apply(SchemaAndRecord schemaAndRecord) {
       return BigQueryAvroUtils.convertGenericRecordToTableRow(schemaAndRecord.getRecord());
     }
   }
+  /** @deprecated this class may have breaking changes introduced, use with caution */
+  @Deprecated
+  @AutoValue
+  public abstract static class DynamicRead<T>
+      extends PTransform<PCollection<BigQueryDynamicReadDescriptor>, PCollection<T>> {
+
+    abstract BigQueryServices getBigQueryServices();
+
+    abstract DataFormat getFormat();
+
+    abstract @Nullable SerializableFunction<SchemaAndRecord, T> getParseFn();
+
+    abstract @Nullable Coder<T> getOutputCoder();
+
+    abstract boolean getProjectionPushdownApplied();
+
+    abstract BadRecordRouter getBadRecordRouter();
+
+    abstract ErrorHandler<BadRecord, ?> getBadRecordErrorHandler();
+
+    abstract @Nullable String getQueryLocation();
+
+    abstract @Nullable String getQueryTempDataset();
+
+    abstract @Nullable String getQueryTempProject();
+
+    abstract @Nullable String getKmsKey();
+
+    abstract DynamicRead.Builder<T> toBuilder();
+
+    public DynamicRead<T> withQueryLocation(String location) {
+      return toBuilder().setQueryLocation(location).build();
+    }
+
+    public DynamicRead<T> withQueryTempProject(String tempProject) {
+      return toBuilder().setQueryTempProject(tempProject).build();
+    }
+
+    public DynamicRead<T> withQueryTempDataset(String tempDataset) {
+      return toBuilder().setQueryTempDataset(tempDataset).build();
+    }
+
+    public DynamicRead<T> withKmsKey(String kmsKey) {
+      return toBuilder().setKmsKey(kmsKey).build();
+    }
+
+    public DynamicRead<T> withFormat(DataFormat format) {
+      return toBuilder().setFormat(format).build();
+    }
+
+    public DynamicRead<T> withBadRecordErrorHandler(
+        ErrorHandler<BadRecord, ?> badRecordErrorHandler) {
+      return toBuilder()
+          .setBadRecordRouter(RECORDING_ROUTER)
+          .setBadRecordErrorHandler(badRecordErrorHandler)
+          .build();
+    }
+
+    @VisibleForTesting
+    public DynamicRead<T> withTestServices(BigQueryServices testServices) {
+      return toBuilder().setBigQueryServices(testServices).build();
+    }
+
+    @AutoValue.Builder
+    abstract static class Builder<T> {
+
+      abstract Builder<T> setFormat(DataFormat format);
+
+      abstract Builder<T> setBigQueryServices(BigQueryServices bigQueryServices);
+
+      abstract Builder<T> setParseFn(SerializableFunction<SchemaAndRecord, T> parseFn);
+
+      abstract Builder<T> setOutputCoder(Coder<T> coder);
+
+      abstract Builder<T> setProjectionPushdownApplied(boolean projectionPushdownApplied);
+
+      abstract Builder<T> setBadRecordErrorHandler(
+          ErrorHandler<BadRecord, ?> badRecordErrorHandler);
+
+      abstract Builder<T> setBadRecordRouter(BadRecordRouter badRecordRouter);
+
+      abstract DynamicRead<T> build();
+
+      abstract Builder<T> setKmsKey(String kmsKey);
+
+      abstract Builder<T> setQueryLocation(String queryLocation);
+
+      abstract Builder<T> setQueryTempDataset(String queryTempDataset);
+
+      abstract Builder<T> setQueryTempProject(String queryTempProject);
+    }
+
+    DynamicRead() {}
+
+    public static <T> void readSource(
+        PipelineOptions options,
+        TupleTag<T> rowTag,
+        MultiOutputReceiver outputReceiver,
+        BoundedSource<T> streamSource,
+        TypedRead.ErrorHandlingParseFn<T> errorHandlingParseFn,
+        BadRecordRouter badRecordRouter)
+        throws Exception {
+      // Read all the data from the stream. In the event that this work
+      // item fails and is rescheduled, the same rows will be returned in
+      // the same order.
+      BoundedSource.BoundedReader<T> reader = streamSource.createReader(options);
+
+      try {
+        if (reader.start()) {
+          outputReceiver.get(rowTag).output(reader.getCurrent());
+        } else {
+          return;
+        }
+      } catch (TypedRead.ParseException e) {
+        GenericRecord record = errorHandlingParseFn.getSchemaAndRecord().getRecord();
+        badRecordRouter.route(
+            outputReceiver,
+            record,
+            AvroCoder.of(record.getSchema()),
+            (Exception) e.getCause(),
+            "Unable to parse record reading from BigQuery");
+      }
+
+      while (true) {
+        try {
+          if (reader.advance()) {
+            outputReceiver.get(rowTag).output(reader.getCurrent());
+          } else {
+            return;
+          }
+        } catch (TypedRead.ParseException e) {
+          GenericRecord record = errorHandlingParseFn.getSchemaAndRecord().getRecord();
+          badRecordRouter.route(
+              outputReceiver,
+              record,
+              AvroCoder.of(record.getSchema()),
+              (Exception) e.getCause(),
+              "Unable to parse record reading from BigQuery");
+        }
+      }
+    }

Review Comment:
   ![medium](https://www.gstatic.com/codereviewagent/medium-priority.svg)
   
   This `readSource` method within the `DynamicRead` class is a duplicate of 
`TypedRead.readSource` and appears to be unused. The `expand` method in 
`DynamicRead` already utilizes `TypedRead.ReadTableSource`, which in turn calls 
`TypedRead.readSource`. To avoid code duplication and dead code, this method 
should be removed.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
