Abacn commented on code in PR #37535:
URL: https://github.com/apache/beam/pull/37535#discussion_r3052311404
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -1024,25 +1062,208 @@ public String apply(BigQueryDynamicReadDescriptor
input) {
.apply("Checkpoint", Redistribute.byKey());
PCollectionTuple resultTuple =
- addJobId
- .apply("Create streams", ParDo.of(new
CreateBoundedSourceForTable()))
+ addJobId.apply(
+ "Create streams",
+ ParDo.of(new CreateBoundedSourceForTable(cleanupInfoTag))
+ .withOutputTags(streamTag, TupleTagList.of(cleanupInfoTag)));
+
+ PCollection<KV<String, BigQueryStorageStreamSource<T>>> streams =
+ resultTuple
+ .get(streamTag)
.setCoder(
- SerializableCoder.of(new
TypeDescriptor<BigQueryStorageStreamSource<T>>() {}))
- .apply("Redistribute", Redistribute.arbitrarily())
- .apply(
- "Read Streams with storage read api",
- ParDo.of(
- new TypedRead.ReadTableSource<T>(
- rowTag, getParseFn(), getBadRecordRouter()))
- .withOutputTags(rowTag,
TupleTagList.of(BAD_RECORD_TAG)));
+ KvCoder.of(
+ StringUtf8Coder.of(),
+ SerializableCoder.of(
+ new TypeDescriptor<BigQueryStorageStreamSource<T>>()
{})))
+ .apply("Redistribute", Redistribute.arbitrarily());
+
+ PCollectionTuple readResultTuple =
+ streams.apply(
+ "Read Streams with storage read api",
+ ParDo.of(
+ new ReadDynamicStreamSource<T>(
+ rowTag, getParseFn(), getBadRecordRouter(),
cleanupInfoTag))
+ .withOutputTags(rowTag,
TupleTagList.of(BAD_RECORD_TAG).and(cleanupInfoTag)));
+
+ PCollection<KV<String, CleanupOperationMessage>> cleanupMessages1 =
+ resultTuple
+ .get(cleanupInfoTag)
+ .setCoder(
+ KvCoder.of(
+ StringUtf8Coder.of(),
SerializableCoder.of(CleanupOperationMessage.class)));
+
+ PCollection<KV<String, CleanupOperationMessage>> cleanupMessages2 =
+ readResultTuple
+ .get(cleanupInfoTag)
+ .setCoder(
+ KvCoder.of(
+ StringUtf8Coder.of(),
SerializableCoder.of(CleanupOperationMessage.class)));
+
+ PCollectionList.of(cleanupMessages1)
+ .and(cleanupMessages2)
+ .apply(Flatten.pCollections())
+ .apply("CleanupTempTables", ParDo.of(new
CleanupTempTableDoFn(getBigQueryServices())));
+
getBadRecordErrorHandler()
.addErrorCollection(
-
resultTuple.get(BAD_RECORD_TAG).setCoder(BadRecord.getCoder(input.getPipeline())));
- return resultTuple.get(rowTag).setCoder(getOutputCoder());
+ readResultTuple
+ .get(BAD_RECORD_TAG)
+ .setCoder(BadRecord.getCoder(input.getPipeline())));
+ return readResultTuple.get(rowTag).setCoder(getOutputCoder());
}
}
/** Implementation of {@link BigQueryIO#read()}. */
+ static class CleanupInfo implements Serializable {
+ private final String projectId;
+ private final String datasetId;
+ private final String tableId;
+ private final boolean datasetCreatedByBeam;
+ private final int totalStreams;
+
+ public CleanupInfo(TableReference tableRef, boolean datasetCreatedByBeam,
int totalStreams) {
+ if (tableRef != null) {
+ this.projectId = tableRef.getProjectId();
+ this.datasetId = tableRef.getDatasetId();
+ this.tableId = tableRef.getTableId();
+ } else {
+ this.projectId = null;
+ this.datasetId = null;
+ this.tableId = null;
+ }
+ this.datasetCreatedByBeam = datasetCreatedByBeam;
+ this.totalStreams = totalStreams;
+ }
+
+ public TableReference getTableReference() {
+ if (projectId == null || datasetId == null || tableId == null) {
+ return null;
+ }
+ return new TableReference()
+ .setProjectId(projectId)
+ .setDatasetId(datasetId)
+ .setTableId(tableId);
+ }
+
+ public boolean isDatasetCreatedByBeam() {
+ return datasetCreatedByBeam;
+ }
+
+ public int getTotalStreams() {
+ return totalStreams;
+ }
+ }
+
+ static class CleanupOperationMessage implements Serializable {
+ private final @Nullable CleanupInfo cleanupInfo;
+ private final boolean isStreamCompletion;
+
+ private CleanupOperationMessage(@Nullable CleanupInfo cleanupInfo, boolean
isStreamCompletion) {
+ this.cleanupInfo = cleanupInfo;
+ this.isStreamCompletion = isStreamCompletion;
+ }
+
+ public static CleanupOperationMessage streamComplete() {
+ return new CleanupOperationMessage(null, true);
+ }
+
+ public static CleanupOperationMessage initialize(CleanupInfo cleanupInfo) {
+ return new CleanupOperationMessage(cleanupInfo, false);
+ }
+
+ public @Nullable CleanupInfo getCleanupInfo() {
+ return cleanupInfo;
+ }
+
+ public boolean isStreamCompletion() {
+ return isStreamCompletion;
+ }
+ }
+
+ static class CleanupTempTableDoFn extends DoFn<KV<String,
CleanupOperationMessage>, Void> {
+ private final BigQueryServices bqServices;
+ private static final Logger LOG =
LoggerFactory.getLogger(CleanupTempTableDoFn.class);
+
+ @StateId("cleanupInfo")
+ private final StateSpec<ValueState<CleanupInfo>> cleanupInfoSpec =
StateSpecs.value();
+
+ @StateId("completedStreams")
+ private final StateSpec<ValueState<Integer>> completedStreamsSpec =
StateSpecs.value();
+
+ CleanupTempTableDoFn(BigQueryServices bqServices) {
+ this.bqServices = bqServices;
+ }
+
+ @ProcessElement
+ public void processElement(
+ @Element KV<String, CleanupOperationMessage> element,
+ @StateId("cleanupInfo") ValueState<CleanupInfo> cleanupInfoState,
+ @StateId("completedStreams") ValueState<Integer> completedStreamsState,
+ PipelineOptions options)
+ throws Exception {
+
+ CleanupOperationMessage msg = element.getValue();
+ CleanupInfo cleanupInfo = cleanupInfoState.read();
+ int completed = firstNonNull(completedStreamsState.read(), 0);
+
+ if (msg.isStreamCompletion()) {
+ completed += 1;
+ completedStreamsState.write(completed);
+ } else {
+ cleanupInfoState.write(msg.getCleanupInfo());
+ cleanupInfo = msg.getCleanupInfo();
+ }
+
+ if (cleanupInfo != null
+ && cleanupInfo.getTotalStreams() > 0
Review Comment:
There was an earlier fix #38009 for the edge case of an empty table. Would it
also be a problem here?
In that case the number of streams is 0, so `cleanupInfo.getTotalStreams() >
0` will be false here, and the empty table won't be cleaned up. Or, is it the
case that the temp table won't be created at all, in which case we're fine here.
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -1024,25 +1062,208 @@ public String apply(BigQueryDynamicReadDescriptor
input) {
.apply("Checkpoint", Redistribute.byKey());
PCollectionTuple resultTuple =
- addJobId
- .apply("Create streams", ParDo.of(new
CreateBoundedSourceForTable()))
+ addJobId.apply(
+ "Create streams",
+ ParDo.of(new CreateBoundedSourceForTable(cleanupInfoTag))
+ .withOutputTags(streamTag, TupleTagList.of(cleanupInfoTag)));
+
+ PCollection<KV<String, BigQueryStorageStreamSource<T>>> streams =
+ resultTuple
+ .get(streamTag)
.setCoder(
- SerializableCoder.of(new
TypeDescriptor<BigQueryStorageStreamSource<T>>() {}))
- .apply("Redistribute", Redistribute.arbitrarily())
- .apply(
- "Read Streams with storage read api",
- ParDo.of(
- new TypedRead.ReadTableSource<T>(
- rowTag, getParseFn(), getBadRecordRouter()))
- .withOutputTags(rowTag,
TupleTagList.of(BAD_RECORD_TAG)));
+ KvCoder.of(
+ StringUtf8Coder.of(),
+ SerializableCoder.of(
+ new TypeDescriptor<BigQueryStorageStreamSource<T>>()
{})))
+ .apply("Redistribute", Redistribute.arbitrarily());
+
+ PCollectionTuple readResultTuple =
+ streams.apply(
+ "Read Streams with storage read api",
+ ParDo.of(
+ new ReadDynamicStreamSource<T>(
+ rowTag, getParseFn(), getBadRecordRouter(),
cleanupInfoTag))
+ .withOutputTags(rowTag,
TupleTagList.of(BAD_RECORD_TAG).and(cleanupInfoTag)));
+
+ PCollection<KV<String, CleanupOperationMessage>> cleanupMessages1 =
+ resultTuple
+ .get(cleanupInfoTag)
+ .setCoder(
+ KvCoder.of(
+ StringUtf8Coder.of(),
SerializableCoder.of(CleanupOperationMessage.class)));
+
+ PCollection<KV<String, CleanupOperationMessage>> cleanupMessages2 =
+ readResultTuple
+ .get(cleanupInfoTag)
+ .setCoder(
+ KvCoder.of(
+ StringUtf8Coder.of(),
SerializableCoder.of(CleanupOperationMessage.class)));
+
+ PCollectionList.of(cleanupMessages1)
+ .and(cleanupMessages2)
+ .apply(Flatten.pCollections())
+ .apply("CleanupTempTables", ParDo.of(new
CleanupTempTableDoFn(getBigQueryServices())));
+
getBadRecordErrorHandler()
.addErrorCollection(
-
resultTuple.get(BAD_RECORD_TAG).setCoder(BadRecord.getCoder(input.getPipeline())));
- return resultTuple.get(rowTag).setCoder(getOutputCoder());
+ readResultTuple
+ .get(BAD_RECORD_TAG)
+ .setCoder(BadRecord.getCoder(input.getPipeline())));
+ return readResultTuple.get(rowTag).setCoder(getOutputCoder());
}
}
/** Implementation of {@link BigQueryIO#read()}. */
+ static class CleanupInfo implements Serializable {
+ private final String projectId;
+ private final String datasetId;
+ private final String tableId;
+ private final boolean datasetCreatedByBeam;
+ private final int totalStreams;
+
+ public CleanupInfo(TableReference tableRef, boolean datasetCreatedByBeam,
int totalStreams) {
+ if (tableRef != null) {
+ this.projectId = tableRef.getProjectId();
+ this.datasetId = tableRef.getDatasetId();
+ this.tableId = tableRef.getTableId();
+ } else {
+ this.projectId = null;
+ this.datasetId = null;
+ this.tableId = null;
+ }
+ this.datasetCreatedByBeam = datasetCreatedByBeam;
+ this.totalStreams = totalStreams;
+ }
+
+ public TableReference getTableReference() {
+ if (projectId == null || datasetId == null || tableId == null) {
+ return null;
+ }
+ return new TableReference()
+ .setProjectId(projectId)
+ .setDatasetId(datasetId)
+ .setTableId(tableId);
+ }
+
+ public boolean isDatasetCreatedByBeam() {
+ return datasetCreatedByBeam;
+ }
+
+ public int getTotalStreams() {
+ return totalStreams;
+ }
+ }
+
+ static class CleanupOperationMessage implements Serializable {
+ private final @Nullable CleanupInfo cleanupInfo;
+ private final boolean isStreamCompletion;
+
+ private CleanupOperationMessage(@Nullable CleanupInfo cleanupInfo, boolean
isStreamCompletion) {
+ this.cleanupInfo = cleanupInfo;
+ this.isStreamCompletion = isStreamCompletion;
+ }
+
+ public static CleanupOperationMessage streamComplete() {
+ return new CleanupOperationMessage(null, true);
+ }
+
+ public static CleanupOperationMessage initialize(CleanupInfo cleanupInfo) {
+ return new CleanupOperationMessage(cleanupInfo, false);
+ }
+
+ public @Nullable CleanupInfo getCleanupInfo() {
+ return cleanupInfo;
+ }
+
+ public boolean isStreamCompletion() {
+ return isStreamCompletion;
+ }
+ }
+
+ static class CleanupTempTableDoFn extends DoFn<KV<String,
CleanupOperationMessage>, Void> {
+ private final BigQueryServices bqServices;
+ private static final Logger LOG =
LoggerFactory.getLogger(CleanupTempTableDoFn.class);
+
+ @StateId("cleanupInfo")
+ private final StateSpec<ValueState<CleanupInfo>> cleanupInfoSpec =
StateSpecs.value();
+
+ @StateId("completedStreams")
+ private final StateSpec<ValueState<Integer>> completedStreamsSpec =
StateSpecs.value();
+
+ CleanupTempTableDoFn(BigQueryServices bqServices) {
+ this.bqServices = bqServices;
+ }
+
+ @ProcessElement
+ public void processElement(
+ @Element KV<String, CleanupOperationMessage> element,
+ @StateId("cleanupInfo") ValueState<CleanupInfo> cleanupInfoState,
Review Comment:
Although the number of CleanupOperationMessage elements should be moderate,
the existence of a stateful ParDo in a job graph may add significant overhead.
Load-testing it could suffice (use a single worker with a backlog and see
whether there is a throughput difference).
##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQueryIO.java:
##########
@@ -1024,25 +1062,208 @@ public String apply(BigQueryDynamicReadDescriptor
input) {
.apply("Checkpoint", Redistribute.byKey());
PCollectionTuple resultTuple =
- addJobId
- .apply("Create streams", ParDo.of(new
CreateBoundedSourceForTable()))
+ addJobId.apply(
+ "Create streams",
+ ParDo.of(new CreateBoundedSourceForTable(cleanupInfoTag))
+ .withOutputTags(streamTag, TupleTagList.of(cleanupInfoTag)));
+
+ PCollection<KV<String, BigQueryStorageStreamSource<T>>> streams =
+ resultTuple
+ .get(streamTag)
.setCoder(
- SerializableCoder.of(new
TypeDescriptor<BigQueryStorageStreamSource<T>>() {}))
- .apply("Redistribute", Redistribute.arbitrarily())
- .apply(
- "Read Streams with storage read api",
- ParDo.of(
- new TypedRead.ReadTableSource<T>(
- rowTag, getParseFn(), getBadRecordRouter()))
- .withOutputTags(rowTag,
TupleTagList.of(BAD_RECORD_TAG)));
+ KvCoder.of(
+ StringUtf8Coder.of(),
+ SerializableCoder.of(
+ new TypeDescriptor<BigQueryStorageStreamSource<T>>()
{})))
+ .apply("Redistribute", Redistribute.arbitrarily());
+
+ PCollectionTuple readResultTuple =
+ streams.apply(
+ "Read Streams with storage read api",
+ ParDo.of(
+ new ReadDynamicStreamSource<T>(
+ rowTag, getParseFn(), getBadRecordRouter(),
cleanupInfoTag))
+ .withOutputTags(rowTag,
TupleTagList.of(BAD_RECORD_TAG).and(cleanupInfoTag)));
+
+ PCollection<KV<String, CleanupOperationMessage>> cleanupMessages1 =
+ resultTuple
+ .get(cleanupInfoTag)
+ .setCoder(
+ KvCoder.of(
+ StringUtf8Coder.of(),
SerializableCoder.of(CleanupOperationMessage.class)));
+
+ PCollection<KV<String, CleanupOperationMessage>> cleanupMessages2 =
+ readResultTuple
+ .get(cleanupInfoTag)
+ .setCoder(
+ KvCoder.of(
+ StringUtf8Coder.of(),
SerializableCoder.of(CleanupOperationMessage.class)));
+
+ PCollectionList.of(cleanupMessages1)
+ .and(cleanupMessages2)
+ .apply(Flatten.pCollections())
+ .apply("CleanupTempTables", ParDo.of(new
CleanupTempTableDoFn(getBigQueryServices())));
+
getBadRecordErrorHandler()
.addErrorCollection(
-
resultTuple.get(BAD_RECORD_TAG).setCoder(BadRecord.getCoder(input.getPipeline())));
- return resultTuple.get(rowTag).setCoder(getOutputCoder());
+ readResultTuple
+ .get(BAD_RECORD_TAG)
+ .setCoder(BadRecord.getCoder(input.getPipeline())));
+ return readResultTuple.get(rowTag).setCoder(getOutputCoder());
}
}
/** Implementation of {@link BigQueryIO#read()}. */
+ static class CleanupInfo implements Serializable {
Review Comment:
Move the original javadoc so it sits beneath the new code.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]