This is an automated email from the ASF dual-hosted git repository. echauchot pushed a commit to branch asf-site in repository https://gitbox.apache.org/repos/asf/flink-web.git
The following commit(s) were added to refs/heads/asf-site by this push: new ad43bac16 Rebuild website ad43bac16 is described below commit ad43bac16288cec9a8b0f0936c6532cb6ec70184 Author: Etienne Chauchot <echauc...@apache.org> AuthorDate: Tue May 9 10:28:18 2023 +0200 Rebuild website --- .../index.html | 1184 ++++++++++++++++++++ 1 file changed, 1184 insertions(+) diff --git a/content/2023/05/09/howto-migrate-a-real-life-batch-pipeline-from-the-dataset-api-to-the-datastream-api/index.html b/content/2023/05/09/howto-migrate-a-real-life-batch-pipeline-from-the-dataset-api-to-the-datastream-api/index.html new file mode 100644 index 000000000..b33895e44 --- /dev/null +++ b/content/2023/05/09/howto-migrate-a-real-life-batch-pipeline-from-the-dataset-api-to-the-datastream-api/index.html @@ -0,0 +1,1184 @@ + +<!DOCTYPE html> +<html lang="en" dir=> + +<head> + <meta name="generator" content="Hugo 0.111.3"> + <meta charset="UTF-8"> +<meta name="viewport" content="width=device-width, initial-scale=1.0"> +<meta name="description" content="Introduction # The Flink community has been deprecating the DataSet API since version 1.12 as part of the work on FLIP-131: Consolidate the user-facing Dataflow SDKs/APIs (and deprecate the DataSet API) . This blog article illustrates the migration of a real-life batch DataSet pipeline to a batch DataStream pipeline. All the code presented in this article is available in the tpcds-benchmark-flink repo. The use case shown here is extracted from a broader [...] +<meta name="theme-color" content="#FFFFFF"><meta property="og:title" content="Howto migrate a real-life batch pipeline from the DataSet API to the DataStream API" /> +<meta property="og:description" content="Introduction # The Flink community has been deprecating the DataSet API since version 1.12 as part of the work on FLIP-131: Consolidate the user-facing Dataflow SDKs/APIs (and deprecate the DataSet API) . 
This blog article illustrates the migration of a real-life batch DataSet pipeline to a batch DataStream pipeline. All the code presented in this article is available in the tpcds-benchmark-flink repo. The use case shown here is extracted from a b [...] +<meta property="og:type" content="article" /> +<meta property="og:url" content="https://flink.apache.org/2023/05/09/howto-migrate-a-real-life-batch-pipeline-from-the-dataset-api-to-the-datastream-api/" /><meta property="article:section" content="posts" /> +<meta property="article:published_time" content="2023-05-09T08:00:00+00:00" /> +<meta property="article:modified_time" content="2023-05-09T08:00:00+00:00" /> +<title>Howto migrate a real-life batch pipeline from the DataSet API to the DataStream API | Apache Flink</title> +<link rel="manifest" href="/manifest.json"> +<link rel="icon" href="/favicon.png" type="image/x-icon"> +<link rel="stylesheet" href="/book.min.e3b33391dbc1f4b2cc47778e2f4b86c744ded3ccc82fdfb6f08caf91d8607f9a.css" integrity="sha256-47MzkdvB9LLMR3eOL0uGx0Te08zIL9+28Iyvkdhgf5o="> +<script defer src="/en.search.min.8592fd2e43835d2ef6fab8eb9b8969ee6ad1bdb888a636e37e28032f8bd9887d.js" integrity="sha256-hZL9LkODXS72+rjrm4lp7mrRvbiIpjbjfigDL4vZiH0="></script> +<!-- +Made with Book Theme +https://github.com/alex-shpak/hugo-book +--> + + + +<link rel="stylesheet" type="text/css" href="/font-awesome/css/font-awesome.min.css"> +<script src="/js/anchor.min.js"></script> +<script src="/js/flink.js"></script> +<link rel="canonical" href="https://flink.apache.org/2023/05/09/howto-migrate-a-real-life-batch-pipeline-from-the-dataset-api-to-the-datastream-api/"> + + + <script> + var _paq = window._paq = window._paq || []; + + + _paq.push(['disableCookies']); + + _paq.push(["setDomains", ["*.flink.apache.org","*.nightlies.apache.org/flink"]]); + _paq.push(['trackPageView']); + _paq.push(['enableLinkTracking']); + (function() { + var u="//analytics.apache.org/"; + _paq.push(['setTrackerUrl', u+'matomo.php']); + 
_paq.push(['setSiteId', '1']); + var d=document, g=d.createElement('script'), s=d.getElementsByTagName('script')[0]; + g.async=true; g.src=u+'matomo.js'; s.parentNode.insertBefore(g,s); + })(); + </script> + +</head> + +<body dir=> + <input type="checkbox" class="hidden toggle" id="menu-control" /> + <input type="checkbox" class="hidden toggle" id="toc-control" /> + <main class="container flex"> + <aside class="book-menu"> + + + +<nav> + + +<a id="logo" href="/"> + <img width="70%" src="/flink-header-logo.svg"> +</a> + +<div class="book-search"> + <input type="text" id="book-search-input" placeholder="Search" aria-label="Search" maxlength="64" data-hotkeys="s/" /> + <div class="book-search-spinner hidden"></div> + <ul id="book-search-results"></ul> +</div> + + + + + + + + + + + + + + + + + + + + + + + + <input type="checkbox" id="section-4117fb24454a2c30ee86e524839e77ec" class="toggle" /> + <label for="section-4117fb24454a2c30ee86e524839e77ec" class="flex justify-between flink-menu-item">What is Apache Flink?<span>▾</span> + </label> + + <ul> + + <li> + + + + + + <label for="section-ffd5922da551e96e0481423fab94c463" class="flex justify-between flink-menu-item flink-menu-child"> + <a href="/what-is-flink/flink-architecture/" class="">Architecture</a> + </label> + + + </li> + + <li> + + + + + + <label for="section-fc28f08b67476edb77e00e03b6c7c2e0" class="flex justify-between flink-menu-item flink-menu-child"> + <a href="/what-is-flink/flink-applications/" class="">Applications</a> + </label> + + + </li> + + <li> + + + + + + <label for="section-612df33a02d5d4ee78d718abaab5b5b4" class="flex justify-between flink-menu-item flink-menu-child"> + <a href="/what-is-flink/flink-operations/" class="">Operations</a> + </label> + + + </li> + + </ul> + + + + + + + + + + + + + +<label for="section-f1ecec07350bd6810050d40158878749" class="flex justify-between flink-menu-item"> + <a href="https://nightlies.apache.org/flink/flink-statefun-docs-stable/" style="color:black" 
class="">What is Stateful Functions? <i class="link fa fa-external-link title" aria-hidden="true"></i></a> +</label> + + + + + + + + + + + + + + + +<label for="section-4113a4c3072cb35f6fd7a0d4e098ee70" class="flex justify-between flink-menu-item"> + <a href="https://nightlies.apache.org/flink/flink-ml-docs-stable/" style="color:black" class="">What is Flink ML? <i class="link fa fa-external-link title" aria-hidden="true"></i></a> +</label> + + + + + + + + + + + + + + + +<label for="section-b39c70259d0abbe2bf1d8d645425f84d" class="flex justify-between flink-menu-item"> + <a href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-stable/" style="color:black" class="">What is the Flink Kubernetes Operator? <i class="link fa fa-external-link title" aria-hidden="true"></i></a> +</label> + + + + + + + + + + + + + + + +<label for="section-53e0b1afcb9ccaf779dc285aa272a014" class="flex justify-between flink-menu-item"> + <a href="https://nightlies.apache.org/flink/flink-table-store-docs-stable/" style="color:black" class="">What is Flink Table Store? 
<i class="link fa fa-external-link title" aria-hidden="true"></i></a> +</label> + + + + + + + + + + + + + <label for="section-f4973f06a66f063045b4ebdacaf3127d" class="flex justify-between flink-menu-item"> + <a href="/use-cases/" class="">Use Cases</a> + </label> + + + + + + + + + + + + + <label for="section-0f1863835376e859ac438ae9529daff2" class="flex justify-between flink-menu-item"> + <a href="/powered-by/" class="">Powered By</a> + </label> + + + + + + <br/> + + + + + + + + + + + <label for="section-f383f23a96a43d8d0cc66aeb0237e26a" class="flex justify-between flink-menu-item"> + <a href="/downloads/" class="">Downloads</a> + </label> + + + + + + + + + + + + <input type="checkbox" id="section-c727fab97b4d77e5b28ce8c448fb9000" class="toggle" /> + <label for="section-c727fab97b4d77e5b28ce8c448fb9000" class="flex justify-between flink-menu-item">Getting Started<span>▾</span> + </label> + + <ul> + + <li> + + + + + + + + +<label for="section-f45abaa99ab076108b9a5b94edbc6647" class="flex justify-between flink-menu-item flink-menu-child"> + <a href="https://nightlies.apache.org/flink/flink-docs-stable/docs/try-flink/local_installation/" style="color:black" class="">With Flink <i class="link fa fa-external-link title" aria-hidden="true"></i></a> +</label> + + + </li> + + <li> + + + + + + + + +<label for="section-efe2166e9dce6f72e126dcc2396b4402" class="flex justify-between flink-menu-item flink-menu-child"> + <a href="https://nightlies.apache.org/flink/flink-statefun-docs-stable/getting-started/project-setup.html" style="color:black" class="">With Flink Stateful Functions <i class="link fa fa-external-link title" aria-hidden="true"></i></a> +</label> + + + </li> + + <li> + + + + + + + + +<label for="section-7e268d0a469b1093bb33d71d093eb7b9" class="flex justify-between flink-menu-item flink-menu-child"> + <a href="https://nightlies.apache.org/flink/flink-ml-docs-stable/docs/try-flink-ml/quick-start/" style="color:black" class="">With Flink ML <i class="link fa 
fa-external-link title" aria-hidden="true"></i></a> +</label> + + + </li> + + <li> + + + + + + + + +<label for="section-cc7147cd0441503127bfaf6f219d4fbb" class="flex justify-between flink-menu-item flink-menu-child"> + <a href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-stable/docs/try-flink-kubernetes-operator/quick-start/" style="color:black" class="">With Flink Kubernetes Operator <i class="link fa fa-external-link title" aria-hidden="true"></i></a> +</label> + + + </li> + + <li> + + + + + + + + +<label for="section-660ca694e416d8ca9176dda52a60d637" class="flex justify-between flink-menu-item flink-menu-child"> + <a href="https://nightlies.apache.org/flink/flink-table-store-docs-stable/docs/try-table-store/quick-start/" style="color:black" class="">With Flink Table Store <i class="link fa fa-external-link title" aria-hidden="true"></i></a> +</label> + + + </li> + + <li> + + + + + + + + +<label for="section-75db0b47bf4ae9c247aadbba5fbd720d" class="flex justify-between flink-menu-item flink-menu-child"> + <a href="https://nightlies.apache.org/flink/flink-docs-stable/docs/learn-flink/overview/" style="color:black" class="">Training Course <i class="link fa fa-external-link title" aria-hidden="true"></i></a> +</label> + + + </li> + + </ul> + + + + + + + + + + <input type="checkbox" id="section-6318075fef29529089951a49d413d083" class="toggle" /> + <label for="section-6318075fef29529089951a49d413d083" class="flex justify-between flink-menu-item">Documentation<span>▾</span> + </label> + + <ul> + + <li> + + + + + + + + +<label for="section-9a8122d8912450484d1c25394ad40229" class="flex justify-between flink-menu-item flink-menu-child"> + <a href="https://nightlies.apache.org/flink/flink-docs-stable/" style="color:black" class="">Flink 1.17 (stable) <i class="link fa fa-external-link title" aria-hidden="true"></i></a> +</label> + + + </li> + + <li> + + + + + + + + +<label for="section-8b2fd3efb702be3783ba98d650707e3c" class="flex justify-between 
flink-menu-item flink-menu-child"> + <a href="https://nightlies.apache.org/flink/flink-docs-master/" style="color:black" class="">Flink Master (snapshot) <i class="link fa fa-external-link title" aria-hidden="true"></i></a> +</label> + + + </li> + + <li> + + + + + + + + +<label for="section-5317a079cddb964c59763c27607f43d9" class="flex justify-between flink-menu-item flink-menu-child"> + <a href="https://nightlies.apache.org/flink/flink-statefun-docs-stable/" style="color:black" class="">Stateful Functions 3.2 (stable) <i class="link fa fa-external-link title" aria-hidden="true"></i></a> +</label> + + + </li> + + <li> + + + + + + + + +<label for="section-25b72f108b7156e94d91b04853d8813a" class="flex justify-between flink-menu-item flink-menu-child"> + <a href="https://nightlies.apache.org/flink/flink-statefun-docs-master" style="color:black" class="">Stateful Functions Master (snapshot) <i class="link fa fa-external-link title" aria-hidden="true"></i></a> +</label> + + + </li> + + <li> + + + + + + + + +<label for="section-13a02f969904a2455a39ed90e287593f" class="flex justify-between flink-menu-item flink-menu-child"> + <a href="https://nightlies.apache.org/flink/flink-ml-docs-stable/" style="color:black" class="">ML 2.2 (stable) <i class="link fa fa-external-link title" aria-hidden="true"></i></a> +</label> + + + </li> + + <li> + + + + + + + + +<label for="section-6d895ec5ad127a29a6a9ce101328ccdf" class="flex justify-between flink-menu-item flink-menu-child"> + <a href="https://nightlies.apache.org/flink/flink-ml-docs-master" style="color:black" class="">ML Master (snapshot) <i class="link fa fa-external-link title" aria-hidden="true"></i></a> +</label> + + + </li> + + <li> + + + + + + + + +<label for="section-c83ad0caf34e364bf3729badd233a350" class="flex justify-between flink-menu-item flink-menu-child"> + <a href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-stable/" style="color:black" class="">Kubernetes Operator 1.4 (latest) <i class="link 
fa fa-external-link title" aria-hidden="true"></i></a> +</label> + + + </li> + + <li> + + + + + + + + +<label for="section-a2c75d90005425982ba8f26ae0e160a3" class="flex justify-between flink-menu-item flink-menu-child"> + <a href="https://nightlies.apache.org/flink/flink-kubernetes-operator-docs-main" style="color:black" class="">Kubernetes Operator Main (snapshot) <i class="link fa fa-external-link title" aria-hidden="true"></i></a> +</label> + + + </li> + + <li> + + + + + + + + +<label for="section-07b85e4b2f61b1526bf202c64460abcd" class="flex justify-between flink-menu-item flink-menu-child"> + <a href="https://nightlies.apache.org/flink/flink-table-store-docs-stable/" style="color:black" class="">Table Store 0.3 (stable) <i class="link fa fa-external-link title" aria-hidden="true"></i></a> +</label> + + + </li> + + <li> + + + + + + + + +<label for="section-9b9a0032b1e858a34c125d828d1a0718" class="flex justify-between flink-menu-item flink-menu-child"> + <a href="https://nightlies.apache.org/flink/flink-table-store-docs-master/" style="color:black" class="">Table Store Master (snapshot) <i class="link fa fa-external-link title" aria-hidden="true"></i></a> +</label> + + + </li> + + </ul> + + + + + + + + + + + <label for="section-63d6a565d79aa2895f70806a46021c07" class="flex justify-between flink-menu-item"> + <a href="/getting-help/" class="">Getting Help</a> + </label> + + + + + + + + + + + + + + + +<label for="section-1d5066022b83f4732dc80f4e9eaa069a" class="flex justify-between flink-menu-item"> + <a href="https://flink-packages.org/" style="color:black" class="">flink-packages.org <i class="link fa fa-external-link title" aria-hidden="true"></i></a> +</label> + + + + + + <br/> + + + + + + + + + + + <label for="section-7821b78a97db9e919426e86121a7be9c" class="flex justify-between flink-menu-item"> + <a href="/community/" class="">Community & Project Info</a> + </label> + + + + + + + + + + + + + <label for="section-8c042831df4e371c4ef9375f1df06f35" class="flex 
justify-between flink-menu-item"> + <a href="/roadmap/" class="">Roadmap</a> + </label> + + + + + + + + + + + + <input type="checkbox" id="section-73117efde5302fddcb193307d582b588" class="toggle" /> + <label for="section-73117efde5302fddcb193307d582b588" class="flex justify-between flink-menu-item">How to Contribute<span>▾</span> + </label> + + <ul> + + <li> + + + + + + <label for="section-6646b26b23a3e79b8de9c552ee76f6dd" class="flex justify-between flink-menu-item flink-menu-child"> + <a href="/how-to-contribute/overview/" class="">Overview</a> + </label> + + + </li> + + <li> + + + + + + <label for="section-e6ab9538b82cd5f94103b971adb7c1a9" class="flex justify-between flink-menu-item flink-menu-child"> + <a href="/how-to-contribute/contribute-code/" class="">Contribute Code</a> + </label> + + + </li> + + <li> + + + + + + <label for="section-1c09e1358485e82d9b3f5f689d4ced65" class="flex justify-between flink-menu-item flink-menu-child"> + <a href="/how-to-contribute/reviewing-prs/" class="">Review Pull Requests</a> + </label> + + + </li> + + <li> + + + + + + <label for="section-ed01e0defd235498fa3c9a2a0b3302fb" class="flex justify-between flink-menu-item flink-menu-child"> + <a href="/how-to-contribute/code-style-and-quality-preamble/" class="">Code Style and Quality Guide</a> + </label> + + + </li> + + <li> + + + + + + <label for="section-4e8d5e9924cf15f397711b0d82e15650" class="flex justify-between flink-menu-item flink-menu-child"> + <a href="/how-to-contribute/contribute-documentation/" class="">Contribute Documentation</a> + </label> + + + </li> + + <li> + + + + + + <label for="section-ddaa8307917e5ba7f60ba3316711e492" class="flex justify-between flink-menu-item flink-menu-child"> + <a href="/how-to-contribute/documentation-style-guide/" class="">Documentation Style Guide</a> + </label> + + + </li> + + <li> + + + + + + <label for="section-390a72c171cc82f180a308b95fc3aa72" class="flex justify-between flink-menu-item flink-menu-child"> + <a 
href="/how-to-contribute/improve-website/" class="">Contribute to the Website</a> + </label> + + + </li> + + </ul> + + + + + + + + + + + <label for="section-9d3ddfd487223d5a199ba301f25c88c6" class="flex justify-between flink-menu-item"> + <a href="/security/" class="">Security</a> + </label> + + + + + + <br/> + + + + + + + + + + <label for="section-a07783f405300745807d39eacf150420" class="flex justify-between flink-menu-item"> + <a href="/posts/" class="">Flink Blog</a> + </label> + + + + + + + + + + + + + + + + + + + + + + + + + +<br/> +<hr class="menu-break"> + + +<label for="section-f71a7070dbb7b669824a6441408ded70" class="flex justify-between flink-menu-item"> + <a href="https://github.com/apache/flink" style="color:black" class="">Flink on GitHub <i class="link fa fa-external-link title" aria-hidden="true"></i></a> +</label> + + +<label for="section-2ccaaab8c67f3105bbf7df75faca8027" class="flex justify-between flink-menu-item"> + <a href="https://twitter.com/apacheflink" style="color:black" class="">@ApacheFlink <i class="link fa fa-external-link title" aria-hidden="true"></i></a> +</label> + + + +<hr class="menu-break"> +<table> + <tr> + <th colspan="2"> +<label for="section-78c2028200542d78f8c1a8f6b4cbb36b" class="flex justify-between flink-menu-item"> + <a href="https://www.apache.org/" style="color:black" class="">Apache Software Foundation <i class="link fa fa-external-link title" aria-hidden="true"></i></a> +</label></th> + </tr> + <tr> + <td> +<label for="section-794df3791a8c800841516007427a2aa3" class="flex justify-between flink-menu-item"> + <a href="https://www.apache.org/licenses/" style="color:black" class="">License <i class="link fa fa-external-link title" aria-hidden="true"></i></a> +</label></td> + <td> +<label for="section-2fae32629d4ef4fc6341f1751b405e45" class="flex justify-between flink-menu-item"> + <a href="https://www.apache.org/security/" style="color:black" class="">Security <i class="link fa fa-external-link title" 
aria-hidden="true"></i></a> +</label></td> + </tr> + <tr> + <td> +<label for="section-0584e445d656b83b431227bb80ff0c30" class="flex justify-between flink-menu-item"> + <a href="https://www.apache.org/foundation/sponsorship.html" style="color:black" class="">Donate <i class="link fa fa-external-link title" aria-hidden="true"></i></a> +</label></td> + <td> +<label for="section-00d06796e489999226fb5bb27fe1b3b2" class="flex justify-between flink-menu-item"> + <a href="https://www.apache.org/foundation/thanks.html" style="color:black" class="">Thanks <i class="link fa fa-external-link title" aria-hidden="true"></i></a> +</label></td> + </tr> +</table> + +<hr class="menu-break"> + + + + + + + + + + + + + +<a href="/zh/" class="flex align-center"> + <i class="fa fa-globe" aria-hidden="true"></i> + 中文版 +</a> + +<script src="/js/track-search-terms.js"></script> + + +</nav> + + + + + <script>(function(){var e=document.querySelector("aside.book-menu nav");addEventListener("beforeunload",function(){localStorage.setItem("menu.scrollTop",e.scrollTop)}),e.scrollTop=localStorage.getItem("menu.scrollTop")})()</script> + + + + </aside> + + <div class="book-page"> + <header class="book-header"> + + <div class="flex align-center justify-between"> + <label for="menu-control"> + <img src="/svg/menu.svg" class="book-icon" alt="Menu" /> + </label> + + <strong>Howto migrate a real-life batch pipeline from the DataSet API to the DataStream API</strong> + + <label for="toc-control"> + + <img src="/svg/toc.svg" class="book-icon" alt="Table of Contents" /> + + </label> +</div> + + + + <aside class="hidden clearfix"> + + + +<nav id="TableOfContents"><h3>On This Page <button class="toc" onclick="collapseToc()"><i class="fa fa-compress" aria-hidden="true"></i></button></h3> + <ul> + <li><a href="#introduction">Introduction</a></li> + <li><a href="#what-is-tpcds">What is TPCDS?</a></li> + <li><a href="#chosen-tpcds-query">Chosen TPCDS query</a></li> + <li><a 
href="#the-initial-dataset-pipeline">The initial DataSet pipeline</a></li> + <li><a href="#migrating-the-dataset-pipeline-to-a-datastream-pipeline">Migrating the DataSet pipeline to a DataStream pipeline</a> + <ul> + <li><a href="#setting-the-execution-environmenthttpsgithubcomechauchottpcds-benchmark-flinkblob4273a3bc45d6e4fbdd5fa531fe48f85b8d0a9d3fsrcmainjavaorgexampletpcdsflinkquery3viaflinkrowdatastreamjaval90-l96"><a href="https://github.com/echauchot/tpcds-benchmark-flink/blob/4273a3bc45d6e4fbdd5fa531fe48f85b8d0a9d3f/src/main/java/org/example/tpcds/flink/Query3ViaFlinkRowDatastream.java#L90-L96">Setting the execution environment</a></a></li> + <li><a href="#using-the-streaming-sources-and-datasets">Using the streaming sources and datasets</a></li> + <li><a href="#migrating-the-join-operationhttpsgithubcomechauchottpcds-benchmark-flinkblob4273a3bc45d6e4fbdd5fa531fe48f85b8d0a9d3fsrcmainjavaorgexampletpcdsflinkquery3viaflinkrowdatastreamjaval129-l135"><a href="https://github.com/echauchot/tpcds-benchmark-flink/blob/4273a3bc45d6e4fbdd5fa531fe48f85b8d0a9d3f/src/main/java/org/example/tpcds/flink/Query3ViaFlinkRowDatastream.java#L129-L135">Migrating the join operation</a></a></li> + <li><a href="#migrating-the-group-by-and-reduce-sum-operationshttpsgithubcomechauchottpcds-benchmark-flinkblob9589c7c74e7152badee8400d775b4af7a998e487srcmainjavaorgexampletpcdsflinkquery3viaflinkrowdatastreamjaval145-l169"><a href="https://github.com/echauchot/tpcds-benchmark-flink/blob/9589c7c74e7152badee8400d775b4af7a998e487/src/main/java/org/example/tpcds/flink/Query3ViaFlinkRowDatastream.java#L145-L169">Migrating the group by and reduce (sum) operations</a></a></li> + <li><a href="#migrating-the-order-by-operationhttpsgithubcomechauchottpcds-benchmark-flinkblob9589c7c74e7152badee8400d775b4af7a998e487srcmainjavaorgexampletpcdsflinkquery3viaflinkrowdatastreamjaval171-l211"><a 
href="https://github.com/echauchot/tpcds-benchmark-flink/blob/9589c7c74e7152badee8400d775b4af7a998e487/src/main/java/org/example/tpcds/flink/Query3ViaFlinkRowDatastream.java#L171-L211">Migrating the order by operation</a></a></li> + <li><a href="#migrating-the-limit-operationhttpsgithubcomechauchottpcds-benchmark-flinkblob9589c7c74e7152badee8400d775b4af7a998e487srcmainjavaorgexampletpcdsflinkquery3viaflinkrowdatastreamjaval213-l223"><a href="https://github.com/echauchot/tpcds-benchmark-flink/blob/9589c7c74e7152badee8400d775b4af7a998e487/src/main/java/org/example/tpcds/flink/Query3ViaFlinkRowDatastream.java#L213-L223">Migrating the limit operation</a></a></li> + <li><a href="#migrating-the-sink-operationhttpsgithubcomechauchottpcds-benchmark-flinkblob9589c7c74e7152badee8400d775b4af7a998e487srcmainjavaorgexampletpcdsflinkquery3viaflinkrowdatastreamjaval225-l236"><a href="https://github.com/echauchot/tpcds-benchmark-flink/blob/9589c7c74e7152badee8400d775b4af7a998e487/src/main/java/org/example/tpcds/flink/Query3ViaFlinkRowDatastream.java#L225-L236">Migrating the sink operation</a></a></li> + </ul> + </li> + <li><a href="#conclusion">Conclusion</a></li> + </ul> +</nav> + + + </aside> + + + </header> + + + + + + + +<article class="markdown"> + <h1> + <a href="/2023/05/09/howto-migrate-a-real-life-batch-pipeline-from-the-dataset-api-to-the-datastream-api/">Howto migrate a real-life batch pipeline from the DataSet API to the DataStream API</a> + </h1> + + May 9, 2023 - + + + + Etienne Chauchot + + <a href="https://twitter.com/echauchot">(@echauchot)</a> + + + + + <p><h2 id="introduction"> + Introduction + <a class="anchor" href="#introduction">#</a> +</h2> +<p>The Flink community has been deprecating the DataSet API since version 1.12 as part of the work on +<a href="https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=158866741">FLIP-131: Consolidate the user-facing Dataflow SDKs/APIs (and deprecate the DataSet API)</a> +. 
+This blog article illustrates the migration of a real-life batch DataSet pipeline to a batch +DataStream pipeline. +All the code presented in this article is available in +the <a href="https://github.com/echauchot/tpcds-benchmark-flink">tpcds-benchmark-flink repo</a>. +The use case shown here is extracted from a broader work comparing the performance of different +Flink APIs +by implementing <a href="https://www.tpc.org/tpcds/">TPCDS</a> queries with each of them.</p> +<h2 id="what-is-tpcds"> + What is TPCDS? + <a class="anchor" href="#what-is-tpcds">#</a> +</h2> +<p>TPC-DS is a decision support benchmark that models several generally applicable aspects of a +decision support system. The purpose of the TPC-DS benchmark is to provide relevant, objective +performance data about Big Data engines to industry users.</p> +<h2 id="chosen-tpcds-query"> + Chosen TPCDS query + <a class="anchor" href="#chosen-tpcds-query">#</a> +</h2> +<p>The chosen query for this article is <strong>Query3</strong> because it contains the most common analytics +operators (filter, join, aggregation, group by, order by, limit). It represents an analytic query on +store sales. 
Its SQL code is presented here:</p> +<p><code>SELECT dt.d_year, item.i_brand_id brand_id, item.i_brand brand,SUM(ss_ext_sales_price) sum_agg FROM date_dim dt, store_sales, item WHERE dt.d_date_sk = store_sales.ss_sold_date_sk AND store_sales.ss_item_sk = item.i_item_sk AND item.i_manufact_id = 128 AND dt.d_moy=11 GROUP BY dt.d_year, item.i_brand, item.i_brand_id ORDER BY dt.d_year, sum_agg desc, brand_id LIMIT 100</code></p> +<h2 id="the-initial-dataset-pipeline"> + The initial DataSet pipeline + <a class="anchor" href="#the-initial-dataset-pipeline">#</a> +</h2> +<p>The pipeline we are migrating +is <a href="https://github.com/echauchot/tpcds-benchmark-flink/blob/4273a3bc45d6e4fbdd5fa531fe48f85b8d0a9d3f/src/main/java/org/example/tpcds/flink/Query3ViaFlinkRowDataset.java">this</a> +batch pipeline that implements the above query using the DataSet API +and <a href="https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/types/Row.html">Row</a> +as the dataset element type.</p> +<h2 id="migrating-the-dataset-pipeline-to-a-datastream-pipeline"> + Migrating the DataSet pipeline to a DataStream pipeline + <a class="anchor" href="#migrating-the-dataset-pipeline-to-a-datastream-pipeline">#</a> +</h2> +<p>Instead of going through all of the code, which is +available <a href="https://github.com/echauchot/tpcds-benchmark-flink/blob/4273a3bc45d6e4fbdd5fa531fe48f85b8d0a9d3f/src/main/java/org/example/tpcds/flink/Query3ViaFlinkRowDatastream.java">here</a>, +we will focus on some key areas of the migration. The code is based on the latest release +of Flink at the time this article was written: version 1.16.0.</p> +<p>DataStream is a unified API that allows running pipelines in both batch and streaming modes. To +execute a DataStream pipeline in batch mode, it is not enough to set the execution mode in the Flink +execution environment; some operations also need to be migrated. 
Indeed, the DataStream API +semantics are those of a streaming pipeline: the arriving data is considered infinite. So, +compared to the DataSet API, which operates on finite data, some +operations need to be adapted.</p> +<h3 id="setting-the-execution-environmenthttpsgithubcomechauchottpcds-benchmark-flinkblob4273a3bc45d6e4fbdd5fa531fe48f85b8d0a9d3fsrcmainjavaorgexampletpcdsflinkquery3viaflinkrowdatastreamjaval90-l96"> + <a href="https://github.com/echauchot/tpcds-benchmark-flink/blob/4273a3bc45d6e4fbdd5fa531fe48f85b8d0a9d3f/src/main/java/org/example/tpcds/flink/Query3ViaFlinkRowDatastream.java#L90-L96">Setting the execution environment</a> + <a class="anchor" href="#setting-the-execution-environmenthttpsgithubcomechauchottpcds-benchmark-flinkblob4273a3bc45d6e4fbdd5fa531fe48f85b8d0a9d3fsrcmainjavaorgexampletpcdsflinkquery3viaflinkrowdatastreamjaval90-l96">#</a> +</h3> +<p>We start by moving +from <a href="https://nightlies.apache.org/flink/flink-docs-release-1.12/api/java/org/apache/flink/api/java/ExecutionEnvironment.html">ExecutionEnvironment</a> +to <a href="https://nightlies.apache.org/flink/flink-docs-release-1.12/api/java/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.html">StreamExecutionEnvironment</a> +. Then, as the source in this pipeline is bounded, we can use either the default +streaming <a href="https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/datastream/execution_mode//">execution mode</a> +or the batch mode. In batch mode, the tasks of the job can be separated into stages that are +executed one after another. 
In streaming mode, all tasks need to be running all the time, and records +are sent to downstream tasks as soon as they are available.</p> +<p>Here we keep the default streaming mode, which gives good performance on this pipeline and would +allow running the same pipeline unchanged on an unbounded source.</p> +<h3 id="using-the-streaming-sources-and-datasets"> + Using the streaming sources and datasets + <a class="anchor" href="#using-the-streaming-sources-and-datasets">#</a> +</h3> +<p><strong>Sources</strong>: <a href="https://nightlies.apache.org/flink/flink-docs-release-1.12/api/java/org/apache/flink/api/java/operators/DataSource.html">DataSource<T></a> +becomes <a href="https://nightlies.apache.org/flink/flink-docs-release-1.12/api/java/org/apache/flink/streaming/api/datastream/DataStreamSource.html">DataStreamSource<T></a> +after the call to <em>env.createInput()</em>.</p> +<p><strong>Datasets</strong>: <a href="https://nightlies.apache.org/flink/flink-docs-release-1.12/api/java/org/apache/flink/api/java/DataSet.html">DataSet<T></a> +is +now <a href="https://nightlies.apache.org/flink/flink-docs-release-1.12/api/java/org/apache/flink/streaming/api/datastream/DataStream.html">DataStream<T></a> +and its subclasses.</p> +<h3 id="migrating-the-join-operationhttpsgithubcomechauchottpcds-benchmark-flinkblob4273a3bc45d6e4fbdd5fa531fe48f85b8d0a9d3fsrcmainjavaorgexampletpcdsflinkquery3viaflinkrowdatastreamjaval129-l135"> + <a href="https://github.com/echauchot/tpcds-benchmark-flink/blob/4273a3bc45d6e4fbdd5fa531fe48f85b8d0a9d3f/src/main/java/org/example/tpcds/flink/Query3ViaFlinkRowDatastream.java#L129-L135">Migrating the join operation</a> + <a class="anchor" href="#migrating-the-join-operationhttpsgithubcomechauchottpcds-benchmark-flinkblob4273a3bc45d6e4fbdd5fa531fe48f85b8d0a9d3fsrcmainjavaorgexampletpcdsflinkquery3viaflinkrowdatastreamjaval129-l135">#</a> +</h3> +<p>The DataStream join operator does not yet support aggregations in batch mode ( +see <a 
href="https://issues.apache.org/jira/browse/FLINK-22587">FLINK-22587</a> for details). Basically, the +problem is with the trigger of the +default <a href="https://nightlies.apache.org/flink/flink-docs-release-1.12/api/java/org/apache/flink/streaming/api/windowing/windows/GlobalWindow.html">GlobalWindow</a>, +which never fires, so the records are never output. We will work around this problem by applying a +custom <a href="https://github.com/echauchot/tpcds-benchmark-flink/blob/9589c7c74e7152badee8400d775b4af7a998e487/src/main/java/org/example/tpcds/flink/Query3ViaFlinkRowDatastream.java#L246-L280">EndOfStream</a> +window. It is a window assigner that assigns all the records to a +single <a href="https://nightlies.apache.org/flink/flink-docs-release-1.12/api/java/org/apache/flink/streaming/api/windowing/windows/TimeWindow.html">TimeWindow</a> +. So, as with the GlobalWindow, all the records are assigned to the same window, except that this +window’s trigger is based on the end of the window (which is set to <em>Long.MAX_VALUE</em>). 
Since we are on +a bounded source, the watermark will eventually advance to +infinity (Long.MAX_VALUE), +thus cross the end of the time window, and consequently fire the trigger and output the records.</p> +<p>Now that we have a working trigger, we need to call a standard join with the <em>Row::join</em> +function.</p> +<h3 id="migrating-the-group-by-and-reduce-sum-operationshttpsgithubcomechauchottpcds-benchmark-flinkblob9589c7c74e7152badee8400d775b4af7a998e487srcmainjavaorgexampletpcdsflinkquery3viaflinkrowdatastreamjaval145-l169"> + <a href="https://github.com/echauchot/tpcds-benchmark-flink/blob/9589c7c74e7152badee8400d775b4af7a998e487/src/main/java/org/example/tpcds/flink/Query3ViaFlinkRowDatastream.java#L145-L169">Migrating the group by and reduce (sum) operations</a> + <a class="anchor" href="#migrating-the-group-by-and-reduce-sum-operationshttpsgithubcomechauchottpcds-benchmark-flinkblob9589c7c74e7152badee8400d775b4af7a998e487srcmainjavaorgexampletpcdsflinkquery3viaflinkrowdatastreamjaval145-l169">#</a> +</h3> +<p>The DataStream API no +longer has a <a href="https://nightlies.apache.org/flink/flink-docs-release-1.12/api/java/org/apache/flink/api/java/DataSet.html#groupBy-org.apache.flink.api.java.functions.KeySelector-">groupBy()</a> +method; we now use +the <a href="https://nightlies.apache.org/flink/flink-docs-release-1.12/api/java/org/apache/flink/streaming/api/datastream/DataStream.html#keyBy-org.apache.flink.api.java.functions.KeySelector-">keyBy()</a> +method. An aggregation downstream will be applied on elements with the same key exactly as +a <a href="https://nightlies.apache.org/flink/flink-docs-release-1.12/api/java/org/apache/flink/api/common/functions/GroupReduceFunction.html">GroupReduceFunction</a> +would have done on a DataSet, except that it will not need to materialize the collection of data.
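</p>
<p>The difference between the two styles can be sketched in plain Java (hypothetical helper names, not the Flink API): a group-reduce sees the whole materialized group at once, while an incremental reduce combines two elements at a time.</p>

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical plain-Java sketch contrasting the two reduction styles;
// the real pipeline uses Flink's GroupReduceFunction / ReduceFunction.
class ReduceStyles {

    // DataSet style: the whole group is materialized, then summed.
    static double groupReduce(List<Double> prices) {
        return prices.stream().mapToDouble(Double::doubleValue).sum();
    }

    // DataStream style: elements are combined incrementally, two at a time,
    // so the collection never needs to be materialized.
    static double incrementalReduce(double partialSum, double price) {
        return partialSum + price;
    }

    public static void main(String[] args) {
        List<Double> prices = Arrays.asList(10.0, 20.0, 12.5);
        double batch = groupReduce(prices);
        double streaming = 0.0;
        for (double p : prices) {
            streaming = incrementalReduce(streaming, p);
        }
        // Both styles compute the same sum.
        System.out.println(batch == streaming);
    }
}
```
<p>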
Indeed, the following +operator is a reducer: the summing operation downstream is still done through +a <a href="https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/api/common/functions/ReduceFunction.html">ReduceFunction</a>, +but this time the operator reduces the elements incrementally instead of receiving the rows as a +Collection. To compute the sum, we store the partially aggregated sum in the reduced row. Because the reduce is incremental, +we also need to distinguish whether we received an already reduced row (in which case we read the +partially aggregated sum) or a fresh row (in which case we just read the corresponding price field). +Also, please note that, as in the join case, we need to specify windowing for the aggregation.</p> +<h3 id="migrating-the-order-by-operationhttpsgithubcomechauchottpcds-benchmark-flinkblob9589c7c74e7152badee8400d775b4af7a998e487srcmainjavaorgexampletpcdsflinkquery3viaflinkrowdatastreamjaval171-l211"> + <a href="https://github.com/echauchot/tpcds-benchmark-flink/blob/9589c7c74e7152badee8400d775b4af7a998e487/src/main/java/org/example/tpcds/flink/Query3ViaFlinkRowDatastream.java#L171-L211">Migrating the order by operation</a> + <a class="anchor" href="#migrating-the-order-by-operationhttpsgithubcomechauchottpcds-benchmark-flinkblob9589c7c74e7152badee8400d775b4af7a998e487srcmainjavaorgexampletpcdsflinkquery3viaflinkrowdatastreamjaval171-l211">#</a> +</h3> +<p>The datastream is sorted by applying +a <a href="https://nightlies.apache.org/flink/flink-docs-release-1.12/api/java/org/apache/flink/streaming/api/functions/KeyedProcessFunction.html">KeyedProcessFunction</a>.</p> +<p>But, as mentioned above, the DataStream semantics are those of a streaming pipeline: the arriving data +is considered infinite. As such, we need to “divide” the data to define output points. For that we +need to set a timer that triggers the output of the resulting data.
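</p>
<p>The pattern can be sketched in plain Java (hypothetical names and row layout, not the Flink API): buffer the rows as they arrive, and when the end-of-batch timer fires, sort the buffer and emit everything at once.</p>

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical plain-Java sketch of the "buffer, then sort when the timer
// fires" pattern; the real pipeline uses a KeyedProcessFunction with
// ListState and the onTimer() callback. Rows are modeled as {sum, brandId}.
class BufferedSortSketch {
    private final List<double[]> buffer = new ArrayList<>();

    // Analogous to processElement(): just buffer the incoming row.
    void processElement(double[] row) {
        buffer.add(row);
    }

    // Analogous to onTimer(): sort the buffered rows (here by descending sum)
    // and emit them all at once, at the end of the batch.
    List<double[]> onTimer() {
        List<double[]> sorted = new ArrayList<>(buffer);
        sorted.sort(Comparator.comparingDouble((double[] r) -> r[0]).reversed());
        return sorted;
    }

    public static void main(String[] args) {
        BufferedSortSketch fn = new BufferedSortSketch();
        fn.processElement(new double[] {10.0, 1});
        fn.processElement(new double[] {42.0, 2});
        fn.processElement(new double[] {7.0, 3});
        for (double[] row : fn.onTimer()) {
            System.out.println(row[0]);
        }
    }
}
```
<p>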
We <a href="https://github.com/echauchot/tpcds-benchmark-flink/blob/9589c7c74e7152badee8400d775b4af7a998e487/src/main/java/org/example/tpcds/flink/Query3ViaFlinkRowDatastream.java#L188">set a timer to fire at the end of the EndOfStream window</a>, +meaning that the timer will fire at the end of the batch.</p> +<p>To sort the data, we store the incoming rows inside +a <a href="https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/api/common/state/ListState.html">ListState</a> +and sort them at output time, when the timer fires in +the <a href="https://nightlies.apache.org/flink/flink-docs-release-1.12/api/java/org/apache/flink/streaming/api/functions/KeyedProcessFunction.html#onTimer-long-org.apache.flink.streaming.api.functions.KeyedProcessFunction.OnTimerContext-org.apache.flink.util.Collector-">onTimer()</a> +callback.</p> +<p>One more thing: to be able to use Flink state, we need to key the datastream beforehand, even though +there +is no group-by key, because Flink state is designed per key. Thus, we key by a fake static key so +that there is a single state.</p> +<h3 id="migrating-the-limit-operationhttpsgithubcomechauchottpcds-benchmark-flinkblob9589c7c74e7152badee8400d775b4af7a998e487srcmainjavaorgexampletpcdsflinkquery3viaflinkrowdatastreamjaval213-l223"> + <a href="https://github.com/echauchot/tpcds-benchmark-flink/blob/9589c7c74e7152badee8400d775b4af7a998e487/src/main/java/org/example/tpcds/flink/Query3ViaFlinkRowDatastream.java#L213-L223">Migrating the limit operation</a> + <a class="anchor" href="#migrating-the-limit-operationhttpsgithubcomechauchottpcds-benchmark-flinkblob9589c7c74e7152badee8400d775b4af7a998e487srcmainjavaorgexampletpcdsflinkquery3viaflinkrowdatastreamjaval213-l223">#</a> +</h3> +<p>As all the elements of the DataStream were keyed by the same “0” key, they are kept in the same +“group”.
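</p>
<p>Because everything shares that one key, a single counter is enough. Here is a plain-Java sketch of the counting logic (hypothetical names; in the pipeline it lives inside a Flink ProcessFunction, with the LIMIT of the query hard-coded to 100).</p>

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical plain-Java sketch of a counting SQL LIMIT; in the pipeline
// this logic lives in a Flink ProcessFunction, with the counter kept per key.
class LimitSketch {
    private static final int LIMIT = 100;
    private int count = 0;

    // Analogous to processElement(): emit only the first LIMIT elements.
    boolean accept(Object element) {
        return count++ < LIMIT;
    }

    public static void main(String[] args) {
        LimitSketch limit = new LimitSketch();
        List<Integer> out = new ArrayList<>();
        for (int i = 0; i < 250; i++) {
            if (limit.accept(i)) {
                out.add(i);
            }
        }
        // Only the first 100 elements are kept.
        System.out.println(out.size());
    }
}
```
<p>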
So we can implement the SQL LIMIT with +a <a href="https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/streaming/api/functions/ProcessFunction.html">ProcessFunction</a> +that uses a counter to output only the first 100 elements.</p> +<h3 id="migrating-the-sink-operationhttpsgithubcomechauchottpcds-benchmark-flinkblob9589c7c74e7152badee8400d775b4af7a998e487srcmainjavaorgexampletpcdsflinkquery3viaflinkrowdatastreamjaval225-l236"> + <a href="https://github.com/echauchot/tpcds-benchmark-flink/blob/9589c7c74e7152badee8400d775b4af7a998e487/src/main/java/org/example/tpcds/flink/Query3ViaFlinkRowDatastream.java#L225-L236">Migrating the sink operation</a> + <a class="anchor" href="#migrating-the-sink-operationhttpsgithubcomechauchottpcds-benchmark-flinkblob9589c7c74e7152badee8400d775b4af7a998e487srcmainjavaorgexampletpcdsflinkquery3viaflinkrowdatastreamjaval225-l236">#</a> +</h3> +<p>As with sources, sinks changed significantly in recent versions of Flink. We now use +the <a href="https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/api/connector/sink2/Sink.html">Sink interface</a>, +which requires +an <a href="https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/api/common/serialization/Encoder.html">Encoder</a>. +But the resulting code is very similar to the one using the DataSet API.
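</p>
<p>A plain-Java sketch of the Encoder-style contract (hypothetical class and row format, shaped like Flink&rsquo;s <em>Encoder#encode(IN, OutputStream)</em>): instead of returning a formatted String, the method writes bytes directly to the sink&rsquo;s output stream.</p>

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;

// Hypothetical sketch shaped like Flink's Encoder#encode(IN, OutputStream):
// the row is written as UTF-8 bytes followed by a newline, rather than
// being returned as a formatted String.
class RowEncoderSketch {

    static void encode(String row, OutputStream out) {
        try {
            out.write(row.getBytes(StandardCharsets.UTF_8));
            out.write('\n');
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        encode("brand,42.0", out);
        System.out.print(new String(out.toByteArray(), StandardCharsets.UTF_8));
    }
}
```
<p>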
The only difference is +that the <a href="https://nightlies.apache.org/flink/flink-docs-release-1.16/api/java/org/apache/flink/api/common/serialization/Encoder.html#encode-IN-java.io.OutputStream-">Encoder#encode()</a> +method writes bytes +where <a href="https://nightlies.apache.org/flink/flink-docs-release-1.12/api/java/org/apache/flink/api/java/io/TextOutputFormat.TextFormatter.html#format-IN-">TextOutputFormat.TextFormatter#format()</a> +wrote Strings.</p> +<h2 id="conclusion"> + Conclusion + <a class="anchor" href="#conclusion">#</a> +</h2> +<p>As you saw with the migration of the join operation, the new unified DataStream API still has some +limitations in batch mode. In addition, the resulting order by and limit code is quite manual +and requires the Flink state API for the migration. For all these reasons, the Flink +community recommends using Flink SQL for batch pipelines. It results in much simpler code, good +performance, and out-of-the-box analytics capabilities. You can find the equivalent Query3 code +that uses +the <a href="https://nightlies.apache.org/flink/flink-docs-release-1.16/docs/dev/table/overview/">Flink SQL/Table API</a> +in +the <a href="https://github.com/echauchot/tpcds-benchmark-flink/blob/9589c7c74e7152badee8400d775b4af7a998e487/src/main/java/org/example/tpcds/flink/Query3ViaFlinkSQLCSV.java">Query3ViaFlinkSQLCSV class</a>.</p> +</p> +</article> + + + + <footer class="book-footer"> + + + + + + + +<a href="https://cwiki.apache.org/confluence/display/FLINK/Flink+Translation+Specifications">Want to contribute translation?</a> +<br><br> +<a href="//github.com/apache/flink-web/edit/asf-site/docs/content/posts/2023-05-09-howto-migrate-to-datastream.md" style="color:black"><i class="fa fa-edit fa-fw"></i>Edit This Page</a> + + + + + </footer> + + + + <div class="book-comments"> + +</div> + + + + <label for="menu-control" class="hidden book-menu-overlay"></label> + </div> + + + <aside class="book-toc"> + + + +<nav id="TableOfContents"><h3>On This
Page <button class="toc" onclick="collapseToc()"><i class="fa fa-compress" aria-hidden="true"></i></button></h3> + <ul> + <li><a href="#introduction">Introduction</a></li> + <li><a href="#what-is-tpcds">What is TPCDS?</a></li> + <li><a href="#chosen-tpcds-query">Chosen TPCDS query</a></li> + <li><a href="#the-initial-dataset-pipeline">The initial DataSet pipeline</a></li> + <li><a href="#migrating-the-dataset-pipeline-to-a-datastream-pipeline">Migrating the DataSet pipeline to a DataStream pipeline</a> + <ul> + <li><a href="#setting-the-execution-environmenthttpsgithubcomechauchottpcds-benchmark-flinkblob4273a3bc45d6e4fbdd5fa531fe48f85b8d0a9d3fsrcmainjavaorgexampletpcdsflinkquery3viaflinkrowdatastreamjaval90-l96"><a href="https://github.com/echauchot/tpcds-benchmark-flink/blob/4273a3bc45d6e4fbdd5fa531fe48f85b8d0a9d3f/src/main/java/org/example/tpcds/flink/Query3ViaFlinkRowDatastream.java#L90-L96">Setting the execution environment</a></a></li> + <li><a href="#using-the-streaming-sources-and-datasets">Using the streaming sources and datasets</a></li> + <li><a href="#migrating-the-join-operationhttpsgithubcomechauchottpcds-benchmark-flinkblob4273a3bc45d6e4fbdd5fa531fe48f85b8d0a9d3fsrcmainjavaorgexampletpcdsflinkquery3viaflinkrowdatastreamjaval129-l135"><a href="https://github.com/echauchot/tpcds-benchmark-flink/blob/4273a3bc45d6e4fbdd5fa531fe48f85b8d0a9d3f/src/main/java/org/example/tpcds/flink/Query3ViaFlinkRowDatastream.java#L129-L135">Migrating the join operation</a></a></li> + <li><a href="#migrating-the-group-by-and-reduce-sum-operationshttpsgithubcomechauchottpcds-benchmark-flinkblob9589c7c74e7152badee8400d775b4af7a998e487srcmainjavaorgexampletpcdsflinkquery3viaflinkrowdatastreamjaval145-l169"><a href="https://github.com/echauchot/tpcds-benchmark-flink/blob/9589c7c74e7152badee8400d775b4af7a998e487/src/main/java/org/example/tpcds/flink/Query3ViaFlinkRowDatastream.java#L145-L169">Migrating the group by and reduce (sum) operations</a></a></li> + <li><a 
href="#migrating-the-order-by-operationhttpsgithubcomechauchottpcds-benchmark-flinkblob9589c7c74e7152badee8400d775b4af7a998e487srcmainjavaorgexampletpcdsflinkquery3viaflinkrowdatastreamjaval171-l211"><a href="https://github.com/echauchot/tpcds-benchmark-flink/blob/9589c7c74e7152badee8400d775b4af7a998e487/src/main/java/org/example/tpcds/flink/Query3ViaFlinkRowDatastream.java#L171-L211">Migrating the order by operation</a></a></li> + <li><a href="#migrating-the-limit-operationhttpsgithubcomechauchottpcds-benchmark-flinkblob9589c7c74e7152badee8400d775b4af7a998e487srcmainjavaorgexampletpcdsflinkquery3viaflinkrowdatastreamjaval213-l223"><a href="https://github.com/echauchot/tpcds-benchmark-flink/blob/9589c7c74e7152badee8400d775b4af7a998e487/src/main/java/org/example/tpcds/flink/Query3ViaFlinkRowDatastream.java#L213-L223">Migrating the limit operation</a></a></li> + <li><a href="#migrating-the-sink-operationhttpsgithubcomechauchottpcds-benchmark-flinkblob9589c7c74e7152badee8400d775b4af7a998e487srcmainjavaorgexampletpcdsflinkquery3viaflinkrowdatastreamjaval225-l236"><a href="https://github.com/echauchot/tpcds-benchmark-flink/blob/9589c7c74e7152badee8400d775b4af7a998e487/src/main/java/org/example/tpcds/flink/Query3ViaFlinkRowDatastream.java#L225-L236">Migrating the sink operation</a></a></li> + </ul> + </li> + <li><a href="#conclusion">Conclusion</a></li> + </ul> +</nav> + + + </aside> + <aside class="expand-toc"> + <button class="toc" onclick="expandToc()"> + <i class="fa fa-expand" aria-hidden="true"></i> + </button> + </aside> + + </main> + + +</body> + +</html> + + + + + + + + + + + +