Unfortunately, this does not solve my use case. I want to be able to create and change the various outputs at runtime (the partition keys would be dynamic), so the SQL/extraction would have to change during execution, which I did not believe to be supported. I'm also operating at the DataStream level (although of course I could move the DataStream into SQL-land).
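Since the BucketAssigner and CheckpointRollingPolicy require a String bucketId (see the original message quoted below), one possible workaround is to compute the bucketId per record at runtime and encode both the dynamic partition values and a schema signature (the sorted map keys) into that String, then decode the column list wherever the per-bucket writer is built. A rough plain-Java sketch, assuming a bucket's schema is fully determined by its key set and that keys and values contain no ',' or '/'. `SchemaBucketId`, `encode` and `columns` are hypothetical names, not Flink API:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeSet;

/** Hypothetical helper (not Flink API): packs partition values and a column list into a String bucketId. */
final class SchemaBucketId {
    private static final String COLS = "cols=";

    private SchemaBucketId() {}

    /**
     * Builds an id such as "region=us/cols=a,b". Pass a LinkedHashMap for
     * partitions so the partition order stays stable across records.
     * Assumes keys and values contain no ',' or '/' characters.
     */
    static String encode(Map<String, String> partitions, Map<String, ?> row) {
        StringBuilder sb = new StringBuilder();
        for (Map.Entry<String, String> e : partitions.entrySet()) {
            sb.append(e.getKey()).append('=').append(e.getValue()).append('/');
        }
        // Sorting the keys makes rows with the same key set land in the same bucket.
        TreeSet<String> cols = new TreeSet<>(row.keySet());
        sb.append(COLS).append(String.join(",", cols));
        return sb.toString();
    }

    /** Recovers the column names from an id produced by encode(). */
    static List<String> columns(String bucketId) {
        String cols = bucketId.substring(bucketId.lastIndexOf(COLS) + COLS.length());
        if (cols.isEmpty()) {
            return new ArrayList<>();
        }
        return Arrays.asList(cols.split(","));
    }
}
```

Because the id stays a plain String, the existing String-typed signatures would not have to change. The catch is that the writer for a bucket still needs to see the bucketId to parse the schema back out, which, per the original message, is exactly what the writerFactory does not currently get.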
Best,
Zack

On Tue, Nov 30, 2021 at 2:41 AM Francesco Guardiani <france...@ververica.com> wrote:

> Hi Zack,
>
> > I want to customize this job to "explode" the map as column names and
> > values
>
> You can do this in a SELECT statement by manually extracting the map values
> with the map access built-in
> <https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/functions/systemfunctions/#collection-functions>,
> e.g.:
>
> SELECT mymap['a'] AS a, mymap['b'] AS b
>
> > specifically the BucketAssigner and the CheckpointRollingPolicy both
> > appear to be required to have a bucketId of a String.
>
> I wonder if what you're looking for is the PARTITIONED BY feature:
>
> CREATE TABLE MySinkTable (
> ...) PARTITIONED BY (partitionKey1, partitionKey2)
>
> Does this solve your use case?
>
> FG
>
>
> On Tue, Nov 30, 2021 at 7:13 AM Zack Loebel <zloe...@gmail.com> wrote:
>
>> Hey all,
>>
>> I have a job which writes data of a similar shape to a location in
>> S3. Currently it writes a map of data with each row. I want to customize
>> this job to "explode" the map as column names and values; these are
>> consistent for a single bucket. Is there any way to do this? Can I provide
>> a custom Parquet schema per bucket within a single dynamic sink?
>>
>> I've started looking at the changes within the main codebase to make this
>> feasible. It seems straightforward to provide the bucketId to the
>> writerFactory, and the bucketId could be a type containing the relevant
>> schema information.
>> However, the BulkFormatBuilder has several spots where the bucketId
>> appears to be required to be a String: specifically, the BucketAssigner
>> and the CheckpointRollingPolicy both appear to require a bucketId of
>> type String.
>>
>> I'm curious whether this is a change the community would be open to,
>> and/or whether there is another way to accomplish what I'm looking for
>> that I've missed.
>>
>> Thanks,
>> Zack
>>
>> --
>> Have a great day!
>> Zack Loebel-Begelman
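At the DataStream level, the map-access approach quoted above (`SELECT mymap['a'] AS a, mymap['b'] AS b`) amounts to reading each record's map into a fixed column order for its bucket. A minimal plain-Java sketch, assuming the column list for the bucket is already known (for instance, derived from schema information carried in the bucketId). `MapExploder` and `explode` are hypothetical names, not Flink classes:

```java
import java.util.List;
import java.util.Map;

/** Hypothetical helper (not Flink API): flattens a map-valued record into positional columns. */
final class MapExploder {
    private MapExploder() {}

    /**
     * Returns the values of record in the order given by columns,
     * analogous to SELECT mymap['a'] AS a, mymap['b'] AS b.
     * In this sketch, a key missing from the map yields null.
     */
    static Object[] explode(Map<String, ?> record, List<String> columns) {
        Object[] values = new Object[columns.size()];
        for (int i = 0; i < columns.size(); i++) {
            values[i] = record.get(columns.get(i));
        }
        return values;
    }
}
```

The resulting array would then go to whatever per-bucket writer (e.g. Parquet) is built from that column list; that wiring depends on the sink's writer factory and is not shown here.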