davidcavazos commented on a change in pull request #14962:
URL: https://github.com/apache/beam/pull/14962#discussion_r660824581
##########
File path: examples/notebooks/tour-of-beam/windowing.ipynb
##########
@@ -0,0 +1,703 @@
+{
+ "nbformat": 4,
+ "nbformat_minor": 0,
+ "metadata": {
+ "colab": {
+ "name": "Windowing -- Tour of Beam",
+ "provenance": [],
+ "collapsed_sections": [],
+ "toc_visible": true
+ },
+ "kernelspec": {
+ "name": "python3",
+ "display_name": "Python 3"
+ }
+ },
+ "cells": [
+ {
+ "cell_type": "code",
+ "metadata": {
+ "cellView": "form",
+ "id": "upmJn_DjcThx"
+ },
+ "source": [
+ "#@title ###### Licensed to the Apache Software Foundation (ASF), Version 2.0 (the \"License\")\n",
+ "\n",
+ "# Licensed to the Apache Software Foundation (ASF) under one\n",
+ "# or more contributor license agreements. See the NOTICE file\n",
+ "# distributed with this work for additional information\n",
+ "# regarding copyright ownership. The ASF licenses this file\n",
+ "# to you under the Apache License, Version 2.0 (the\n",
+ "# \"License\"); you may not use this file except in compliance\n",
+ "# with the License. You may obtain a copy of the License at\n",
+ "#\n",
+ "# http://www.apache.org/licenses/LICENSE-2.0\n",
+ "#\n",
+ "# Unless required by applicable law or agreed to in writing,\n",
+ "# software distributed under the License is distributed on an\n",
+ "# \"AS IS\" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY\n",
+ "# KIND, either express or implied. See the License for the\n",
+ "# specific language governing permissions and limitations\n",
+ "# under the License."
+ ],
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "5UC_aGanx6oE"
+ },
+ "source": [
+ "# Windowing -- _Tour of Beam_\n",
+ "\n",
+ "Sometimes we want to [aggregate](https://beam.apache.org/documentation/transforms/python/overview/#aggregation) data, for example with `GroupByKey` or `Combine`, only over certain intervals, such as hourly or daily, instead of aggregating the entire `PCollection` in a single pass.\n",
+ "\n",
+ "We might want to emit a [moving average](https://en.wikipedia.org/wiki/Moving_average) as we're processing data.\n",
+ "\n",
+ "Maybe we want to analyze the user experience for a certain task in a web app; in that case, it would be nice to group the app events into sessions of activity.\n",
+ "\n",
+ "Or we could be running a streaming pipeline where the data never ends, so how can we aggregate it?\n",
+ "\n",
+ "_Windows_ in Beam allow us to process only certain data intervals at a time.\n",
+ "In this notebook, we go through different ways of windowing our pipeline.\n",
+ "\n",
+ "Let's begin by installing `apache-beam`."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "metadata": {
+ "id": "R_Yhoc6N_Flg"
+ },
+ "source": [
+ "# Install apache-beam with pip.\n",
+ "!pip install --quiet apache-beam"
+ ],
+ "execution_count": null,
+ "outputs": []
+ },
+ {
+ "cell_type": "markdown",
+ "metadata": {
+ "id": "_OkWHiAvpWDZ"
+ },
+ "source": [
+ "First, let's define some helper functions to simplify the rest of the examples.\n",
+ "\n",
+ "We have one transform to help us analyze an element alongside its window information, and another transform to help us analyze how many elements landed in each window.\n",
+ "We use a custom [`DoFn`](https://beam.apache.org/documentation/transforms/python/elementwise/pardo)\n",
+ "to access that information.\n",
+ "\n",
+ "You don't need to understand these; you just need to know they exist 🙂."
+ ]
+ },
+ {
+ "cell_type": "code",
+ "metadata": {
+ "id": "C9yAN1Hgk3dF"
+ },
+ "source": [
+ "import apache_beam as beam\n",
+ "\n",
+ "def human_readable_window(window) -> str:\n",
+ "    \"\"\"Formats a window object into a human readable string.\"\"\"\n",
+ "    if isinstance(window, beam.window.GlobalWindow):\n",
+ "        return str(window)\n",
+ "    return f'{window.start.to_utc_datetime()} - {window.end.to_utc_datetime()}'\n",
+ "\n",
+ "class PrintElementInfo(beam.DoFn):\n",
+ "    \"\"\"Prints an element with its Window information.\"\"\"\n",
+ "    def process(self, element, timestamp=beam.DoFn.TimestampParam, window=beam.DoFn.WindowParam):\n",
+ "        print(f'[{human_readable_window(window)}] {timestamp.to_utc_datetime()} -- {element}')\n",
+ "        yield element\n",
+ "\n",
+ "@beam.ptransform_fn\n",
Review comment:
Thanks. I removed the PTransforms and now only use DoFns, since we need to access the windowing information.