Re: Beam SQL found limitations

2023-05-26 Thread Kenneth Knowles
Just want to clarify that Beam's concept of windowing is really an
event-time based key, and the windows are all processed logically
simultaneously. SQL's concept of a windowing function is to sort rows and
process them linearly. They are actually totally different. From your
queries it seems you are interested in SQL's windowing functions (aka
analytic functions).

I am surprised by the problems with rows, since we have used them
extensively. Hopefully it is not too hard to fix. Same with the UNION ALL
problem.

And the CROSS JOIN seems like it would be a nice feature to allow in some
cases. Should not be hard.

Thank you for reporting this! If you have time, it would be really great to
get each of these reproducible problems into its own GitHub issue.

Kenn


Re: Beam SQL found limitations

2023-05-26 Thread Wiśniowski Piotr

Hi Alexey,

Thank you for the reference to that discussion. I actually have pretty 
similar thoughts on what Beam SQL needs.


Update from my side:

I actually found a workaround for the issue with windowing functions on a 
stream. It basically boils down to using a sliding window to collect and 
aggregate the state. But I would need advice on whether this is actually a 
cost-efficient method (targeting the Dataflow runner). The doubt that I 
have is that this sliding window would need a sliding interval of less 
than 1 s and a size of more than a week, and would be fed with quite 
frequent data. If I understand this correctly, it would mean each input 
row would need to be duplicated for each window and stored, which could 
be a quite significant storage cost?


Or does Beam actually not physically duplicate the record, but just 
track which windows the record currently belongs to?
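
To make the workaround concrete, here is a minimal sketch of the windowing 
step I mean (class and transform names are hypothetical, and the exact 
durations are just examples):

import org.apache.beam.sdk.transforms.windowing.SlidingWindows;
import org.apache.beam.sdk.transforms.windowing.Window;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Duration;

public class SlidingStateWindowSketch {
    // Assign each keyed event to week-plus sliding windows with a
    // sub-second sliding interval, so the most recent window always
    // holds the key's recent history.
    public static PCollection<KV<String, String>> windowed(
            PCollection<KV<String, String>> events) {
        return events.apply(
            "WeekLongSlidingWindows",
            Window.into(
                SlidingWindows.of(Duration.standardDays(8)) // size > 1 week
                    .every(Duration.millis(500))));         // interval < 1 s
    }
}

With these example values each element is assigned to size/interval = 
8 days / 500 ms ≈ 1.4 million windows, which is exactly the duplication I 
am worried about.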



And the real thing that Beam SQL needs at the moment, in my opinion, is 
bug fixes.


Some bugs I found that prevent one from using it, and for which I would 
really appreciate a fast fix:


- UNNEST ARRAY with a nested ROW (described below, created ticket - 
https://github.com/apache/beam/issues/26911)


- The Pub/Sub table provider actually requires all table properties to 
be present (it fails with null in `timestampAttributeKey`), which 
essentially does not allow one to use the Pub/Sub publish timestamp as 
the `timestampAttributeKey`.


- It's not possible to cast VARCHAR to BYTES. And BYTES is needed for 
`DataStoreV1TableProvider` to provide a key for storage. Also consider 
updating `org.apache.beam.sdk.io.gcp.datastore.RowToEntity` so that it 
requires VARCHAR instead of BYTES - it's even easier in implementation.


- Any hints on how to implement a `FireStoreIOTableProvider`? I am 
considering implementing it and contributing it, depending on my team's 
decision - but I would like to get an idea of how hard this task is.


Will create tickets for the rest of the issues when I have some spare time.

Best regards

Wiśniowski Piotr


On 22.05.2023 18:28, Alexey Romanenko wrote:

Hi Piotr,

Thanks for the details! I am cross-posting this to dev@ as well since, I 
guess, people there can provide more insights on this.


A while ago, I faced similar issues trying to run Beam SQL against the 
TPC-DS benchmark.
We had a discussion around that [1]; please take a look, since it can 
be helpful.


[1] https://lists.apache.org/thread/tz8h1lycmob5vpkwznvc2g6ol2s6n99b

—
Alexey

On 18 May 2023, at 11:36, Wiśniowski Piotr 
 wrote:


Hi,

After experimenting with Beam SQL I found some limitations. I am 
testing on near-latest main (precisely 
`5aad2467469fafd2ed2dd89012bc80c0cd76b168`) with Calcite, the direct 
runner, and openjdk version "11.0.19". Please let me know if any of 
them are known, worked on, have tickets, or have an estimated fix 
time. I believe most of them are low-hanging fruit, or my thinking is 
just not right for the problem. If that is the case, please guide me 
to some working solution.


From my perspective it is ok to have a fix just on master - no need 
to wait for a release. Priority order:
- 7. Windowing function on a stream - in detail: how to get the 
previous message for a key? Setting the expiration arbitrarily big is 
ok, but access to the previous record must happen fairly quickly, not 
wait for the big window to finish and emit the expired keys. Ideally I 
would like to do it in a pure Beam pipeline (see the sketch after this 
list), as saving to some external key/value store and then reading it 
back here could potentially result in race conditions, which I would 
like to avoid - but if it's the only option, let it be.

- 5. single UNION ALL possible
- 4. UNNEST ARRAY with nested ROW
- 3. Using * when there is Row type present in the schema
- 1. `CROSS JOIN` between two unrelated tables is not supported - 
even if one is a static number table

- 2. ROW construction not supported. It is not possible to nest data
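
To make point 7 concrete, here is a minimal sketch (hypothetical names) of 
the per-key "previous record" access I am after, written with the state 
API rather than SQL:

import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

public class PreviousPerKeyFn
        extends DoFn<KV<String, String>, KV<String, String>> {

    // Last seen value for each key.
    @StateId("previous")
    private final StateSpec<ValueState<String>> previousSpec =
        StateSpecs.value();

    @ProcessElement
    public void processElement(
            ProcessContext c,
            @StateId("previous") ValueState<String> previous) {
        String prev = previous.read(); // null for the first element of a key
        if (prev != null) {
            // The previous record is available immediately, without
            // waiting for any window to close.
            c.output(KV.of(c.element().getKey(), prev));
        }
        previous.write(c.element().getValue());
    }
}

The question is how to express the same thing in Beam SQL.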

Below are the queries that I use to test these scenarios.

Thank you for looking at these topics!

Best

Wiśniowski Piotr

---
-- 1. `CROSS JOIN` between two unrelated tables is not supported.
---
-- Only `CROSS JOIN UNNEST` is supported, when exploding an array from 
-- the same table.

-- It is not possible to number rows:
WITH data_table AS (
    SELECT 1 AS a
),
number_table AS (
    SELECT
        numbers_exploded AS number_item
    FROM UNNEST(ARRAY[1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16])
        AS numbers_exploded
)
SELECT
    data_table.a,
    number_table.number_item
FROM data_table
CROSS JOIN number_table
;
-- CROSS JOIN, JOIN ON FALSE is not supported!
---
-- 2. ROW construction not supported. It is not possible to nest data
---
SELECT ROW(1, 2, 'a') AS r; -- java.lang.NoSuchFieldException: EXPR$0
SELECT (1, 2, 'a') AS r; -- java.lang.NoSuchFieldException: EXPR$0
SELECT MAP['field1', 1, 'field2', 'a']; -- Parameters must be of the same type
SELECT MAP['field1', 'b', 'field2', 'a']; -- null
-- WORKAROUND - manually compose a json string,
-- drawback - decomposing might not be supported or would need to be al
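
For reproduction, something like the following minimal SqlTransform 
harness on the direct runner should surface the same errors (class and 
table names here are hypothetical):

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.extensions.sql.SqlTransform;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.PCollectionTuple;
import org.apache.beam.sdk.values.Row;
import org.apache.beam.sdk.values.TupleTag;

public class SqlReproSketch {
    public static void main(String[] args) {
        Pipeline pipeline =
            Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        // A one-row table registered under the name data_table.
        Schema schema = Schema.builder().addInt32Field("a").build();
        PCollection<Row> dataTable =
            pipeline.apply(
                Create.of(Row.withSchema(schema).addValue(1).build())
                    .withRowSchema(schema));

        // Swap in any of the failing queries above.
        PCollectionTuple.of(new TupleTag<>("data_table"), dataTable)
            .apply(SqlTransform.query(
                "SELECT ROW(1, 2, 'a') AS r FROM data_table"));

        pipeline.run().waitUntilFinish();
    }
}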

[Exception] Output timestamps must be no earlier than the timestamp of the current input or timer.

2023-05-26 Thread Mário Costa via user
Hi,

I need to process messages/events from Google Pub/Sub. The message is sent
as a JSON payload and contains a JSON attribute, say "time", with the
timestamp value of the event.

I need to group the events into 5-minute windows and write them to files,
one file per window.

After I extract the timestamp and set it in the pipeline, I get an
exception message:

java.lang.IllegalArgumentException: Cannot output with timestamp
2023-05-25T16:40:00.015Z. Output timestamps must be no earlier than the
timestamp of the current input or timer (2023-05-25T16:40:00.039Z) minus
the allowed skew (0 milliseconds) and no later than
294247-01-10T04:00:54.775Z. See the DoFn#getAllowedTimestampSkew() Javadoc
for details on changing the allowed skew.

Is there a way to solve this problem?

How can I override the timestamp of the event without having this issue?

Here is an example of the pipeline code:

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.gcp.pubsub.PubsubIO;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;
import org.joda.time.Instant;
import org.joda.time.format.DateTimeFormat;
import org.joda.time.format.DateTimeFormatter;

public class PubsubTimestampExample {
    public static void main(String[] args) {
        // Create the pipeline options
        PipelineOptions options = PipelineOptionsFactory.create();

        // Create the pipeline
        Pipeline pipeline = Pipeline.create(options);

        // Define the Pub/Sub topic and subscription
        String topic = "projects//topics/";

        // Read the messages from Pub/Sub with a timestamp attribute
        PCollection<String> messages = pipeline
            .apply("ReadFromPubsub", PubsubIO.readStrings().fromTopic(topic));

        // Process the messages and set the timestamp
        PCollection<String> processedMessages = messages
            .apply("SetTimestamp", ParDo.of(new SetTimestampFn()));

        // Print the processed messages
        processedMessages.apply("PrintMessages", ParDo.of(new PrintMessagesFn()));

        // Run the pipeline
        pipeline.run();
    }

    public static class SetTimestampFn extends DoFn<String, String> {
        private static final DateTimeFormatter TIMESTAMP_FORMATTER =
            DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ");

        @ProcessElement
        public void processElement(ProcessContext c) {
            String message = c.element();
            // Assuming message format: "payload,timestamp"
            String[] parts = message.split(",");
            String payload = parts[0];
            String timestampString = parts[1];

            // Extract and parse the timestamp from the payload
            Instant timestamp = Instant.parse(timestampString, TIMESTAMP_FORMATTER);

            // Set the timestamp for the element
            c.outputWithTimestamp(payload, timestamp);
        }
    }

    public static class PrintMessagesFn extends DoFn<String, String> {
        @ProcessElement
        public void processElement(ProcessContext c) {
            System.out.println(c.element());
        }
    }
}
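
Regarding the Javadoc pointer in the exception text, a minimal sketch of 
the override it refers to, as a drop-in replacement for SetTimestampFn 
above (it needs an additional import of org.joda.time.Duration; the 
one-hour skew is an arbitrary example, and note that 
getAllowedTimestampSkew() is deprecated, since elements output behind the 
watermark may be dropped as late data downstream):

public static class SetTimestampFn extends DoFn<String, String> {
    private static final DateTimeFormatter TIMESTAMP_FORMATTER =
        DateTimeFormat.forPattern("yyyy-MM-dd'T'HH:mm:ss.SSSZ");

    // Allow output timestamps up to one hour earlier than the input's.
    @Override
    public Duration getAllowedTimestampSkew() {
        return Duration.standardHours(1);
    }

    @ProcessElement
    public void processElement(ProcessContext c) {
        String[] parts = c.element().split(","); // "payload,timestamp"
        c.outputWithTimestamp(parts[0],
            Instant.parse(parts[1], TIMESTAMP_FORMATTER));
    }
}

Alternatively, if the event timestamp is also published as a Pub/Sub 
message attribute, reading with 
PubsubIO.readStrings().fromTopic(topic).withTimestampAttribute("time") 
lets the source assign event timestamps, so no re-stamping in a DoFn (and 
no skew) is needed.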
--

Mario Costa
Data Analytics Senior Software Developer