Hi Dan, I'm afraid there is no mechanism to do that purely in the Table API yet, or at least I am not aware of one. If reinterpretAsKeyedStream works for you, you could apply it to a DataStream, convert that DataStream to a Table [1], and then continue with the Table API.
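For context on why reinterpretAsKeyedStream is sensitive to how the input is laid out: it skips the shuffle, so every record must already sit on the subtask that keyBy would have picked. The sketch below is a simplified, self-contained illustration of that routing, not Flink's actual code. It assumes the general shape of Flink's key-group assignment (hash the key into one of maxParallelism key groups, then map contiguous ranges of key groups onto subtasks). The scramble function is a hypothetical stand-in for Flink's murmur-style hash, and moduloPartitionFor is a hypothetical stand-in for an external producer that partitions by hash modulo partition count.

```java
public class KeyGroupSketch {

    // Hypothetical stand-in for the murmur-style hash Flink applies to key.hashCode().
    // This is NOT Flink's real function; it only scrambles the bits deterministically.
    static int scramble(int h) {
        h ^= h >>> 16;
        h *= 0x85ebca6b;
        h ^= h >>> 13;
        return h & Integer.MAX_VALUE; // clear the sign bit so the result is non-negative
    }

    // Step 1: a key is hashed into one of maxParallelism key groups.
    static int keyGroupFor(Object key, int maxParallelism) {
        return scramble(key.hashCode()) % maxParallelism;
    }

    // Step 2: contiguous ranges of key groups are mapped onto the parallel subtasks.
    static int operatorIndexForKeyGroup(int maxParallelism, int parallelism, int keyGroup) {
        return keyGroup * parallelism / maxParallelism;
    }

    // The subtask a keyBy-style shuffle would route this key to (under the assumptions above).
    static int subtaskFor(Object key, int maxParallelism, int parallelism) {
        return operatorIndexForKeyGroup(
                maxParallelism, parallelism, keyGroupFor(key, maxParallelism));
    }

    // Hypothetical external producer that partitions by plain hash modulo partition count.
    static int moduloPartitionFor(Object key, int numPartitions) {
        return Math.floorMod(key.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        int maxParallelism = 128; // assumed value for illustration
        int parallelism = 4;      // also used as the partition count of the producer
        for (String userId : new String[] {"user-1", "user-2", "user-3", "user-4"}) {
            System.out.printf(
                    "%s -> keyBy-style subtask %d, modulo partition %d%n",
                    userId,
                    subtaskFor(userId, maxParallelism, parallelism),
                    moduloPartitionFor(userId, parallelism));
        }
    }
}
```

Nothing forces the two printed columns to agree, and the keyBy-style result changes whenever parallelism or maxParallelism changes. A producer that hashes the serialized key bytes, as Kafka's default partitioner does, is in the same position as the modulo stand-in here.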
On the topic of reinterpretAsKeyedStream, I want to stress one thing. I'd like to draw your attention to this warning:

> *WARNING*: The re-interpreted data stream *MUST* already be
> pre-partitioned in *EXACTLY* the same way Flink’s keyBy would
> partition the data in a shuffle w.r.t. key-group assignment.

I think this is not trivial (or perhaps not even possible?) to achieve unless both the producer and the consumer are Flink jobs with the same parallelism.

Best,

Dawid

[1] https://ci.apache.org/projects/flink/flink-docs-release-1.11/dev/table/common.html#convert-a-datastream-or-dataset-into-a-table

On 16/09/2020 18:22, Dan Hill wrote:
> Hi Piotr! Yes, that's what I'm using with DataStream. It works well
> in my prototype.
>
> On Wed, Sep 16, 2020 at 8:58 AM Piotr Nowojski <pnowoj...@apache.org> wrote:
>
>     Hi,
>
>     Have you seen the "Reinterpreting a pre-partitioned data stream as
>     keyed stream" feature? [1] However, I'm not sure if and how it can
>     be integrated with the Table API. Maybe someone more familiar with
>     the Table API can help with that?
>
>     Piotrek
>
>     [1]
>     https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/experimental.html#reinterpreting-a-pre-partitioned-data-stream-as-keyed-stream
>
>     Wed, 16 Sep 2020 at 05:35, Dan Hill <quietgol...@gmail.com> wrote:
>
>         How do I avoid unnecessary reshuffles when using Kafka as
>         input? My keys in Kafka are ~userId. The first few stages do
>         joins that are usually (userId, someOtherKeyId). It makes
>         sense for these joins to stay on the same machine and avoid
>         unnecessary shuffling.
>
>         What's the best way to avoid unnecessary shuffling when using
>         the Table SQL interface? I see PARTITION BY on TABLE. I'm not
>         sure how to specify the keys for Kafka.