Yes, it works, but I think it has the same problem. It's a lot slower, so it took hours of running, but by the end memory usage was high and CPU was at about 100%, so it seems to be the same problem.
Worth noting perhaps that when I use the DirectRunner I have to turn enforceImmutability off because of https://issues.apache.org/jira/browse/BEAM-1714

On 2019/06/03 17:48:30, Rui Wang <[email protected]> wrote:
> Ha sorry I was only reading screenshots but ignored your other comments. So
> count fn indeed worked.
>
> Can I ask if your sql pipeline works on direct runner?
>
>
> -Rui
>
> On Mon, Jun 3, 2019 at 10:39 AM Rui Wang <[email protected]> wrote:
>
> > BeamSQL actually only converts SELECT COUNT(*) query to the Java pipeline
> > that calls Java's builtin Count[1] transform.
> >
> > Could you implement your pipeline by Count transform to see whether this
> > memory issue still exists? By doing so we could narrow down problem a bit.
> > If using Java directly without going through SQL code path and it works, we
> > will know that BeamSQL does not generate a working pipeline for your SELECT
> > COUNT(*) query.
> >
> >
> > [1]:
> > https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java#L54
> >
> > On Sat, Jun 1, 2019 at 9:30 AM [email protected] <[email protected]>
> > wrote:
> >
> >> Is using Beam SQL just massively more resource-intensive or is there a
> >> bug somewhere here (in my code or elsewhere)? (I'm using Flink runner)
> >>
> >> Here is my code:
> >> https://pastebin.com/nNtc9ZaG
> >>
> >> Here is the error I get (truncated at the end because it's so long and
> >> seemingly repetitive) when I run the SQL transform and my memory/CPU usage
> >> skyrockets:
> >> https://pastebin.com/mywmkCQi
> >>
> >> For example,
> >>
> >> After several early firing triggers, 13-15% CPU, 1.5GB-2GB RAM,
> >> everything working fine:
> >>
> >> `rowStream.apply(Combine.globally(Count.<Row>combineFn()).withoutDefaults()).apply(myPrint());`
> >>
> >> After a single early firing trigger, CPU usage shoots to 90%+, 4.7GB+
> >> RAM, soon crashes:
> >> `rowStream.apply(SqlTransform.query("SELECT COUNT(*) FROM PCOLLECTION")).apply(myPrint());`
> >>
> >> I can't imagine this is expected behavior but maybe I'm just ignorant of
> >> how SQL is implemented.
> >>
> >> Most of this code is just getting Schema Registry Avro Kafka messages
> >> into a Row stream. There have been discussions on the mailing list recently
> >> about how to do that. This is the best I could do. If that part is
> >> incorrect I'd be glad to know.
> >>
> >> Any help appreciated. Thank you.
