Parquet schema per bucket in Streaming File Sink

2021-11-29 Thread Zack Loebel
Hey all,

I have a job that writes data of a similar shape to a location in S3.
Currently it writes a map of data with each row. I want to customize this
job to "explode" the map into column names and values; these are consistent
for a single bucket. Is there any way to do this, i.e. provide a custom
Parquet schema per bucket within a single dynamic sink?

I've started looking at the changes within the main codebase that would make
this feasible. It seems straightforward to pass the bucketId to the
writerFactory, and the bucketId could be a type containing the relevant
schema information.
However, the BulkFormatBuilder has several spots where the bucket ID is
required to be a String: specifically, both the BucketAssigner and the
CheckpointRollingPolicy appear to require a bucketId of type String.
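
For context, here is a rough sketch of how the bulk-format builder is wired
up today (the MyEvent type, its getDatasetName() accessor, and the S3 path
are placeholders, not my actual job): the writer factory, and therefore the
Parquet schema, is chosen once for the whole sink, while the bucket ID is
constrained to String.

import org.apache.flink.core.fs.Path;
import org.apache.flink.core.io.SimpleVersionedSerializer;
import org.apache.flink.formats.parquet.avro.ParquetAvroWriters;
import org.apache.flink.streaming.api.functions.sink.filesystem.BucketAssigner;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.bucketassigners.SimpleVersionedStringSerializer;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.OnCheckpointRollingPolicy;

public class ParquetSinkSketch {

    // Placeholder event type; the real job would use its own record class.
    public static class MyEvent {
        public String dataset;
        public String value;
        public String getDatasetName() { return dataset; }
    }

    public static StreamingFileSink<MyEvent> buildSink() {
        return StreamingFileSink
                // The writer factory (and therefore the Parquet schema)
                // is fixed here, once, for the entire sink.
                .forBulkFormat(
                        new Path("s3://my-bucket/output"),
                        ParquetAvroWriters.forReflectRecord(MyEvent.class))
                // The bulk-format builder fixes the bucket ID type to String,
                // so the assigner can only return a path fragment, not a
                // richer type carrying per-bucket schema information.
                .withBucketAssigner(new BucketAssigner<MyEvent, String>() {
                    @Override
                    public String getBucketId(MyEvent element, Context context) {
                        return element.getDatasetName();
                    }

                    @Override
                    public SimpleVersionedSerializer<String> getSerializer() {
                        return SimpleVersionedStringSerializer.INSTANCE;
                    }
                })
                // The rolling policy is likewise typed on a String bucket ID.
                .withRollingPolicy(OnCheckpointRollingPolicy.build())
                .build();
    }
}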

I'm curious whether this is a change the community would be open to, and/or
whether there is another way to accomplish what I'm looking for that I've
missed.

Thanks,
Zack


Re: Parquet schema per bucket in Streaming File Sink

2021-11-30 Thread Francesco Guardiani
Hi Zack,

> I want to customize this job to "explode" the map as column names and
values

You can do this in a SELECT statement, manually extracting the map values
using the map access built-in function, e.g.:

SELECT mymap['a'] AS a, mymap['b'] AS b

> specifically the BucketAssigner and the CheckpointRollingPolicy both
appear to be required to have a bucketId of a String.

I wonder if what you're looking for is the PARTITIONED BY feature:

CREATE TABLE MySinkTable (
  ...
) PARTITIONED BY (partitionKey1, partitionKey2)
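
Spelled out a bit more (the table name, columns, and option values here are
just an example), with the filesystem connector and the parquet format:

CREATE TABLE MySinkTable (
  a STRING,
  b STRING,
  dataset STRING
) PARTITIONED BY (dataset) WITH (
  'connector' = 'filesystem',
  'path' = 's3://my-bucket/output',
  'format' = 'parquet'
);

Each distinct value of the partition column then ends up in its own
dataset=... directory under the base path.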

Does this solve your use case?

FG


Re: Parquet schema per bucket in Streaming File Sink

2021-12-03 Thread Zack Loebel
Unfortunately this does not solve my use case, because I want to be able to
create and change the various outputs at runtime (the partition keys would be
dynamic), and as such the SQL/extraction would have to change during
execution, which I did not believe to be supported. I'm also operating at the
DataStream level (although of course I could move the DataStream into
SQL-land).

Best,
Zack


-- 
Have a great day!
Zack Loebel-Begelman