This is an automated email from the ASF dual-hosted git repository.
shunping pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/beam.git
The following commit(s) were added to refs/heads/master by this push:
new d935ef83fa3 Iobase streaming others (#35418)
d935ef83fa3 is described below
commit d935ef83fa386a047b1b9fb47722cdd50295a668
Author: Razvan Culea <[email protected]>
AuthorDate: Thu Aug 7 02:59:02 2025 +0200
Iobase streaming others (#35418)
* Add support for streaming writes in IOBase (Python)
* add triggering_frequency in iobase.Sink
* fix whitespaces/newlines
* fixes per
https://github.com/apache/beam/pull/35137#pullrequestreview-2906007316
* implement pre_finalize_windowed and finalize_windowed_write in
FileBasedSink
handle file naming based on windowing for streaming in internal methods
TextIO - expose the new filebasedsink capabilities to write in streaming
* update changes.md
* fix formatting
* space formatting
* space nagging
* keep in sync with refactor in https://github.com/apache/beam/pull/35137
* keep in sync with iobase changes in
https://github.com/apache/beam/pull/35137
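A minimal usage sketch of the streaming write support described above (based on the
WriteToAvro changes in the diff below); the Pub/Sub topic, window size, and output
path are illustrative assumptions, not part of this commit:

    # Sketch: writing an unbounded PCollection with the new triggering_frequency
    # option exposed by this change (topic, paths, and windowing are assumed).
    import apache_beam as beam
    from apache_beam.options.pipeline_options import PipelineOptions
    from apache_beam.transforms.window import FixedWindows

    schema = {
        'name': 'record',
        'type': 'record',
        'fields': [{'name': 'age', 'type': 'int'}],
    }

    options = PipelineOptions(streaming=True)
    with beam.Pipeline(options=options) as p:
        _ = (
            p
            # Any unbounded source works; Pub/Sub is just an example.
            | beam.io.ReadFromPubSub(topic='projects/my-project/topics/my-topic')
            | beam.Map(lambda msg: {'age': int(msg)})
            | beam.WindowInto(FixedWindows(60))
            | beam.io.WriteToAvro(
                file_path_prefix='/tmp/streaming_output',
                file_name_suffix='.avro',
                num_shards=2,
                # Flush each window every 60s; unbounded inputs default to the
                # windowed '-W-SSSSS-of-NNNNN' shard name template.
                triggering_frequency=60,
                schema=schema))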
* [Website] add akvelon case study (#34943)
* feat: add akvelon logo
* feat: add akvelon case study
* fix: remove white space
* feat: add akvelon to main page
* feat: use new images
* fix: typos
* fix: change order of akvelon case-study
* fix: update text
* fix: update mainPage text
* fix: update images
* fix: about akvelon section update
* fix: update akvelon card
* fix: update akvelon header
* fix: update code tag
* fix: update about akvelon
* fix: update date and order
* fix: add link and change img
* fix: change CDAP text
* fix: add bold weight
* fix: solve conflicts
* fix: remove unused code
* fix: delete whitespace
* fix: indents format
* fix: add bold text
---------
Co-authored-by: Bulat Safiullin <[email protected]>
* added the rate test for GenerateSequence (#35108)
* added the rate test for GenerateSequence
* keep the master yaml
* Re-enable logging after importing vllm. (#35103)
Co-authored-by: Claude <[email protected]>
* Deprecate Java 8 (#35064)
* Deprecate Java 8
* Java 8 client now using Java 11 SDK container
* adjust non-LTS fallback to use the next LTS instead of the nearest LTS.
Previously, Java 18 fell back to Java 17, which won't work
* Emit warning when Java 8 is used. Java 8 is still
supported until Beam 3.0
* Clean up subproject build file requiring Java9+
* Require java 11 to build SDK container
* fix workflow
* Simplify XVR test workflow
* Fix Samza PVR
* Remove beam college banners (#35123)
* feat: change text (#35130)
Co-authored-by: Bulat Safiullin <[email protected]>
* Update Custom Remote Inference example to use RemoteModelHandler (#35066)
* Update Custom Remote Inference example to use RemoteModelHandler
* restore old kernel metadata
* Update examples/notebooks/beam-ml/custom_remote_inference.ipynb
Co-authored-by: Danny McCormick <[email protected]>
* Add DLQ
---------
Co-authored-by: Danny McCormick <[email protected]>
* Remove Java 8 container (#35125)
* add extra_transforms block documentation to chain transform documentation
(#35101)
* add note about testing (#35075)
* fix whitespaces/newlines
* fixes per
https://github.com/apache/beam/pull/35137#pullrequestreview-2906007316
* implement pre_finalize_windowed and finalize_windowed_write in
FileBasedSink
handle file naming based on windowing for streaming in internal methods
TextIO - expose the new filebasedsink capabilities to write in streaming
* update changes.md
* fix formatting
* space formatting
* space nagging
* keep in sync with refactor in https://github.com/apache/beam/pull/35137
* keep in sync with iobase changes in
https://github.com/apache/beam/pull/35137
* [Website] update akvelon case study: update text and fix landing page
(#35133)
* feat: change text
* fix: add learn more to quotes Akvelon
---------
Co-authored-by: Bulat Safiullin <[email protected]>
* Fix PostCommit Python Xlang IO Dataflow job (#35131)
* Add support for iterable type
* Fix formatting
* Bump google.golang.org/grpc from 1.72.0 to 1.72.2 in /sdks (#35113)
Bumps [google.golang.org/grpc](https://github.com/grpc/grpc-go) from 1.72.0
to 1.72.2.
- [Release notes](https://github.com/grpc/grpc-go/releases)
- [Commits](https://github.com/grpc/grpc-go/compare/v1.72.0...v1.72.2)
---
updated-dependencies:
- dependency-name: google.golang.org/grpc
dependency-version: 1.72.2
dependency-type: direct:production
update-type: version-update:semver-patch
...
Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: dependabot[bot]
<49699333+dependabot[bot]@users.noreply.github.com>
* Bump cloud.google.com/go/bigquery from 1.67.0 to 1.69.0 in /sdks (#35061)
Bumps
[cloud.google.com/go/bigquery](https://github.com/googleapis/google-cloud-go)
from 1.67.0 to 1.69.0.
- [Release notes](https://github.com/googleapis/google-cloud-go/releases)
-
[Changelog](https://github.com/googleapis/google-cloud-go/blob/main/CHANGES.md)
-
[Commits](https://github.com/googleapis/google-cloud-go/compare/spanner/v1.67.0...spanner/v1.69.0)
---
updated-dependencies:
- dependency-name: cloud.google.com/go/bigquery
dependency-version: 1.69.0
dependency-type: direct:production
update-type: version-update:semver-minor
...
Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: dependabot[bot]
<49699333+dependabot[bot]@users.noreply.github.com>
* Add known issues. (#35138)
* Add known issues.
* Add fixes notes.
---------
Co-authored-by: Claude <[email protected]>
* Bump @octokit/plugin-paginate-rest and @octokit/rest (#34167)
Bumps
[@octokit/plugin-paginate-rest](https://github.com/octokit/plugin-paginate-rest.js)
to 11.4.3 and updates ancestor dependency
[@octokit/rest](https://github.com/octokit/rest.js). These dependencies need to
be updated together.
Updates `@octokit/plugin-paginate-rest` from 2.17.0 to 11.4.3
- [Release
notes](https://github.com/octokit/plugin-paginate-rest.js/releases)
-
[Commits](https://github.com/octokit/plugin-paginate-rest.js/compare/v2.17.0...v11.4.3)
Updates `@octokit/rest` from 18.12.0 to 21.1.1
- [Release notes](https://github.com/octokit/rest.js/releases)
- [Commits](https://github.com/octokit/rest.js/compare/v18.12.0...v21.1.1)
---
updated-dependencies:
- dependency-name: "@octokit/plugin-paginate-rest"
dependency-type: indirect
- dependency-name: "@octokit/rest"
dependency-type: direct:production
...
Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: dependabot[bot]
<49699333+dependabot[bot]@users.noreply.github.com>
* Explicitly handle singleton iterators instead of using helper and
catching exceptions which may be from generating iterable (#35124)
* Build last snapshot against RC00 tag instead of release branch (#35142)
The last snapshot Beam Java SDK (aka "RC00" release) build is triggered
manually, to verify that an RC1 build will be successful.
It has been built against the release-2.xx branch, where the Dataflow container
tag is replaced from beam-master to 2.xx.0.
However, the versioned containers are not yet released, causing a timing
gap in which Beam 2.xx.0-SNAPSHOT won't work on Dataflow between the release
branch cut and RC1 rolling out to Dataflow.
Since we now have a v2.xx.0-RC00 tag, building RC00 against this tag resolves
the issue.
* Bump nodemailer from 6.7.5 to 6.9.9 in /scripts/ci/issue-report (#35143)
Bumps [nodemailer](https://github.com/nodemailer/nodemailer) from 6.7.5 to
6.9.9.
- [Release notes](https://github.com/nodemailer/nodemailer/releases)
-
[Changelog](https://github.com/nodemailer/nodemailer/blob/master/CHANGELOG.md)
-
[Commits](https://github.com/nodemailer/nodemailer/compare/v6.7.5...v6.9.9)
---
updated-dependencies:
- dependency-name: nodemailer
dependency-version: 6.9.9
dependency-type: direct:production
...
Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: dependabot[bot]
<49699333+dependabot[bot]@users.noreply.github.com>
* Fix tests affected by Java 8 container turn down (#35145)
* Fix tests affected by Java 8 container turn down
* still use Java 8 for Samza runner
* Bump github.com/aws/aws-sdk-go-v2/service/s3 in /sdks (#35146)
Bumps
[github.com/aws/aws-sdk-go-v2/service/s3](https://github.com/aws/aws-sdk-go-v2)
from 1.79.3 to 1.80.0.
- [Release notes](https://github.com/aws/aws-sdk-go-v2/releases)
-
[Changelog](https://github.com/aws/aws-sdk-go-v2/blob/main/changelog-template.json)
-
[Commits](https://github.com/aws/aws-sdk-go-v2/compare/service/s3/v1.79.3...service/s3/v1.80.0)
---
updated-dependencies:
- dependency-name: github.com/aws/aws-sdk-go-v2/service/s3
dependency-version: 1.80.0
dependency-type: direct:production
update-type: version-update:semver-minor
...
Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: dependabot[bot]
<49699333+dependabot[bot]@users.noreply.github.com>
* fix jdbc transform validation (#35141)
* fix jdbc transform validation
* add test
* annotations
* spotless
* Fix Java Example ARM PostCommit
* fix: add missed word (#35163)
Co-authored-by: Bulat Safiullin <[email protected]>
* Add postcommit yaml xlang workflow and split tests accordingly (#35119)
* move precommit yaml xlang to postcommit
* add json file to trigger path
* update pull request targets
* add readme workflow changes
* add cloud sdk setup
* switch out to ubuntu-latest
* add back precommit workflow
* switch names and add postcommit
* switch out to postCommitYamlIT
* add test_files_dir flag
* add conftest.py file for capturing directory flag
* shift yaml tests around to appropriate locations
* add back precommit to readme
* add license for conftest.py
* revert precommit to previous name
* remove github.event.comment.body trigger
* Replace usages of deprecated pkg_resources package (#35153)
* Remove usages of deprecated pkg_resources package
* use stdlib importlib.resources
* remove extra comma
* linting
* import order
* Improve error message when accidentally using PBegin/Pipeline (#35156)
* Create test
* Implement new error message
* Add beam.Create into unit test pipeline
* add friendly error message for when transform is applied to no output
(#35160)
* add friendly error message for when transform is applied to no output
* update test name
* Fix pubsub unit tests that depend on old behavior
* Add warning if temp location bucket has soft delete enabled for Go SD…
(#34996)
* Add warning if temp location bucket has soft delete enabled for Go SDK
(resolves #31606)
* Corrected Formatting
* Applied suggested changes
* Constrain DequeCoder type correctly, as it does not support nulls
The DequeCoder uses ArrayDeque internally, which disallows null elements.
We could switch Deque implementations, but this change is better. To quote the
JDK docs: "While Deque implementations are not strictly required to prohibit
the insertion of null elements, they are strongly encouraged to do so. Users of
any Deque implementations that do allow null elements are strongly encouraged
not to take advantage of the ability to insert nulls. This is so because null
is used as a special return value by various methods to indicate that the
deque is empty."
* Do not overwrite class states if a cached dynamic class is returned in
cloudpickle.load (#35063)
* Fix class states overwritten after cloudpickle.load
* Fix further
* Fix lint
* Make SDK harness change effective on Iceberg Dataflow test (#35173)
* Fix beam_PostCommit_Java_Examples_Dataflow_V2 (#35172)
* [YAML]: Update postgres IT test and readme (#35169)
* update postgres test without driver_class_name
* update readme on how to run integration tests
* fix misspelling
* fix trailing whitespace
* Bump Java beam-master container (#35170)
* Make WindowedValue a public interface
The following mostly-automated changes:
- Moved WindowedValue from util to values package
- Make WindowedValue an interface with companion class WindowedValues
* Run integration tests for moving WindowedValue and making public
* Add timer tests to make sure event-time timer firing at the right time.
(#35109)
* Add timer tests to make sure event-time timer firing at the right time.
* Add more tests.
* Disable the failed event-time timer tests for FnApiRunner.
* Fix lints and reformat.
* Disable another new test in FnApiRunnerTest and PortableRunnerTest due to
flakiness.
* Disable a new test in FlinkRunnerTest
* Take out the early firing test case because it depends on bundle size.
* change to ubuntu-20.04 (#35182)
* Fix IntelliJ sync project failure due to circular Beam dependency (#35167)
* Fix IntelliJ sync project failure due to circular Beam dependency
* address comments
* Update workflows categories (#35162)
* Add cloudpickle coder. (#35166)
* Move examples from sql package (#35183)
* Fix the beam interactive install problem when on Google Colab (#35148)
* Fix Google Colab Issue
* Update CHANGES.md
* Bump github.com/docker/docker in /sdks (#35112)
Bumps [github.com/docker/docker](https://github.com/docker/docker) from
28.0.4+incompatible to 28.2.2+incompatible.
- [Release notes](https://github.com/docker/docker/releases)
- [Commits](https://github.com/docker/docker/compare/v28.0.4...v28.2.2)
---
updated-dependencies:
- dependency-name: github.com/docker/docker
dependency-version: 28.2.2+incompatible
dependency-type: direct:production
update-type: version-update:semver-minor
...
Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: dependabot[bot]
<49699333+dependabot[bot]@users.noreply.github.com>
* Bump github.com/nats-io/nats-server/v2 from 2.11.3 to 2.11.4 in /sdks
(#35161)
Bumps
[github.com/nats-io/nats-server/v2](https://github.com/nats-io/nats-server)
from 2.11.3 to 2.11.4.
- [Release notes](https://github.com/nats-io/nats-server/releases)
-
[Changelog](https://github.com/nats-io/nats-server/blob/main/.goreleaser.yml)
-
[Commits](https://github.com/nats-io/nats-server/compare/v2.11.3...v2.11.4)
---
updated-dependencies:
- dependency-name: github.com/nats-io/nats-server/v2
dependency-version: 2.11.4
dependency-type: direct:production
update-type: version-update:semver-patch
...
Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: dependabot[bot]
<49699333+dependabot[bot]@users.noreply.github.com>
* Touch JPMS test trigger file
* Use staged SDK harness & Dataflow worker jar in JPMS tests
* Bump cloud.google.com/go/storage from 1.52.0 to 1.55.0 in /sdks (#35114)
Bumps
[cloud.google.com/go/storage](https://github.com/googleapis/google-cloud-go)
from 1.52.0 to 1.55.0.
- [Release notes](https://github.com/googleapis/google-cloud-go/releases)
-
[Changelog](https://github.com/googleapis/google-cloud-go/blob/main/CHANGES.md)
-
[Commits](https://github.com/googleapis/google-cloud-go/compare/spanner/v1.52.0...spanner/v1.55.0)
---
updated-dependencies:
- dependency-name: cloud.google.com/go/storage
dependency-version: 1.55.0
dependency-type: direct:production
update-type: version-update:semver-minor
...
Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: dependabot[bot]
<49699333+dependabot[bot]@users.noreply.github.com>
* Bump github.com/nats-io/nats.go from 1.42.0 to 1.43.0 in /sdks (#35147)
Bumps [github.com/nats-io/nats.go](https://github.com/nats-io/nats.go) from
1.42.0 to 1.43.0.
- [Release notes](https://github.com/nats-io/nats.go/releases)
- [Commits](https://github.com/nats-io/nats.go/compare/v1.42.0...v1.43.0)
---
updated-dependencies:
- dependency-name: github.com/nats-io/nats.go
dependency-version: 1.43.0
dependency-type: direct:production
update-type: version-update:semver-minor
...
Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: dependabot[bot]
<49699333+dependabot[bot]@users.noreply.github.com>
* Move non-dill specific code out of dill_pickler.py (#35139)
* Create consistent_pickle.py
* Update dill_pickler.py
* Update consistent_pickle.py
* Add Apache license to consistent_pickle.py
* Update dill_pickler.py
* Update and rename consistent_pickle.py to code_object_pickler.py
* Update dill_pickler.py to change consistent_pickle to code_object_pickler
* Fix formatting issue in code_object_pickler.py
* Fix formatting issue in dill_pickler.py
* Remove apache license from code_object_pickler.py
* Add apache license to code_object_pickler.py
* Fix some lints introduced in a recent PR. (#35193)
* small filesystem fixes (#34956)
* Fix typehint for get_filesystem
* Fix bug with creating files in cwd with LocalFileSystem
* lint
* revert precommit config change
* isort
* add file to start PR builder
* Revert "add file to start PR builder"
This reverts commit f80b361d80ad5a386333bd012ce24839ddf5fdf6.
* fix isort again
* [YAML] Add a spec provider for transforms taking specifiable arguments
(#35187)
* Add a test provider for specifiable and try it on AnomalyDetection.
Also add support on callable in spec.
* Minor renaming
* Fix lints.
* Touch trigger files to test WindowedValueReceiver in runners
* Introduce WindowedValue receivers and consolidate runner code to them
We need a receiver for WindowedValue to create an OutputBuilder. This change
introduces the interface and uses it in many places where it is appropriate,
replacing and simplifying internal runner code.
* Eliminate nullness errors from ByteBuddyDoFnInvokerFactory and
DoFnOutputReceivers
* Fix null check when fetching driverJars from value provider
* Fix PostCommit Python ValidatesRunner Samza / Spark jobs (#35210)
* Skip Samza and Spark runner tests
* Fix formatting
* Update pypi documentation 30145 (#34329)
* Add Pypi friendly documentation
* provided full link path
* Add more YAML examples involving Kafka and Iceberg (#35151)
* Add more YAML examples involving Kafka and Iceberg
* Fix some missed details from rebasing
* Adding unit tests for YAML examples
* Clean up and address PR comments
* Formatting
* Formatting
* Evaluate namedTuples as equivalent to rows (#35218)
* Evaluate namedTuples as equivalent to rows
* lint
* Add a new experiment flag to enable real-time clock as processing time.
(#35202)
* Touch trigger files for lightweight runners
* Eliminate WindowedValue.getPane() in preparation for making it a
user-facing interface
* Do not fail if there were failed deletes (#35222)
* Force FnApiRunner in cases where prism can't handle use case (#35219)
* Force FnApiRunner in cases where prism can't handle use case
* yapf
* Bump golang.org/x/net from 0.40.0 to 0.41.0 in /sdks (#35206)
Bumps [golang.org/x/net](https://github.com/golang/net) from 0.40.0 to
0.41.0.
- [Commits](https://github.com/golang/net/compare/v0.40.0...v0.41.0)
---
updated-dependencies:
- dependency-name: golang.org/x/net
dependency-version: 0.41.0
dependency-type: direct:production
update-type: version-update:semver-minor
...
Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: dependabot[bot]
<49699333+dependabot[bot]@users.noreply.github.com>
* Fix incorrect typehints generated by FlatMap with default identity
function (#35164)
* create unit test
* add some unit tests
* fix bug where T is considered iterable
* update strip_iterable to return Any for "stripped iterable" type of
TypeVariable
* remove typehint from identity function and add a test to test for proper
typechecking
* Move callablewrapp typehint test
* Remove print
* isort
* isort
* return any for yielded type of T
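A small illustrative sketch of the default-identity FlatMap behavior these
typehint fixes target; the literal input values and local pipeline are
assumptions for demonstration only:

    import apache_beam as beam

    # FlatMap with no callable defaults to an identity function, so each input
    # element must itself be an iterable whose items are emitted individually.
    # The fixes above concern the output typehint inferred for this default.
    with beam.Pipeline() as p:
        _ = (
            p
            | beam.Create([[1, 2], [3, 4, 5]])
            | beam.FlatMap()  # identity: flattens to 1, 2, 3, 4, 5
            | beam.Map(print))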
* Parse values returned from Dataflow API to BoundedTrieData (#34738)
* Parse struct returned from Dataflow API to BoundedTrieData
fix checkstyle
Use getBoundedTrie
add debug log
adapt to ArrayMap
* Add test, clean up
* Fix test: setTrie -> setBoundedTrie
* Fix comments
* Remove breaking PDone change (#35224)
* Remove breaking PDone change
* Update pipeline.py
* Update pipeline.py
* Update pipeline_test.py
* Move none check
* yapf
* fmt
* Generic Postgres + Cloudsql postgres embeddings. (#35215)
* Add base Postgres vector writer, CloudSQL vector writer and refactor.
* Trigger tests.
* Linter fixes.
* Fix test
* Add back tests. Update changes.md. Fix unrelated lint.
* Drop test tables.
* Fix test
* Fix tests.
* Fix test.
* Fix tests.
---------
Co-authored-by: Claude <[email protected]>
* Allow only one thread at a time to start the VLLM server. (#35234)
* [IcebergIO] Create namespaces if needed (#35228)
* create namespaces dynamically
* cleanup namespaces in ITs
* optimization
* Support configuring flush_count and max_row_bytes of WriteToBigTable
(#34761)
* Update CHANGES.md (#35242)
Highlight improvements to the vllm model handler.
* [Beam SQL] Implement Catalog and CatalogManager (#35223)
* beam sql catalog
* api adjustment
* cover more naming syntax (quotes, backticks, none)
* spotless
* fix
* add documentation and cleanup
* rename to dropCatalog; mark BeamSqlCli @Internal; rename to
EmptyCatalogManager
* use registrars instead; remove initialize() method from Catalog
* cleanup
* [IcebergIO] Fix conversion logic for arrays of structs and maps of
structs; fix output Schema resolution with column pruning (#35230)
* fix complex types; fix filter schema logic
* update Changes
* add missing changes from other PRs
* trigger ITs
* make separate impl for iterable
* fix
* fix long_description when the md file cannot be found (#35246)
* fix long_description when the md file cannot be found
* yapf
* [IcebergIO] Create tables with a partition spec (#34966)
* create table with partition spec
* add description and regenerate config doc; trigger tests
* spotless
* log partition spec
* fixes
* Fix typo java-dependencies.md (#35251)
* Adding project and database support in write transform for firestoreIO
(#35017)
* Adding project and database support in write transform for firestoreIO
* Spotless Apply
* Resolving comments
* Removing useless comments
* public to private
* Spotless apply
* Fix a logical type issue about JdbcDateType and JdbcTimeType (#35243)
* Fix a logical type issue about JdbcDateType
* Fix typo and also fix the logical class for java time.
* Get rid of the workaround on logical type registration. Trigger tests.
* Fix lints.
* Remove internal code. (#35239)
Co-authored-by: Claude <[email protected]>
* enable setting max_writer_per_bundle for avroIO and other IO (#35092)
* enable setting max_writer_per_bundle for avroIO and other IO
* enable setting max_writer_per_bundle for avroIO and other IO
* enable for TFRecordIO and corrections
* Updated standard_external_transforms.yaml
* [Java FnApi] Fix race in BeamFnStatusClient by initializing all fields
before starting rpc. (#35252)
* [Java FnApi] Fix race in BeamFnStatusClient by initializing all fields
before starting rpc.
* spotless
* [IcebergIO] Integrate with Beam SQL (#34799)
* add Iceberg table provider and tests
* properties go in the tableprovider initialization
* tobuilder
* streaming integration test
* spotless
* extend test to include multi nested types; fix iceberg <-> conversion
logic
* add to changes.md
* spotless
* fix tests
* clean
* update CHANGES
* add projection pushdown and column pruning
* spotless
* fixes
* fixes
* updates
* sync with HEAD and use new Catalog implementation
* mark new interfaces @internal
* spotless
* fix unparse method
* update changes (#35256)
* [YAML]: fix postcommit oracle bug and reorganize postcommit tests (#35191)
* fix postcommit oracle test
* add revision
* switch to hosted runner to try with kafka test
* add extended timeout
* upgrade to 4.10 testcontainers
* switch out to redpanda for kafka
* remove redpandacontainer
* tmp comment
* add postgres comment
* revert to old kafkaContainer
* revert commented out code and revert testcontainer version change
* change mysql image version
* revert image change
* revert image change again :)
* add comments to mysql again
* shift post commit files to different folders
* rename to Data version
* add databases version
* add messaging version
* update readme for three post commits
* update gradle with new post commits
* fix job names
* uncomment fixture on mysql
* switch back to one workflow and update readme as such
* remove old workflow files
* update order and remove comment
* update gradle with parameterized options
* update gradle command calls to correct location
* update workflow to three jobs explicit
* add back Bigquery deselect
* fix mysql teardown error
* Simplify down to one from three explicit jobs
* remove tab
* remove Data
* fix parsing parameters
* update hadoop prefix (#35257)
* Set go version 1.24.4 (#35261)
* Set go version 1.24.4
* Set go version 1.24.4
* upgrade org.apache.parquet:parquet-avro to 1.15.2 (#35037)
* upgrade org.apache.parquet:parquet-avro to 1.15.2
* keep iceberg to 1.6.1
* use 1.9.0 for iceberg
* run all iceberg tests
* require Java 11
* fixed java11 for iceberg
* switched back to iceberg_version = "1.6.1"
* merged master
---------
Co-authored-by: Vitaly Terentyev <[email protected]>
* Bump google.golang.org/grpc from 1.72.2 to 1.73.0 in /sdks (#35236)
Bumps [google.golang.org/grpc](https://github.com/grpc/grpc-go) from 1.72.2
to 1.73.0.
- [Release notes](https://github.com/grpc/grpc-go/releases)
- [Commits](https://github.com/grpc/grpc-go/compare/v1.72.2...v1.73.0)
---
updated-dependencies:
- dependency-name: google.golang.org/grpc
dependency-version: 1.73.0
dependency-type: direct:production
update-type: version-update:semver-minor
...
Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: dependabot[bot]
<49699333+dependabot[bot]@users.noreply.github.com>
* add more doc strings for integration tests (#35171)
* [IcebergIO] extend table partitioning to Beam SQL (#35268)
* add sql partitioning
* trigger ITs and add to CHANGES
* simplify test
* include empty namespace fix
* add error when using partitioning for unsupported tables
* fix test
* Moving to 2.67.0-SNAPSHOT on master branch.
* Update CHANGES.md to have fields for 2.67.0 release
* add free disk space step (#35260)
* [yaml]: Fix post commit yaml once more (#35273)
* switch back to self hosted runner and comment out most of kafka test for
now
* add git issue to track
* revision
* remove free space step - seems to be causing more issues than helping
* Polish anomaly detection notebook and get ready to be imported in public
devsite. (#35278)
* Polish anomaly detection zscore notebook for public doc.
* Adjust formatting.
* Adjust formatting.
* Suppress Findbugs (#35274)
* Support Java 17+ compiled Beam components for build, release, and xlang
tests (#35232)
* Install JDK 21 in release build
Support Beam components requiring Java17+ for release workflows
They will be compiled with JDK21 with byte code compatibility
configured by applyJavaNature(requireJavaVersion)
* Use Java 21 for Python PostCommit
* Honor JAVA_HOME in JavaJarServer checks and tests
* Disable Debezium test on Java17-
* add example line
* [IO] Update Debezium in DebeziumIO to 3.1.1 (#34763)
* Updating Debezium IO to 3.1.1
Enforce JDK17 in build.gradle of Debezium IO
* adjust to review comments by @Abacn
* cleanup
* mention which PR needs to merge before unpinning
* Mention Beam version 2.66 in the README for Debezium
---------
Co-authored-by: Shunping Huang <[email protected]>
* Document BQ Storage API pipeline options (#35259)
* Document BQ Storage API pipeline options
* Fix whitespace
* Fix whitespace
* Speed up StreamingDataflowWorkerTest by removing 10 second wait from
shutdown path. (#35275)
Takes test time from 5m18s to 2m40s.
The previous shutdown() call doesn't do anything, since it only means
future scheduling won't trigger, and we only schedule on the executor
once.
Also clean up test logs by making sure to stop all workers we start, so they
don't continue to run in the background and log.
This shutdown path is only used in testing.
* Bump org.ajoberstar.grgit:grgit-gradle from 4.1.1 to 5.3.2 (#35301)
Bumps
[org.ajoberstar.grgit:grgit-gradle](https://github.com/ajoberstar/grgit) from
4.1.1 to 5.3.2.
- [Release notes](https://github.com/ajoberstar/grgit/releases)
- [Commits](https://github.com/ajoberstar/grgit/compare/4.1.1...5.3.2)
---
updated-dependencies:
- dependency-name: org.ajoberstar.grgit:grgit-gradle
dependency-version: 5.3.2
dependency-type: direct:production
update-type: version-update:semver-major
...
Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: dependabot[bot]
<49699333+dependabot[bot]@users.noreply.github.com>
* Bump google.golang.org/api from 0.235.0 to 0.237.0 in /sdks (#35302)
Bumps
[google.golang.org/api](https://github.com/googleapis/google-api-go-client)
from 0.235.0 to 0.237.0.
- [Release
notes](https://github.com/googleapis/google-api-go-client/releases)
-
[Changelog](https://github.com/googleapis/google-api-go-client/blob/main/CHANGES.md)
-
[Commits](https://github.com/googleapis/google-api-go-client/compare/v0.235.0...v0.237.0)
---
updated-dependencies:
- dependency-name: google.golang.org/api
dependency-version: 0.237.0
dependency-type: direct:production
update-type: version-update:semver-minor
...
Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: dependabot[bot]
<49699333+dependabot[bot]@users.noreply.github.com>
* Bump github.com/linkedin/goavro/v2 from 2.13.1 to 2.14.0 in /sdks (#35205)
Bumps [github.com/linkedin/goavro/v2](https://github.com/linkedin/goavro)
from 2.13.1 to 2.14.0.
- [Release notes](https://github.com/linkedin/goavro/releases)
-
[Changelog](https://github.com/linkedin/goavro/blob/master/debug_release.go)
- [Commits](https://github.com/linkedin/goavro/compare/v2.13.1...v2.14.0)
---
updated-dependencies:
- dependency-name: github.com/linkedin/goavro/v2
dependency-version: 2.14.0
dependency-type: direct:production
update-type: version-update:semver-minor
...
Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: dependabot[bot]
<49699333+dependabot[bot]@users.noreply.github.com>
* Update spotbugs version, fix runner ubuntu version, fix found spotbugs
issues (#35303)
* feat: Add option to control resource cleanup failure for IT (#35287)
* Revert "Bump org.ajoberstar.grgit:grgit-gradle from 4.1.1 to 5.3.2
(#35301)" (#35305)
This reverts commit 0fd77ced0e5efba37ddc56d81ade38ef6756bb7e.
* Add PeriodicStream in the new time series folder. (#35300)
* Add PeriodicStream in the new time series folder.
* Add some more docstrings and minor fix on test name.
* Fix lints and docs.
* try buildah to replace kaniko (#35289)
* try buildah to replace kaniko
* trigger post-commit
* Adding error handler for SpannerReadSchemaTransformProvider and missi…
(#35241)
* Adding error handler for SpannerReadSchemaTransformProvider and missing
tests for SpannerSchemaTransformProvider
* Removed not used logging
* Spotless Apply
* Spotless Apply
* Spotless Apply
* Typo correction
* requests vulnerability. (#35308)
* [IcebergIO] create custom java container image for tests (#35307)
* create custom java container image for tests
* syntax
* eval depends on df
* add streaming support to iobase (python) (#35137)
* Add support for streaming writes in IOBase (Python)
* add triggering_frequency in iobase.Sink
* fix whitespaces/newlines
* fixes per
https://github.com/apache/beam/pull/35137#pullrequestreview-2906007316
* refactor for code readability
* refactor _expand_unbounded; default num_shards to 1 if undefined or 0
* fix formatter
* space
* keep num_shards = 0 the same as before for bounded write
* add streaming to AvroIO, ParquetIO, TFRecordsIO
* reformat
* typo and spaces
* carry on the refactor from #35253
---------
Signed-off-by: dependabot[bot] <[email protected]>
Co-authored-by: bullet03 <[email protected]>
Co-authored-by: Bulat Safiullin <[email protected]>
Co-authored-by: liferoad <[email protected]>
Co-authored-by: claudevdm <[email protected]>
Co-authored-by: Claude <[email protected]>
Co-authored-by: Yi Hu <[email protected]>
Co-authored-by: Danny McCormick <[email protected]>
Co-authored-by: Jack McCluskey
<[email protected]>
Co-authored-by: Derrick Williams <[email protected]>
Co-authored-by: Vitaly Terentyev <[email protected]>
Co-authored-by: dependabot[bot]
<49699333+dependabot[bot]@users.noreply.github.com>
Co-authored-by: scwhittle <[email protected]>
Co-authored-by: Radosław Stankiewicz <[email protected]>
Co-authored-by: Hai Joey Tran <[email protected]>
Co-authored-by: Tanu Sharma
<[email protected]>
Co-authored-by: Kenneth Knowles <[email protected]>
Co-authored-by: Minbo Bae <[email protected]>
Co-authored-by: Shunping Huang <[email protected]>
Co-authored-by: Chenzo <[email protected]>
Co-authored-by: kristynsmith <[email protected]>
Co-authored-by: Rakesh Kumar <[email protected]>
Co-authored-by: Charles Nguyen <[email protected]>
Co-authored-by: tvalentyn <[email protected]>
Co-authored-by: Ahmed Abualsaud
<[email protected]>
Co-authored-by: Minh Son Nguyen <[email protected]>
Co-authored-by: Amar3tto <actions@GitHub Actions 1000279405.local>
Co-authored-by: Vitaly Terentyev <[email protected]>
Co-authored-by: Tobias Kaymak <[email protected]>
Co-authored-by: Veronica Wasson
<[email protected]>
---
CHANGES.md | 4 +
sdks/python/apache_beam/io/avroio.py | 60 +++--
sdks/python/apache_beam/io/avroio_test.py | 274 ++++++++++++++++++++++
sdks/python/apache_beam/io/iobase_it_test.py | 72 ++++++
sdks/python/apache_beam/io/parquetio.py | 102 +++++++--
sdks/python/apache_beam/io/parquetio_it_test.py | 41 ++++
sdks/python/apache_beam/io/parquetio_test.py | 290 ++++++++++++++++++++++++
sdks/python/apache_beam/io/tfrecordio.py | 46 +++-
sdks/python/apache_beam/io/tfrecordio_test.py | 259 +++++++++++++++++++++
9 files changed, 1103 insertions(+), 45 deletions(-)
diff --git a/CHANGES.md b/CHANGES.md
index 57db95312b9..bffa326c9ff 100644
--- a/CHANGES.md
+++ b/CHANGES.md
@@ -108,8 +108,12 @@
* Support for X source added (Java/Python)
([#X](https://github.com/apache/beam/issues/X)).
* Debezium IO upgraded to 3.1.1 requires Java 17 (Java)
([#34747](https://github.com/apache/beam/issues/34747)).
* Add support for streaming writes in IOBase (Python)
+* Add IT test for streaming writes for IOBase (Python)
* Implement support for streaming writes in FileBasedSink (Python)
+* Expose support for streaming writes in AvroIO (Python)
+* Expose support for streaming writes in ParquetIO (Python)
* Expose support for streaming writes in TextIO (Python)
+* Expose support for streaming writes in TFRecordsIO (Python)
## New Features / Improvements
diff --git a/sdks/python/apache_beam/io/avroio.py
b/sdks/python/apache_beam/io/avroio.py
index 553b6c741f3..da904bf6fb5 100644
--- a/sdks/python/apache_beam/io/avroio.py
+++ b/sdks/python/apache_beam/io/avroio.py
@@ -354,8 +354,7 @@ class _FastAvroSource(filebasedsource.FileBasedSource):
while range_tracker.try_claim(next_block_start):
block = next(blocks)
next_block_start = block.offset + block.size
- for record in block:
- yield record
+ yield from block
_create_avro_source = _FastAvroSource
@@ -375,7 +374,8 @@ class WriteToAvro(beam.transforms.PTransform):
num_shards=0,
shard_name_template=None,
mime_type='application/x-avro',
- use_fastavro=True):
+ use_fastavro=True,
+ triggering_frequency=None):
"""Initialize a WriteToAvro transform.
Args:
@@ -393,17 +393,30 @@ class WriteToAvro(beam.transforms.PTransform):
Constraining the number of shards is likely to reduce
the performance of a pipeline. Setting this value is not recommended
unless you require a specific number of output files.
+ In streaming, if not set, the service will write a file per bundle.
shard_name_template: A template string containing placeholders for
- the shard number and shard count. When constructing a filename for a
- particular shard number, the upper-case letters 'S' and 'N' are
- replaced with the 0-padded shard number and shard count respectively.
- This argument can be '' in which case it behaves as if num_shards was
- set to 1 and only one file will be generated. The default pattern used
- is '-SSSSS-of-NNNNN' if None is passed as the shard_name_template.
+ the shard number and shard count. Currently only ``''``,
+ ``'-SSSSS-of-NNNNN'``, ``'-W-SSSSS-of-NNNNN'`` and
+ ``'-V-SSSSS-of-NNNNN'`` are patterns accepted by the service.
+ When constructing a filename for a particular shard number, the
+ upper-case letters ``S`` and ``N`` are replaced with the ``0``-padded
+ shard number and shard count respectively. This argument can be ``''``
+ in which case it behaves as if num_shards was set to 1 and only one file
+ will be generated. The default pattern used is ``'-SSSSS-of-NNNNN'`` for
+ bounded PCollections and ``'-W-SSSSS-of-NNNNN'`` for unbounded
+ PCollections.
+ W is used for windowed shard naming and is replaced with
+ ``[window.start, window.end)``
+ V is used for windowed shard naming and is replaced with
+ ``[window.start.to_utc_datetime().strftime("%Y-%m-%dT%H-%M-%S"),
+ window.end.to_utc_datetime().strftime("%Y-%m-%dT%H-%M-%S")``
mime_type: The MIME type to use for the produced files, if the filesystem
supports specifying MIME types.
use_fastavro (bool): This flag is left for API backwards compatibility
and no longer has an effect. Do not use.
+ triggering_frequency: (int) Every triggering_frequency duration, a window
+ will be triggered and all bundles in the window will be written.
+ If set, it overrides user windowing. Mandatory for GlobalWindow.
Returns:
A WriteToAvro transform usable for writing.
@@ -411,7 +424,7 @@ class WriteToAvro(beam.transforms.PTransform):
self._schema = schema
self._sink_provider = lambda avro_schema: _create_avro_sink(
file_path_prefix, avro_schema, codec, file_name_suffix, num_shards,
- shard_name_template, mime_type)
+ shard_name_template, mime_type, triggering_frequency)
def expand(self, pcoll):
if self._schema:
@@ -428,6 +441,15 @@ class WriteToAvro(beam.transforms.PTransform):
records = pcoll | beam.Map(
beam_row_to_avro_dict(avro_schema, beam_schema))
self._sink = self._sink_provider(avro_schema)
+ if (not pcoll.is_bounded and self._sink.shard_name_template
+ == filebasedsink.DEFAULT_SHARD_NAME_TEMPLATE):
+ self._sink.shard_name_template = (
+ filebasedsink.DEFAULT_WINDOW_SHARD_NAME_TEMPLATE)
+ self._sink.shard_name_format = self._sink._template_to_format(
+ self._sink.shard_name_template)
+ self._sink.shard_name_glob_format = self._sink._template_to_glob_format(
+ self._sink.shard_name_template)
+
return records | beam.io.iobase.Write(self._sink)
def display_data(self):
@@ -441,7 +463,8 @@ def _create_avro_sink(
file_name_suffix,
num_shards,
shard_name_template,
- mime_type):
+ mime_type,
+ triggering_frequency=60):
if "class 'avro.schema" in str(type(schema)):
raise ValueError(
'You are using Avro IO with fastavro (default with Beam on '
@@ -454,7 +477,8 @@ def _create_avro_sink(
file_name_suffix,
num_shards,
shard_name_template,
- mime_type)
+ mime_type,
+ triggering_frequency)
class _BaseAvroSink(filebasedsink.FileBasedSink):
@@ -467,7 +491,8 @@ class _BaseAvroSink(filebasedsink.FileBasedSink):
file_name_suffix,
num_shards,
shard_name_template,
- mime_type):
+ mime_type,
+ triggering_frequency):
super().__init__(
file_path_prefix,
file_name_suffix=file_name_suffix,
@@ -477,7 +502,8 @@ class _BaseAvroSink(filebasedsink.FileBasedSink):
mime_type=mime_type,
# Compression happens at the block level using the supplied codec, and
# not at the file level.
- compression_type=CompressionTypes.UNCOMPRESSED)
+ compression_type=CompressionTypes.UNCOMPRESSED,
+ triggering_frequency=triggering_frequency)
self._schema = schema
self._codec = codec
@@ -498,7 +524,8 @@ class _FastAvroSink(_BaseAvroSink):
file_name_suffix,
num_shards,
shard_name_template,
- mime_type):
+ mime_type,
+ triggering_frequency):
super().__init__(
file_path_prefix,
schema,
@@ -506,7 +533,8 @@ class _FastAvroSink(_BaseAvroSink):
file_name_suffix,
num_shards,
shard_name_template,
- mime_type)
+ mime_type,
+ triggering_frequency)
self.file_handle = None
def open(self, temp_path):
diff --git a/sdks/python/apache_beam/io/avroio_test.py
b/sdks/python/apache_beam/io/avroio_test.py
index 6dd9e620c66..6669b6fb8ab 100644
--- a/sdks/python/apache_beam/io/avroio_test.py
+++ b/sdks/python/apache_beam/io/avroio_test.py
@@ -16,11 +16,15 @@
#
# pytype: skip-file
+import glob
import json
import logging
import math
import os
+import pytz
import pytest
+import re
+import shutil
import tempfile
import unittest
from typing import List, Any
@@ -47,14 +51,17 @@ from apache_beam.io.avroio import _create_avro_sink # For testing
from apache_beam.io.filesystems import FileSystems
from apache_beam.options.pipeline_options import StandardOptions
from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.test_stream import TestStream
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
from apache_beam.transforms.display import DisplayData
from apache_beam.transforms.display_test import DisplayDataItemMatcher
from apache_beam.transforms.sql import SqlTransform
from apache_beam.transforms.userstate import CombiningValueStateSpec
+from apache_beam.transforms.util import LogElements
from apache_beam.utils.timestamp import Timestamp
from apache_beam.typehints import schemas
+from datetime import datetime
# Import snappy optionally; some tests will be skipped when import fails.
try:
@@ -673,6 +680,273 @@ class TestFastAvro(AvroBase, unittest.TestCase):
return f.name
+class GenerateEvent(beam.PTransform):
+ @staticmethod
+ def sample_data():
+ return GenerateEvent()
+
+ def expand(self, input):
+ elemlist = [{'age': 10}, {'age': 20}, {'age': 30}]
+ elem = elemlist
+ return (
+ input
+ | TestStream().add_elements(
+ elements=elem,
+ event_timestamp=datetime(
+ 2021, 3, 1, 0, 0, 1, 0,
+ tzinfo=pytz.UTC).timestamp()).add_elements(
+ elements=elem,
+ event_timestamp=datetime(
+ 2021, 3, 1, 0, 0, 2, 0,
+ tzinfo=pytz.UTC).timestamp()).add_elements(
+ elements=elem,
+ event_timestamp=datetime(
+ 2021, 3, 1, 0, 0, 3, 0,
+ tzinfo=pytz.UTC).timestamp()).add_elements(
+ elements=elem,
+ event_timestamp=datetime(
+ 2021, 3, 1, 0, 0, 4, 0,
+ tzinfo=pytz.UTC).timestamp()).
+ advance_watermark_to(
+ datetime(2021, 3, 1, 0, 0, 5, 0,
+ tzinfo=pytz.UTC).timestamp()).add_elements(
+ elements=elem,
+ event_timestamp=datetime(
+ 2021, 3, 1, 0, 0, 5, 0,
+ tzinfo=pytz.UTC).timestamp()).
+ add_elements(
+ elements=elem,
+ event_timestamp=datetime(
+ 2021, 3, 1, 0, 0, 6,
+ 0, tzinfo=pytz.UTC).timestamp()).add_elements(
+ elements=elem,
+ event_timestamp=datetime(
+ 2021, 3, 1, 0, 0, 7, 0,
+ tzinfo=pytz.UTC).timestamp()).add_elements(
+ elements=elem,
+ event_timestamp=datetime(
+ 2021, 3, 1, 0, 0, 8, 0,
+ tzinfo=pytz.UTC).timestamp()).add_elements(
+ elements=elem,
+ event_timestamp=datetime(
+ 2021, 3, 1, 0, 0, 9, 0,
+ tzinfo=pytz.UTC).timestamp()).
+ advance_watermark_to(
+ datetime(2021, 3, 1, 0, 0, 10, 0,
+ tzinfo=pytz.UTC).timestamp()).add_elements(
+ elements=elem,
+ event_timestamp=datetime(
+ 2021, 3, 1, 0, 0, 10, 0,
+ tzinfo=pytz.UTC).timestamp()).add_elements(
+ elements=elem,
+ event_timestamp=datetime(
+ 2021, 3, 1, 0, 0, 11, 0,
+ tzinfo=pytz.UTC).timestamp()).
+ add_elements(
+ elements=elem,
+ event_timestamp=datetime(
+ 2021, 3, 1, 0, 0, 12, 0,
+ tzinfo=pytz.UTC).timestamp()).add_elements(
+ elements=elem,
+ event_timestamp=datetime(
+ 2021, 3, 1, 0, 0, 13, 0,
+ tzinfo=pytz.UTC).timestamp()).add_elements(
+ elements=elem,
+ event_timestamp=datetime(
+ 2021, 3, 1, 0, 0, 14, 0,
+ tzinfo=pytz.UTC).timestamp()).
+ advance_watermark_to(
+ datetime(2021, 3, 1, 0, 0, 15, 0,
+ tzinfo=pytz.UTC).timestamp()).add_elements(
+ elements=elem,
+ event_timestamp=datetime(
+ 2021, 3, 1, 0, 0, 15, 0,
+ tzinfo=pytz.UTC).timestamp()).add_elements(
+ elements=elem,
+ event_timestamp=datetime(
+ 2021, 3, 1, 0, 0, 16, 0,
+ tzinfo=pytz.UTC).timestamp()).
+ add_elements(
+ elements=elem,
+ event_timestamp=datetime(
+ 2021, 3, 1, 0, 0, 17, 0,
+ tzinfo=pytz.UTC).timestamp()).add_elements(
+ elements=elem,
+ event_timestamp=datetime(
+ 2021, 3, 1, 0, 0, 18, 0,
+ tzinfo=pytz.UTC).timestamp()).add_elements(
+ elements=elem,
+ event_timestamp=datetime(
+ 2021, 3, 1, 0, 0, 19, 0,
+ tzinfo=pytz.UTC).timestamp()).
+ advance_watermark_to(
+ datetime(2021, 3, 1, 0, 0, 20, 0,
+ tzinfo=pytz.UTC).timestamp()).add_elements(
+ elements=elem,
+ event_timestamp=datetime(
+ 2021, 3, 1, 0, 0, 20, 0,
+ tzinfo=pytz.UTC).timestamp()).advance_watermark_to(
+ datetime(
+ 2021, 3, 1, 0, 0, 25, 0, tzinfo=pytz.UTC).
+ timestamp()).advance_watermark_to_infinity())
+
+
+class WriteStreamingTest(unittest.TestCase):
+ def setUp(self):
+ super().setUp()
+ self.tempdir = tempfile.mkdtemp()
+
+ def tearDown(self):
+ if os.path.exists(self.tempdir):
+ shutil.rmtree(self.tempdir)
+
+ def test_write_streaming_2_shards_default_shard_name_template(
+ self, num_shards=2):
+ with TestPipeline() as p:
+ output = (
+ p
+ | GenerateEvent.sample_data()
+ | 'User windowing' >> beam.transforms.core.WindowInto(
+ beam.transforms.window.FixedWindows(60),
+ trigger=beam.transforms.trigger.AfterWatermark(),
+ accumulation_mode=beam.transforms.trigger.AccumulationMode.
+ DISCARDING,
+ allowed_lateness=beam.utils.timestamp.Duration(seconds=0)))
+ #AvroIO
+ avroschema = {
+ 'name': 'dummy', # supposed to be the file name, with .avro extension
+ 'type': 'record', # type of Avro serialization; there are more (see
+ # the docs above)
+ 'fields': [ # this defines actual keys & their types
+ {'name': 'age', 'type': 'int'},
+ ],
+ }
+ output2 = output | 'WriteToAvro' >> beam.io.WriteToAvro(
+ file_path_prefix=self.tempdir + "/ouput_WriteToAvro",
+ file_name_suffix=".avro",
+ num_shards=num_shards,
+ schema=avroschema)
+ _ = output2 | 'LogElements after WriteToAvro' >> LogElements(
+ prefix='after WriteToAvro ', with_window=True, level=logging.INFO)
+
+ # Regex to match the expected windowed file pattern
+ # Example:
+ # ouput_WriteToAvro-[1614556800.0, 1614556805.0)-00000-of-00002.avro
+ # It captures: window_interval, shard_num, total_shards
+ pattern_string = (
+ r'.*-\[(?P<window_start>[\d\.]+), '
+ r'(?P<window_end>[\d\.]+|Infinity)\)-'
+ r'(?P<shard_num>\d{5})-of-(?P<total_shards>\d{5})\.avro$')
+ pattern = re.compile(pattern_string)
+ file_names = []
+ for file_name in glob.glob(self.tempdir + '/ouput_WriteToAvro*'):
+ match = pattern.match(file_name)
+ self.assertIsNotNone(
+ match, f"File name {file_name} did not match expected pattern.")
+ if match:
+ file_names.append(file_name)
+ print("Found files matching expected pattern:", file_names)
+ self.assertEqual(
+ len(file_names),
+ num_shards,
+ "expected %d files, but got: %d" % (num_shards, len(file_names)))
+
+ def test_write_streaming_2_shards_custom_shard_name_template(
+ self, num_shards=2, shard_name_template='-V-SSSSS-of-NNNNN'):
+ with TestPipeline() as p:
+ output = (p | GenerateEvent.sample_data())
+ #AvroIO
+ avroschema = {
+ 'name': 'dummy', # supposed to be the file name, with .avro extension
+ 'type': 'record', # type of Avro serialization
+ 'fields': [ # this defines actual keys & their types
+ {'name': 'age', 'type': 'int'},
+ ],
+ }
+ output2 = output | 'WriteToAvro' >> beam.io.WriteToAvro(
+ file_path_prefix=self.tempdir + "/ouput_WriteToAvro",
+ file_name_suffix=".avro",
+ shard_name_template=shard_name_template,
+ num_shards=num_shards,
+ triggering_frequency=60,
+ schema=avroschema)
+ _ = output2 | 'LogElements after WriteToAvro' >> LogElements(
+ prefix='after WriteToAvro ', with_window=True, level=logging.INFO)
+
+ # Regex to match the expected windowed file pattern
+ # Example:
+ # ouput_WriteToAvro-[2021-03-01T00-00-00, 2021-03-01T00-01-00)-
+ # 00000-of-00002.avro
+ # It captures: window_interval, shard_num, total_shards
+ pattern_string = (
+ r'.*-\[(?P<window_start>\d{4}-\d{2}-\d{2}T\d{2}-\d{2}-\d{2}), '
+ r'(?P<window_end>\d{4}-\d{2}-\d{2}T\d{2}-\d{2}-\d{2}|Infinity)\)-'
+ r'(?P<shard_num>\d{5})-of-(?P<total_shards>\d{5})\.avro$')
+ pattern = re.compile(pattern_string)
+ file_names = []
+ for file_name in glob.glob(self.tempdir + '/ouput_WriteToAvro*'):
+ match = pattern.match(file_name)
+ self.assertIsNotNone(
+ match, f"File name {file_name} did not match expected pattern.")
+ if match:
+ file_names.append(file_name)
+ print("Found files matching expected pattern:", file_names)
+ self.assertEqual(
+ len(file_names),
+ num_shards,
+ "expected %d files, but got: %d" % (num_shards, len(file_names)))
+
+ def test_write_streaming_2_shards_custom_shard_name_template_5s_window(
+ self,
+ num_shards=2,
+ shard_name_template='-V-SSSSS-of-NNNNN',
+ triggering_frequency=5):
+ with TestPipeline() as p:
+ output = (p | GenerateEvent.sample_data())
+ #AvroIO
+ avroschema = {
+ 'name': 'dummy', # supposed to be the file name, with .avro extension
+ 'type': 'record', # type of Avro serialization
+ 'fields': [ # this defines actual keys & their types
+ {'name': 'age', 'type': 'int'},
+ ],
+ }
+ output2 = output | 'WriteToAvro' >> beam.io.WriteToAvro(
+ file_path_prefix=self.tempdir + "/ouput_WriteToAvro",
+ file_name_suffix=".txt",
+ shard_name_template=shard_name_template,
+ num_shards=num_shards,
+ triggering_frequency=triggering_frequency,
+ schema=avroschema)
+ _ = output2 | 'LogElements after WriteToAvro' >> LogElements(
+ prefix='after WriteToAvro ', with_window=True, level=logging.INFO)
+
+ # Regex to match the expected windowed file pattern
+ # Example:
+ # ouput_WriteToAvro-[2021-03-01T00-00-00, 2021-03-01T00-01-00)-
+ # 00000-of-00002.avro
+ # It captures: window_interval, shard_num, total_shards
+ pattern_string = (
+ r'.*-\[(?P<window_start>\d{4}-\d{2}-\d{2}T\d{2}-\d{2}-\d{2}), '
+ r'(?P<window_end>\d{4}-\d{2}-\d{2}T\d{2}-\d{2}-\d{2}|Infinity)\)-'
+ r'(?P<shard_num>\d{5})-of-(?P<total_shards>\d{5})\.txt$')
+ pattern = re.compile(pattern_string)
+ file_names = []
+ for file_name in glob.glob(self.tempdir + '/ouput_WriteToAvro*'):
+ match = pattern.match(file_name)
+ self.assertIsNotNone(
+ match, f"File name {file_name} did not match expected pattern.")
+ if match:
+ file_names.append(file_name)
+ print("Found files matching expected pattern:", file_names)
+ # for 5s window size, the input should be processed by 5 windows with
+ # 2 shards per window
+ self.assertEqual(
+ len(file_names),
+ 10,
+ "expected %d files, but got: %d" % (num_shards, len(file_names)))
+
+
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
unittest.main()
diff --git a/sdks/python/apache_beam/io/iobase_it_test.py
b/sdks/python/apache_beam/io/iobase_it_test.py
new file mode 100644
index 00000000000..acb44f4085b
--- /dev/null
+++ b/sdks/python/apache_beam/io/iobase_it_test.py
@@ -0,0 +1,72 @@
+#
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+# pytype: skip-file
+
+import logging
+import unittest
+import uuid
+
+import apache_beam as beam
+from apache_beam.io.textio import WriteToText
+from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.transforms.window import FixedWindows
+
+# End-to-End tests for iobase
+# Usage:
+# cd sdks/python
+# pip install build && python -m build --sdist
+# DataflowRunner:
+# python -m pytest -o log_cli=True -o log_level=Info \
+# apache_beam/io/iobase_it_test.py::IOBaseITTest \
+# --test-pipeline-options="--runner=TestDataflowRunner \
+# --project=apache-beam-testing --region=us-central1 \
+# --temp_location=gs://apache-beam-testing-temp/temp \
+# --sdk_location=dist/apache_beam-2.65.0.dev0.tar.gz"
+
+
+class IOBaseITTest(unittest.TestCase):
+ def setUp(self):
+ self.test_pipeline = TestPipeline(is_integration_test=True)
+ self.runner_name = type(self.test_pipeline.runner).__name__
+
+ def test_unbounded_pcoll_without_global_window(self):
+ # https://github.com/apache/beam/issues/25598
+
+ args = self.test_pipeline.get_full_options_as_args(streaming=True)
+
+ topic = 'projects/pubsub-public-data/topics/taxirides-realtime'
+ unique_id = str(uuid.uuid4())
+ output_file = f'gs://apache-beam-testing-integration-testing/iobase/test-{unique_id}' # pylint: disable=line-too-long
+
+ p = beam.Pipeline(argv=args)
+ # Read from Pub/Sub with fixed windowing
+ lines = (
+ p
+ | "ReadFromPubSub" >> beam.io.ReadFromPubSub(topic=topic)
+ | "WindowInto" >> beam.WindowInto(FixedWindows(10)))
+
+ # Write to text file
+ _ = lines | 'WriteToText' >> WriteToText(output_file)
+
+ result = p.run()
+ result.wait_until_finish(duration=60 * 1000)
+
+
+if __name__ == "__main__":
+ logging.getLogger().setLevel(logging.INFO)
+ unittest.main()
diff --git a/sdks/python/apache_beam/io/parquetio.py
b/sdks/python/apache_beam/io/parquetio.py
index 48c51428c17..fa8b56f916d 100644
--- a/sdks/python/apache_beam/io/parquetio.py
+++ b/sdks/python/apache_beam/io/parquetio.py
@@ -48,6 +48,7 @@ from apache_beam.transforms import ParDo
from apache_beam.transforms import PTransform
from apache_beam.transforms import window
from apache_beam.typehints import schemas
+from apache_beam.utils.windowed_value import WindowedValue
try:
import pyarrow as pa
@@ -105,8 +106,10 @@ class _RowDictionariesToArrowTable(DoFn):
self._buffer_size = record_batch_size
self._record_batches = []
self._record_batches_byte_size = 0
+ self._window = None
- def process(self, row):
+ def process(self, row, w=DoFn.WindowParam, pane=DoFn.PaneInfoParam):
+ self._window = w
if len(self._buffer[0]) >= self._buffer_size:
self._flush_buffer()
@@ -123,7 +126,17 @@ class _RowDictionariesToArrowTable(DoFn):
self._flush_buffer()
if self._record_batches_byte_size > 0:
table = self._create_table()
- yield window.GlobalWindows.windowed_value_at_end_of_window(table)
+ if self._window is None or isinstance(self._window, window.GlobalWindow):
+ # bounded input
+ yield window.GlobalWindows.windowed_value_at_end_of_window(table)
+ else:
+ # unbounded input
+ yield WindowedValue(
+ table,
+ timestamp=self._window.
+ end, # or it could be the max timestamp of the rows processed
+ windows=[self._window] # TODO(pabloem) HOW DO WE GET THE PANE
+ )
def display_data(self):
res = super().display_data()
@@ -476,7 +489,9 @@ class WriteToParquet(PTransform):
file_name_suffix='',
num_shards=0,
shard_name_template=None,
- mime_type='application/x-parquet'):
+ mime_type='application/x-parquet',
+ triggering_frequency=None,
+ ):
"""Initialize a WriteToParquet transform.
Writes parquet files from a :class:`~apache_beam.pvalue.PCollection` of
@@ -540,14 +555,26 @@ class WriteToParquet(PTransform):
the performance of a pipeline. Setting this value is not recommended
unless you require a specific number of output files.
shard_name_template: A template string containing placeholders for
- the shard number and shard count. When constructing a filename for a
- particular shard number, the upper-case letters 'S' and 'N' are
- replaced with the 0-padded shard number and shard count respectively.
- This argument can be '' in which case it behaves as if num_shards was
- set to 1 and only one file will be generated. The default pattern used
- is '-SSSSS-of-NNNNN' if None is passed as the shard_name_template.
+ the shard number and shard count. Currently only ``''``,
+ ``'-SSSSS-of-NNNNN'``, ``'-W-SSSSS-of-NNNNN'`` and
+ ``'-V-SSSSS-of-NNNNN'`` are patterns accepted by the service.
+ When constructing a filename for a particular shard number, the
+ upper-case letters ``S`` and ``N`` are replaced with the ``0``-padded
+ shard number and shard count respectively. This argument can be ``''``
+ in which case it behaves as if num_shards was set to 1 and only one file
+ will be generated. The default pattern used is ``'-SSSSS-of-NNNNN'`` for
+ bounded PCollections and ``'-W-SSSSS-of-NNNNN'`` for unbounded
+ PCollections.
+ W is used for windowed shard naming and is replaced with
+ ``[window.start, window.end)``
+ V is used for windowed shard naming and is replaced with
+ ``[window.start.to_utc_datetime().strftime("%Y-%m-%dT%H-%M-%S"),
+ window.end.to_utc_datetime().strftime("%Y-%m-%dT%H-%M-%S")``
mime_type: The MIME type to use for the produced files, if the filesystem
supports specifying MIME types.
+ triggering_frequency: (int) Every triggering_frequency duration, a window
+ will be triggered and all bundles in the window will be written.
+ If set, it overrides user windowing. Mandatory for GlobalWindow.
Returns:
A WriteToParquet transform usable for writing.
@@ -567,10 +594,20 @@ class WriteToParquet(PTransform):
file_name_suffix,
num_shards,
shard_name_template,
- mime_type
+ mime_type,
+ triggering_frequency
)
def expand(self, pcoll):
+ if (not pcoll.is_bounded and self._sink.shard_name_template
+ == filebasedsink.DEFAULT_SHARD_NAME_TEMPLATE):
+ self._sink.shard_name_template = (
+ filebasedsink.DEFAULT_WINDOW_SHARD_NAME_TEMPLATE)
+ self._sink.shard_name_format = self._sink._template_to_format(
+ self._sink.shard_name_template)
+ self._sink.shard_name_glob_format = self._sink._template_to_glob_format(
+ self._sink.shard_name_template)
+
if self._schema is None:
try:
beam_schema = schemas.schema_from_element_type(pcoll.element_type)
@@ -583,7 +620,11 @@ class WriteToParquet(PTransform):
else:
convert_fn = _RowDictionariesToArrowTable(
self._schema, self._row_group_buffer_size, self._record_batch_size)
- return pcoll | ParDo(convert_fn) | Write(self._sink)
+ if pcoll.is_bounded:
+ return pcoll | ParDo(convert_fn) | Write(self._sink)
+ else:
+ self._sink.convert_fn = convert_fn
+ return pcoll | Write(self._sink)
def display_data(self):
return {
@@ -610,7 +651,7 @@ class WriteToParquetBatched(PTransform):
num_shards=0,
shard_name_template=None,
mime_type='application/x-parquet',
- ):
+ triggering_frequency=None):
"""Initialize a WriteToParquetBatched transform.
Writes parquet files from a :class:`~apache_beam.pvalue.PCollection` of
@@ -668,11 +709,21 @@ class WriteToParquetBatched(PTransform):
the shard number and shard count. When constructing a filename for a
particular shard number, the upper-case letters 'S' and 'N' are
replaced with the 0-padded shard number and shard count respectively.
+ W is used for windowed shard naming and is replaced with
+ ``[window.start, window.end)``.
+ V is used for windowed shard naming and is replaced with
+ ``[window.start.to_utc_datetime().strftime("%Y-%m-%dT%H-%M-%S"),
+ window.end.to_utc_datetime().strftime("%Y-%m-%dT%H-%M-%S"))``.
This argument can be '' in which case it behaves as if num_shards was
+ set to 1 and only one file will be generated.
+ The default pattern used is '-SSSSS-of-NNNNN' if None is passed as the
+ shard_name_template and the PCollection is bounded.
+ The default pattern used is '-W-SSSSS-of-NNNNN' if None is passed as the
+ shard_name_template and the PCollection is unbounded.
mime_type: The MIME type to use for the produced files, if the filesystem
supports specifying MIME types.
+ triggering_frequency: (int) Every triggering_frequency duration, a window
+ will be triggered and all bundles in the window will be written.
Returns:
A WriteToParquetBatched transform usable for writing.
@@ -688,10 +739,19 @@ class WriteToParquetBatched(PTransform):
file_name_suffix,
num_shards,
shard_name_template,
- mime_type
+ mime_type,
+ triggering_frequency
)
def expand(self, pcoll):
+ if (not pcoll.is_bounded and self._sink.shard_name_template
+ == filebasedsink.DEFAULT_SHARD_NAME_TEMPLATE):
+ self._sink.shard_name_template = (
+ filebasedsink.DEFAULT_WINDOW_SHARD_NAME_TEMPLATE)
+ self._sink.shard_name_format = self._sink._template_to_format(
+ self._sink.shard_name_template)
+ self._sink.shard_name_glob_format = self._sink._template_to_glob_format(
+ self._sink.shard_name_template)
return pcoll | Write(self._sink)
def display_data(self):
@@ -707,7 +767,8 @@ def _create_parquet_sink(
file_name_suffix,
num_shards,
shard_name_template,
- mime_type):
+ mime_type,
+ triggering_frequency=60):
return \
_ParquetSink(
file_path_prefix,
@@ -718,7 +779,8 @@ def _create_parquet_sink(
file_name_suffix,
num_shards,
shard_name_template,
- mime_type
+ mime_type,
+ triggering_frequency
)
@@ -734,7 +796,8 @@ class _ParquetSink(filebasedsink.FileBasedSink):
file_name_suffix,
num_shards,
shard_name_template,
- mime_type):
+ mime_type,
+ triggering_frequency):
super().__init__(
file_path_prefix,
file_name_suffix=file_name_suffix,
@@ -744,7 +807,8 @@ class _ParquetSink(filebasedsink.FileBasedSink):
mime_type=mime_type,
# Compression happens at the block level using the supplied codec, and
# not at the file level.
- compression_type=CompressionTypes.UNCOMPRESSED)
+ compression_type=CompressionTypes.UNCOMPRESSED,
+ triggering_frequency=triggering_frequency)
self._schema = schema
self._codec = codec
if ARROW_MAJOR_VERSION == 1 and self._codec.lower() == "lz4":
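For reference, a minimal usage sketch of the streaming write path added to parquetio.py above. It assumes this branch's API (the new triggering_frequency argument and the windowed default shard template for unbounded inputs); the output path, schema and record contents are illustrative placeholders and are not part of the change.

import apache_beam as beam
import pyarrow as pa
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.periodicsequence import PeriodicImpulse

schema = pa.schema([('age', pa.int64())])  # placeholder schema
options = PipelineOptions(streaming=True)

with beam.Pipeline(options=options) as p:
    _ = (
        p
        # PeriodicImpulse yields an unbounded PCollection of timestamps;
        # bound it with start/stop timestamps (as the integration test does)
        # if you want a local run to terminate.
        | PeriodicImpulse(fire_interval=1)
        | beam.Map(lambda ts: {'age': int(ts) % 100})
        | beam.io.WriteToParquet(
            file_path_prefix='/tmp/streaming_parquet/output',  # placeholder path
            file_name_suffix='.parquet',
            num_shards=2,
            # New in this change: emit files every 60 seconds; because the
            # input is unbounded the default shard template becomes
            # '-W-SSSSS-of-NNNNN'.
            triggering_frequency=60,
            schema=schema))

WriteToParquetBatched accepts the same triggering_frequency argument for PCollections of pre-built pyarrow Tables.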
diff --git a/sdks/python/apache_beam/io/parquetio_it_test.py
b/sdks/python/apache_beam/io/parquetio_it_test.py
index 052b54f3ebf..b06e7268fec 100644
--- a/sdks/python/apache_beam/io/parquetio_it_test.py
+++ b/sdks/python/apache_beam/io/parquetio_it_test.py
@@ -19,10 +19,14 @@
import logging
import string
import unittest
+import uuid
from collections import Counter
+from datetime import datetime
import pytest
+import pytz
+import apache_beam as beam
from apache_beam import Create
from apache_beam import DoFn
from apache_beam import FlatMap
@@ -37,6 +41,7 @@ from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.util import BeamAssertException
from apache_beam.transforms import CombineGlobally
from apache_beam.transforms.combiners import Count
+from apache_beam.transforms.periodicsequence import PeriodicImpulse
try:
import pyarrow as pa
@@ -142,6 +147,42 @@ class ProducerFn(DoFn):
return i
[email protected](pa is None, "PyArrow is not installed.")
+class WriteStreamingIT(unittest.TestCase):
+ def setUp(self):
+ self.test_pipeline = TestPipeline(is_integration_test=True)
+ self.runner_name = type(self.test_pipeline.runner).__name__
+ super().setUp()
+
+ def test_write_streaming_2_shards_default_shard_name_template(
+ self, num_shards=2):
+
+ args = self.test_pipeline.get_full_options_as_args(streaming=True)
+
+ unique_id = str(uuid.uuid4())
+ output_file = f'gs://apache-beam-testing-integration-testing/iobase/test-{unique_id}'  # pylint: disable=line-too-long
+ p = beam.Pipeline(argv=args)
+ pyschema = pa.schema([('age', pa.int64())])
+
+ _ = (
+ p
+ | "generate impulse" >> PeriodicImpulse(
+ start_timestamp=datetime(2021, 3, 1, 0, 0, 1, 0,
+ tzinfo=pytz.UTC).timestamp(),
+ stop_timestamp=datetime(2021, 3, 1, 0, 0, 20, 0,
+ tzinfo=pytz.UTC).timestamp(),
+ fire_interval=1)
+ | "generate data" >> beam.Map(lambda t: {'age': t * 10})
+ | 'WriteToParquet' >> beam.io.WriteToParquet(
+ file_path_prefix=output_file,
+ file_name_suffix=".parquet",
+ num_shards=num_shards,
+ triggering_frequency=60,
+ schema=pyschema))
+ result = p.run()
+ result.wait_until_finish(duration=600 * 1000)
+
+
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
unittest.main()
diff --git a/sdks/python/apache_beam/io/parquetio_test.py
b/sdks/python/apache_beam/io/parquetio_test.py
index c602f4cc801..9371705a1fa 100644
--- a/sdks/python/apache_beam/io/parquetio_test.py
+++ b/sdks/python/apache_beam/io/parquetio_test.py
@@ -16,17 +16,21 @@
#
# pytype: skip-file
+import glob
import json
import logging
import os
+import re
import shutil
import tempfile
import unittest
+from datetime import datetime
from tempfile import TemporaryDirectory
import hamcrest as hc
import pandas
import pytest
+import pytz
from parameterized import param
from parameterized import parameterized
@@ -45,10 +49,12 @@ from apache_beam.io.parquetio import WriteToParquetBatched
from apache_beam.io.parquetio import _create_parquet_sink
from apache_beam.io.parquetio import _create_parquet_source
from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.test_stream import TestStream
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
from apache_beam.transforms.display import DisplayData
from apache_beam.transforms.display_test import DisplayDataItemMatcher
+from apache_beam.transforms.util import LogElements
try:
import pyarrow as pa
@@ -655,6 +661,290 @@ class TestParquet(unittest.TestCase):
equal_to(result))
+class GenerateEvent(beam.PTransform):
+ @staticmethod
+ def sample_data():
+ return GenerateEvent()
+
+ def expand(self, input):
+ elemlist = [{'age': 10}, {'age': 20}, {'age': 30}]
+ elem = elemlist
+ return (
+ input
+ | TestStream().add_elements(
+ elements=elem,
+ event_timestamp=datetime(
+ 2021, 3, 1, 0, 0, 1, 0,
+ tzinfo=pytz.UTC).timestamp()).add_elements(
+ elements=elem,
+ event_timestamp=datetime(
+ 2021, 3, 1, 0, 0, 2, 0,
+ tzinfo=pytz.UTC).timestamp()).add_elements(
+ elements=elem,
+ event_timestamp=datetime(
+ 2021, 3, 1, 0, 0, 3, 0,
+ tzinfo=pytz.UTC).timestamp()).add_elements(
+ elements=elem,
+ event_timestamp=datetime(
+ 2021, 3, 1, 0, 0, 4, 0,
+ tzinfo=pytz.UTC).timestamp()).
+ advance_watermark_to(
+ datetime(2021, 3, 1, 0, 0, 5, 0,
+ tzinfo=pytz.UTC).timestamp()).add_elements(
+ elements=elem,
+ event_timestamp=datetime(
+ 2021, 3, 1, 0, 0, 5, 0,
+ tzinfo=pytz.UTC).timestamp()).
+ add_elements(
+ elements=elem,
+ event_timestamp=datetime(
+ 2021, 3, 1, 0, 0, 6,
+ 0, tzinfo=pytz.UTC).timestamp()).add_elements(
+ elements=elem,
+ event_timestamp=datetime(
+ 2021, 3, 1, 0, 0, 7, 0,
+ tzinfo=pytz.UTC).timestamp()).add_elements(
+ elements=elem,
+ event_timestamp=datetime(
+ 2021, 3, 1, 0, 0, 8, 0,
+ tzinfo=pytz.UTC).timestamp()).add_elements(
+ elements=elem,
+ event_timestamp=datetime(
+ 2021, 3, 1, 0, 0, 9, 0,
+ tzinfo=pytz.UTC).timestamp()).
+ advance_watermark_to(
+ datetime(2021, 3, 1, 0, 0, 10, 0,
+ tzinfo=pytz.UTC).timestamp()).add_elements(
+ elements=elem,
+ event_timestamp=datetime(
+ 2021, 3, 1, 0, 0, 10, 0,
+ tzinfo=pytz.UTC).timestamp()).add_elements(
+ elements=elem,
+ event_timestamp=datetime(
+ 2021, 3, 1, 0, 0, 11, 0,
+ tzinfo=pytz.UTC).timestamp()).
+ add_elements(
+ elements=elem,
+ event_timestamp=datetime(
+ 2021, 3, 1, 0, 0, 12, 0,
+ tzinfo=pytz.UTC).timestamp()).add_elements(
+ elements=elem,
+ event_timestamp=datetime(
+ 2021, 3, 1, 0, 0, 13, 0,
+ tzinfo=pytz.UTC).timestamp()).add_elements(
+ elements=elem,
+ event_timestamp=datetime(
+ 2021, 3, 1, 0, 0, 14, 0,
+ tzinfo=pytz.UTC).timestamp()).
+ advance_watermark_to(
+ datetime(2021, 3, 1, 0, 0, 15, 0,
+ tzinfo=pytz.UTC).timestamp()).add_elements(
+ elements=elem,
+ event_timestamp=datetime(
+ 2021, 3, 1, 0, 0, 15, 0,
+ tzinfo=pytz.UTC).timestamp()).add_elements(
+ elements=elem,
+ event_timestamp=datetime(
+ 2021, 3, 1, 0, 0, 16, 0,
+ tzinfo=pytz.UTC).timestamp()).
+ add_elements(
+ elements=elem,
+ event_timestamp=datetime(
+ 2021, 3, 1, 0, 0, 17, 0,
+ tzinfo=pytz.UTC).timestamp()).add_elements(
+ elements=elem,
+ event_timestamp=datetime(
+ 2021, 3, 1, 0, 0, 18, 0,
+ tzinfo=pytz.UTC).timestamp()).add_elements(
+ elements=elem,
+ event_timestamp=datetime(
+ 2021, 3, 1, 0, 0, 19, 0,
+ tzinfo=pytz.UTC).timestamp()).
+ advance_watermark_to(
+ datetime(2021, 3, 1, 0, 0, 20, 0,
+ tzinfo=pytz.UTC).timestamp()).add_elements(
+ elements=elem,
+ event_timestamp=datetime(
+ 2021, 3, 1, 0, 0, 20, 0,
+ tzinfo=pytz.UTC).timestamp()).advance_watermark_to(
+ datetime(
+ 2021, 3, 1, 0, 0, 25, 0, tzinfo=pytz.UTC).
+ timestamp()).advance_watermark_to_infinity())
+
+
+class WriteStreamingTest(unittest.TestCase):
+ def setUp(self):
+ super().setUp()
+ self.tempdir = tempfile.mkdtemp()
+
+ def tearDown(self):
+ if os.path.exists(self.tempdir):
+ shutil.rmtree(self.tempdir)
+
+ def test_write_streaming_2_shards_default_shard_name_template(
+ self, num_shards=2):
+ with TestPipeline() as p:
+ output = (p | GenerateEvent.sample_data())
+ #ParquetIO
+ pyschema = pa.schema([('age', pa.int64())])
+ output2 = output | 'WriteToParquet' >> beam.io.WriteToParquet(
+ file_path_prefix=self.tempdir + "/ouput_WriteToParquet",
+ file_name_suffix=".parquet",
+ num_shards=num_shards,
+ triggering_frequency=60,
+ schema=pyschema)
+ _ = output2 | 'LogElements after WriteToParquet' >> LogElements(
+ prefix='after WriteToParquet ', with_window=True, level=logging.INFO)
+
+ # Regex to match the expected windowed file pattern
+ # Example:
+ # ouput_WriteToParquet-[1614556800.0, 1614556805.0)-00000-of-00002.parquet
+ # It captures: window_interval, shard_num, total_shards
+ pattern_string = (
+ r'.*-\[(?P<window_start>[\d\.]+), '
+ r'(?P<window_end>[\d\.]+|Infinity)\)-'
+ r'(?P<shard_num>\d{5})-of-(?P<total_shards>\d{5})\.parquet$')
+ pattern = re.compile(pattern_string)
+ file_names = []
+ for file_name in glob.glob(self.tempdir + '/ouput_WriteToParquet*'):
+ match = pattern.match(file_name)
+ self.assertIsNotNone(
+ match, f"File name {file_name} did not match expected pattern.")
+ if match:
+ file_names.append(file_name)
+ print("Found files matching expected pattern:", file_names)
+ self.assertEqual(
+ len(file_names),
+ num_shards,
+ "expected %d files, but got: %d" % (num_shards, len(file_names)))
+
+ def test_write_streaming_2_shards_custom_shard_name_template(
+ self, num_shards=2, shard_name_template='-V-SSSSS-of-NNNNN'):
+ with TestPipeline() as p:
+ output = (p | GenerateEvent.sample_data())
+ #ParquetIO
+ pyschema = pa.schema([('age', pa.int64())])
+ output2 = output | 'WriteToParquet' >> beam.io.WriteToParquet(
+ file_path_prefix=self.tempdir + "/ouput_WriteToParquet",
+ file_name_suffix=".parquet",
+ shard_name_template=shard_name_template,
+ num_shards=num_shards,
+ triggering_frequency=60,
+ schema=pyschema)
+ _ = output2 | 'LogElements after WriteToParquet' >> LogElements(
+ prefix='after WriteToParquet ', with_window=True, level=logging.INFO)
+
+ # Regex to match the expected windowed file pattern
+ # Example:
+ # ouput_WriteToParquet-[2021-03-01T00-00-00, 2021-03-01T00-01-00)-
+ # 00000-of-00002.parquet
+ # It captures: window_interval, shard_num, total_shards
+ pattern_string = (
+ r'.*-\[(?P<window_start>\d{4}-\d{2}-\d{2}T\d{2}-\d{2}-\d{2}), '
+ r'(?P<window_end>\d{4}-\d{2}-\d{2}T\d{2}-\d{2}-\d{2}|Infinity)\)-'
+ r'(?P<shard_num>\d{5})-of-(?P<total_shards>\d{5})\.parquet$')
+ pattern = re.compile(pattern_string)
+ file_names = []
+ for file_name in glob.glob(self.tempdir + '/ouput_WriteToParquet*'):
+ match = pattern.match(file_name)
+ self.assertIsNotNone(
+ match, f"File name {file_name} did not match expected pattern.")
+ if match:
+ file_names.append(file_name)
+ print("Found files matching expected pattern:", file_names)
+ self.assertEqual(
+ len(file_names),
+ num_shards,
+ "expected %d files, but got: %d" % (num_shards, len(file_names)))
+
+ def test_write_streaming_2_shards_custom_shard_name_template_5s_window(
+ self,
+ num_shards=2,
+ shard_name_template='-V-SSSSS-of-NNNNN',
+ triggering_frequency=5):
+ with TestPipeline() as p:
+ output = (p | GenerateEvent.sample_data())
+ #ParquetIO
+ pyschema = pa.schema([('age', pa.int64())])
+ output2 = output | 'WriteToParquet' >> beam.io.WriteToParquet(
+ file_path_prefix=self.tempdir + "/ouput_WriteToParquet",
+ file_name_suffix=".parquet",
+ shard_name_template=shard_name_template,
+ num_shards=num_shards,
+ triggering_frequency=triggering_frequency,
+ schema=pyschema)
+ _ = output2 | 'LogElements after WriteToParquet' >> LogElements(
+ prefix='after WriteToParquet ', with_window=True, level=logging.INFO)
+
+ # Regex to match the expected windowed file pattern
+ # Example:
+ # ouput_WriteToParquet-[2021-03-01T00-00-00, 2021-03-01T00-01-00)-
+ # 00000-of-00002.parquet
+ # It captures: window_interval, shard_num, total_shards
+ pattern_string = (
+ r'.*-\[(?P<window_start>\d{4}-\d{2}-\d{2}T\d{2}-\d{2}-\d{2}), '
+ r'(?P<window_end>\d{4}-\d{2}-\d{2}T\d{2}-\d{2}-\d{2}|Infinity)\)-'
+ r'(?P<shard_num>\d{5})-of-(?P<total_shards>\d{5})\.parquet$')
+ pattern = re.compile(pattern_string)
+ file_names = []
+ for file_name in glob.glob(self.tempdir + '/ouput_WriteToParquet*'):
+ match = pattern.match(file_name)
+ self.assertIsNotNone(
+ match, f"File name {file_name} did not match expected pattern.")
+ if match:
+ file_names.append(file_name)
+ print("Found files matching expected pattern:", file_names)
+ # for 5s window size, the input should be processed by 5 windows with
+ # 2 shards per window
+ self.assertEqual(
+ len(file_names),
+ 10,
+ "expected %d files, but got: %d" % (num_shards, len(file_names)))
+
+ def test_write_streaming_undef_shards_default_shard_name_template_windowed_pcoll(  # pylint: disable=line-too-long
+ self):
+ with TestPipeline() as p:
+ output = (
+ p | GenerateEvent.sample_data()
+ | 'User windowing' >> beam.transforms.core.WindowInto(
+ beam.transforms.window.FixedWindows(10),
+ trigger=beam.transforms.trigger.AfterWatermark(),
+ accumulation_mode=beam.transforms.trigger.AccumulationMode.
+ DISCARDING,
+ allowed_lateness=beam.utils.timestamp.Duration(seconds=0)))
+ #ParquetIO
+ pyschema = pa.schema([('age', pa.int64())])
+ output2 = output | 'WriteToParquet' >> beam.io.WriteToParquet(
+ file_path_prefix=self.tempdir + "/ouput_WriteToParquet",
+ file_name_suffix=".parquet",
+ num_shards=0,
+ schema=pyschema)
+ _ = output2 | 'LogElements after WriteToParquet' >> LogElements(
+ prefix='after WriteToParquet ', with_window=True, level=logging.INFO)
+
+ # Regex to match the expected windowed file pattern
+ # Example:
+ # ouput_WriteToParquet-[1614556800.0, 1614556805.0)-00000-of-00002.parquet
+ # It captures: window_interval, shard_num, total_shards
+ pattern_string = (
+ r'.*-\[(?P<window_start>[\d\.]+), '
+ r'(?P<window_end>[\d\.]+|Infinity)\)-'
+ r'(?P<shard_num>\d{5})-of-(?P<total_shards>\d{5})\.parquet$')
+ pattern = re.compile(pattern_string)
+ file_names = []
+ for file_name in glob.glob(self.tempdir + '/ouput_WriteToParquet*'):
+ match = pattern.match(file_name)
+ self.assertIsNotNone(
+ match, f"File name {file_name} did not match expected pattern.")
+ if match:
+ file_names.append(file_name)
+ print("Found files matching expected pattern:", file_names)
+ self.assertGreaterEqual(
+ len(file_names),
+ 1 * 3,  # 25s of data covered by 3 10-second windows
+ "expected %d files, but got: %d" % (1 * 3, len(file_names)))
+
+
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
unittest.main()
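The regex assertions in the new tests above expect window boundaries rendered as '2021-03-01T00-00-00' for the 'V' placeholder and as raw timestamps for 'W'. A small sketch, assuming the strftime formatting quoted in the docstrings, of how a window boundary maps to the 'V' form:

from datetime import datetime

import pytz
from apache_beam.utils.timestamp import Timestamp

# First boundary of the 60s fixed window used by the tests (UTC).
start = Timestamp.from_utc_datetime(datetime(2021, 3, 1, 0, 0, 0, tzinfo=pytz.UTC))
end = Timestamp.from_utc_datetime(datetime(2021, 3, 1, 0, 1, 0, tzinfo=pytz.UTC))

# 'V' renders the window interval as UTC datetimes, safe for file names.
v_segment = '[%s, %s)' % (
    start.to_utc_datetime().strftime('%Y-%m-%dT%H-%M-%S'),
    end.to_utc_datetime().strftime('%Y-%m-%dT%H-%M-%S'))
print(v_segment)  # [2021-03-01T00-00-00, 2021-03-01T00-01-00)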
diff --git a/sdks/python/apache_beam/io/tfrecordio.py
b/sdks/python/apache_beam/io/tfrecordio.py
index b911c64a134..e27ea5070b0 100644
--- a/sdks/python/apache_beam/io/tfrecordio.py
+++ b/sdks/python/apache_beam/io/tfrecordio.py
@@ -290,7 +290,8 @@ class _TFRecordSink(filebasedsink.FileBasedSink):
file_name_suffix,
num_shards,
shard_name_template,
- compression_type):
+ compression_type,
+ triggering_frequency=60):
"""Initialize a TFRecordSink. See WriteToTFRecord for details."""
super().__init__(
@@ -300,7 +301,8 @@ class _TFRecordSink(filebasedsink.FileBasedSink):
num_shards=num_shards,
shard_name_template=shard_name_template,
mime_type='application/octet-stream',
- compression_type=compression_type)
+ compression_type=compression_type,
+ triggering_frequency=triggering_frequency)
def write_encoded_record(self, file_handle, value):
_TFRecordUtil.write_record(file_handle, value)
@@ -315,7 +317,8 @@ class WriteToTFRecord(PTransform):
file_name_suffix='',
num_shards=0,
shard_name_template=None,
- compression_type=CompressionTypes.AUTO):
+ compression_type=CompressionTypes.AUTO,
+ triggering_frequency=None):
"""Initialize WriteToTFRecord transform.
Args:
@@ -326,16 +329,29 @@ class WriteToTFRecord(PTransform):
file_name_suffix: Suffix for the files written.
num_shards: The number of files (shards) used for output. If not set, the
default value will be used.
+ In streaming, if not set, the service will write one file per bundle.
shard_name_template: A template string containing placeholders for
- the shard number and shard count. When constructing a filename for a
- particular shard number, the upper-case letters 'S' and 'N' are
- replaced with the 0-padded shard number and shard count respectively.
- This argument can be '' in which case it behaves as if num_shards was
- set to 1 and only one file will be generated. The default pattern used
- is '-SSSSS-of-NNNNN' if None is passed as the shard_name_template.
+ the shard number and shard count. Currently only ``''``,
+ ``'-SSSSS-of-NNNNN'``, ``'-W-SSSSS-of-NNNNN'`` and
+ ``'-V-SSSSS-of-NNNNN'`` are patterns accepted by the service.
+ When constructing a filename for a particular shard number, the
+ upper-case letters ``S`` and ``N`` are replaced with the ``0``-padded
+ shard number and shard count respectively. This argument can be ``''``,
+ in which case it behaves as if num_shards was set to 1 and only one
+ file will be generated. If None is passed, the default pattern is
+ ``'-SSSSS-of-NNNNN'`` for bounded PCollections and
+ ``'-W-SSSSS-of-NNNNN'`` for unbounded PCollections.
+ W is used for windowed shard naming and is replaced with
+ ``[window.start, window.end)``.
+ V is used for windowed shard naming and is replaced with
+ ``[window.start.to_utc_datetime().strftime("%Y-%m-%dT%H-%M-%S"),
+ window.end.to_utc_datetime().strftime("%Y-%m-%dT%H-%M-%S"))``.
compression_type: Used to handle compressed output files. Typical value
is CompressionTypes.AUTO, in which case the file_path's extension
will
be used to detect the compression.
+ triggering_frequency: (int) Every triggering_frequency duration, a
+ window will be triggered and all bundles in the window will be written.
+ If set, it overrides user windowing. Mandatory for the GlobalWindow.
Returns:
A WriteToTFRecord transform object.
@@ -347,7 +363,17 @@ class WriteToTFRecord(PTransform):
file_name_suffix,
num_shards,
shard_name_template,
- compression_type)
+ compression_type,
+ triggering_frequency)
def expand(self, pcoll):
+ if (not pcoll.is_bounded and self._sink.shard_name_template
+ == filebasedsink.DEFAULT_SHARD_NAME_TEMPLATE):
+ self._sink.shard_name_template = (
+ filebasedsink.DEFAULT_WINDOW_SHARD_NAME_TEMPLATE)
+ self._sink.shard_name_format = self._sink._template_to_format(
+ self._sink.shard_name_template)
+ self._sink.shard_name_glob_format = self._sink._template_to_glob_format(
+ self._sink.shard_name_template)
+
return pcoll | Write(self._sink)
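Analogously to parquetio.py, a minimal usage sketch of the streaming TFRecord write path above, assuming this branch's API; the output path and record payload are illustrative placeholders and are not part of the change.

import json

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.periodicsequence import PeriodicImpulse

options = PipelineOptions(streaming=True)
with beam.Pipeline(options=options) as p:
    _ = (
        p
        | PeriodicImpulse(fire_interval=1)  # unbounded source of timestamps
        | beam.Map(lambda ts: json.dumps({'ts': ts}).encode('utf-8'))
        | beam.io.WriteToTFRecord(
            file_path_prefix='/tmp/streaming_tfrecord/output',  # placeholder path
            file_name_suffix='.tfrecord',
            num_shards=2,
            # New in this change: a 60-second write window; with an unbounded
            # input the default shard template becomes '-W-SSSSS-of-NNNNN'.
            triggering_frequency=60))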
diff --git a/sdks/python/apache_beam/io/tfrecordio_test.py
b/sdks/python/apache_beam/io/tfrecordio_test.py
index a867c0212ad..6522ade36d8 100644
--- a/sdks/python/apache_beam/io/tfrecordio_test.py
+++ b/sdks/python/apache_beam/io/tfrecordio_test.py
@@ -21,15 +21,20 @@ import binascii
import glob
import gzip
import io
+import json
import logging
import os
import pickle
import random
import re
+import shutil
+import tempfile
import unittest
import zlib
+from datetime import datetime
import crcmod
+import pytz
import apache_beam as beam
from apache_beam import Create
@@ -41,9 +46,11 @@ from apache_beam.io.tfrecordio import WriteToTFRecord
from apache_beam.io.tfrecordio import _TFRecordSink
from apache_beam.io.tfrecordio import _TFRecordUtil
from apache_beam.testing.test_pipeline import TestPipeline
+from apache_beam.testing.test_stream import TestStream
from apache_beam.testing.test_utils import TempDir
from apache_beam.testing.util import assert_that
from apache_beam.testing.util import equal_to
+from apache_beam.transforms.util import LogElements
try:
import tensorflow.compat.v1 as tf # pylint: disable=import-error
@@ -558,6 +565,258 @@ class TestEnd2EndWriteAndRead(unittest.TestCase):
assert_that(actual_data, equal_to(expected_data))
+class GenerateEvent(beam.PTransform):
+ @staticmethod
+ def sample_data():
+ return GenerateEvent()
+
+ def expand(self, input):
+ elemlist = [{'age': 10}, {'age': 20}, {'age': 30}]
+ elem = elemlist
+ return (
+ input
+ | TestStream().add_elements(
+ elements=elem,
+ event_timestamp=datetime(
+ 2021, 3, 1, 0, 0, 1, 0,
+ tzinfo=pytz.UTC).timestamp()).add_elements(
+ elements=elem,
+ event_timestamp=datetime(
+ 2021, 3, 1, 0, 0, 2, 0,
+ tzinfo=pytz.UTC).timestamp()).add_elements(
+ elements=elem,
+ event_timestamp=datetime(
+ 2021, 3, 1, 0, 0, 3, 0,
+ tzinfo=pytz.UTC).timestamp()).add_elements(
+ elements=elem,
+ event_timestamp=datetime(
+ 2021, 3, 1, 0, 0, 4, 0,
+ tzinfo=pytz.UTC).timestamp()).
+ advance_watermark_to(
+ datetime(2021, 3, 1, 0, 0, 5, 0,
+ tzinfo=pytz.UTC).timestamp()).add_elements(
+ elements=elem,
+ event_timestamp=datetime(
+ 2021, 3, 1, 0, 0, 5, 0,
+ tzinfo=pytz.UTC).timestamp()).
+ add_elements(
+ elements=elem,
+ event_timestamp=datetime(
+ 2021, 3, 1, 0, 0, 6,
+ 0, tzinfo=pytz.UTC).timestamp()).add_elements(
+ elements=elem,
+ event_timestamp=datetime(
+ 2021, 3, 1, 0, 0, 7, 0,
+ tzinfo=pytz.UTC).timestamp()).add_elements(
+ elements=elem,
+ event_timestamp=datetime(
+ 2021, 3, 1, 0, 0, 8, 0,
+ tzinfo=pytz.UTC).timestamp()).add_elements(
+ elements=elem,
+ event_timestamp=datetime(
+ 2021, 3, 1, 0, 0, 9, 0,
+ tzinfo=pytz.UTC).timestamp()).
+ advance_watermark_to(
+ datetime(2021, 3, 1, 0, 0, 10, 0,
+ tzinfo=pytz.UTC).timestamp()).add_elements(
+ elements=elem,
+ event_timestamp=datetime(
+ 2021, 3, 1, 0, 0, 10, 0,
+ tzinfo=pytz.UTC).timestamp()).add_elements(
+ elements=elem,
+ event_timestamp=datetime(
+ 2021, 3, 1, 0, 0, 11, 0,
+ tzinfo=pytz.UTC).timestamp()).
+ add_elements(
+ elements=elem,
+ event_timestamp=datetime(
+ 2021, 3, 1, 0, 0, 12, 0,
+ tzinfo=pytz.UTC).timestamp()).add_elements(
+ elements=elem,
+ event_timestamp=datetime(
+ 2021, 3, 1, 0, 0, 13, 0,
+ tzinfo=pytz.UTC).timestamp()).add_elements(
+ elements=elem,
+ event_timestamp=datetime(
+ 2021, 3, 1, 0, 0, 14, 0,
+ tzinfo=pytz.UTC).timestamp()).
+ advance_watermark_to(
+ datetime(2021, 3, 1, 0, 0, 15, 0,
+ tzinfo=pytz.UTC).timestamp()).add_elements(
+ elements=elem,
+ event_timestamp=datetime(
+ 2021, 3, 1, 0, 0, 15, 0,
+ tzinfo=pytz.UTC).timestamp()).add_elements(
+ elements=elem,
+ event_timestamp=datetime(
+ 2021, 3, 1, 0, 0, 16, 0,
+ tzinfo=pytz.UTC).timestamp()).
+ add_elements(
+ elements=elem,
+ event_timestamp=datetime(
+ 2021, 3, 1, 0, 0, 17, 0,
+ tzinfo=pytz.UTC).timestamp()).add_elements(
+ elements=elem,
+ event_timestamp=datetime(
+ 2021, 3, 1, 0, 0, 18, 0,
+ tzinfo=pytz.UTC).timestamp()).add_elements(
+ elements=elem,
+ event_timestamp=datetime(
+ 2021, 3, 1, 0, 0, 19, 0,
+ tzinfo=pytz.UTC).timestamp()).
+ advance_watermark_to(
+ datetime(2021, 3, 1, 0, 0, 20, 0,
+ tzinfo=pytz.UTC).timestamp()).add_elements(
+ elements=elem,
+ event_timestamp=datetime(
+ 2021, 3, 1, 0, 0, 20, 0,
+ tzinfo=pytz.UTC).timestamp()).advance_watermark_to(
+ datetime(
+ 2021, 3, 1, 0, 0, 25, 0, tzinfo=pytz.UTC).
+ timestamp()).advance_watermark_to_infinity())
+
+
+class WriteStreamingTest(unittest.TestCase):
+ def setUp(self):
+ super().setUp()
+ self.tempdir = tempfile.mkdtemp()
+
+ def tearDown(self):
+ if os.path.exists(self.tempdir):
+ shutil.rmtree(self.tempdir)
+
+ def test_write_streaming_2_shards_default_shard_name_template(
+ self, num_shards=2):
+ with TestPipeline() as p:
+ output = (
+ p
+ | GenerateEvent.sample_data()
+ | 'User windowing' >> beam.transforms.core.WindowInto(
+ beam.transforms.window.FixedWindows(60),
+ trigger=beam.transforms.trigger.AfterWatermark(),
+ accumulation_mode=beam.transforms.trigger.AccumulationMode.
+ DISCARDING,
+ allowed_lateness=beam.utils.timestamp.Duration(seconds=0))
+ | "encode" >> beam.Map(lambda s: json.dumps(s).encode('utf-8')))
+ #TFrecordIO
+ output2 = output | 'WriteToTFRecord' >> beam.io.WriteToTFRecord(
+ file_path_prefix=self.tempdir + "/ouput_WriteToTFRecord",
+ file_name_suffix=".tfrecord",
+ num_shards=num_shards,
+ )
+ _ = output2 | 'LogElements after WriteToTFRecord' >> LogElements(
+ prefix='after WriteToTFRecord ', with_window=True, level=logging.INFO)
+
+ # Regex to match the expected windowed file pattern
+ # Example:
+ # ouput_WriteToTFRecord-[1614556800.0, 1614556805.0)-00000-of-00002.tfrecord
+ # It captures: window_interval, shard_num, total_shards
+ pattern_string = (
+ r'.*-\[(?P<window_start>[\d\.]+), '
+ r'(?P<window_end>[\d\.]+|Infinity)\)-'
+ r'(?P<shard_num>\d{5})-of-(?P<total_shards>\d{5})\.tfrecord$')
+ pattern = re.compile(pattern_string)
+ file_names = []
+ for file_name in glob.glob(self.tempdir + '/ouput_WriteToTFRecord*'):
+ match = pattern.match(file_name)
+ self.assertIsNotNone(
+ match, f"File name {file_name} did not match expected pattern.")
+ if match:
+ file_names.append(file_name)
+ print("Found files matching expected pattern:", file_names)
+ self.assertEqual(
+ len(file_names),
+ num_shards,
+ "expected %d files, but got: %d" % (num_shards, len(file_names)))
+
+ def test_write_streaming_2_shards_custom_shard_name_template(
+ self, num_shards=2, shard_name_template='-V-SSSSS-of-NNNNN'):
+ with TestPipeline() as p:
+ output = (
+ p
+ | GenerateEvent.sample_data()
+ | "encode" >> beam.Map(lambda s: json.dumps(s).encode('utf-8')))
+ #TFrecordIO
+ output2 = output | 'WriteToTFRecord' >> beam.io.WriteToTFRecord(
+ file_path_prefix=self.tempdir + "/ouput_WriteToTFRecord",
+ file_name_suffix=".tfrecord",
+ shard_name_template=shard_name_template,
+ num_shards=num_shards,
+ triggering_frequency=60,
+ )
+ _ = output2 | 'LogElements after WriteToTFRecord' >> LogElements(
+ prefix='after WriteToTFRecord ', with_window=True, level=logging.INFO)
+
+ # Regex to match the expected windowed file pattern
+ # Example:
+ # ouput_WriteToTFRecord-[2021-03-01T00-00-00, 2021-03-01T00-01-00)-
+ # 00000-of-00002.tfrecord
+ # It captures: window_interval, shard_num, total_shards
+ pattern_string = (
+ r'.*-\[(?P<window_start>\d{4}-\d{2}-\d{2}T\d{2}-\d{2}-\d{2}), '
+ r'(?P<window_end>\d{4}-\d{2}-\d{2}T\d{2}-\d{2}-\d{2}|Infinity)\)-'
+ r'(?P<shard_num>\d{5})-of-(?P<total_shards>\d{5})\.tfrecord$')
+ pattern = re.compile(pattern_string)
+ file_names = []
+ for file_name in glob.glob(self.tempdir + '/ouput_WriteToTFRecord*'):
+ match = pattern.match(file_name)
+ self.assertIsNotNone(
+ match, f"File name {file_name} did not match expected pattern.")
+ if match:
+ file_names.append(file_name)
+ print("Found files matching expected pattern:", file_names)
+ self.assertEqual(
+ len(file_names),
+ num_shards,
+ "expected %d files, but got: %d" % (num_shards, len(file_names)))
+
+ def test_write_streaming_2_shards_custom_shard_name_template_5s_window(
+ self,
+ num_shards=2,
+ shard_name_template='-V-SSSSS-of-NNNNN',
+ triggering_frequency=5):
+ with TestPipeline() as p:
+ output = (
+ p
+ | GenerateEvent.sample_data()
+ | "encode" >> beam.Map(lambda s: json.dumps(s).encode('utf-8')))
+ #TFrecordIO
+ output2 = output | 'WriteToTFRecord' >> beam.io.WriteToTFRecord(
+ file_path_prefix=self.tempdir + "/ouput_WriteToTFRecord",
+ file_name_suffix=".tfrecord",
+ shard_name_template=shard_name_template,
+ num_shards=num_shards,
+ triggering_frequency=triggering_frequency,
+ )
+ _ = output2 | 'LogElements after WriteToTFRecord' >> LogElements(
+ prefix='after WriteToTFRecord ', with_window=True, level=logging.INFO)
+
+ # Regex to match the expected windowed file pattern
+ # Example:
+ # ouput_WriteToTFRecord-[2021-03-01T00-00-00, 2021-03-01T00-01-00)-
+ # 00000-of-00002.tfrecord
+ # It captures: window_interval, shard_num, total_shards
+ pattern_string = (
+ r'.*-\[(?P<window_start>\d{4}-\d{2}-\d{2}T\d{2}-\d{2}-\d{2}), '
+ r'(?P<window_end>\d{4}-\d{2}-\d{2}T\d{2}-\d{2}-\d{2}|Infinity)\)-'
+ r'(?P<shard_num>\d{5})-of-(?P<total_shards>\d{5})\.tfrecord$')
+ pattern = re.compile(pattern_string)
+ file_names = []
+ for file_name in glob.glob(self.tempdir + '/ouput_WriteToTFRecord*'):
+ match = pattern.match(file_name)
+ self.assertIsNotNone(
+ match, f"File name {file_name} did not match expected pattern.")
+ if match:
+ file_names.append(file_name)
+ print("Found files matching expected pattern:", file_names)
+ # for 5s window size, the input should be processed by 5 windows with
+ # 2 shards per window
+ self.assertEqual(
+ len(file_names),
+ 10,
+ "expected %d files, but got: %d" % (num_shards, len(file_names)))
+
+
if __name__ == '__main__':
logging.getLogger().setLevel(logging.INFO)
unittest.main()