Since you are using the FlinkRunner, can you try running the pipeline with `--objectReuse=true` and see whether it makes any noticeable difference? That is, of course, if you are not already using that option.
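
In case the flag is easier to set in code than on the command line, here is a minimal sketch of the programmatic equivalent. It assumes the Beam Flink runner artifact is on the classpath. The class name `ObjectReuseExample` is only a placeholder, and the pipeline body is left out, so treat it as a starting point rather than a drop-in fix.

```java
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

// Placeholder class name; only the options handling is the point here.
public class ObjectReuseExample {
  public static void main(String[] args) {
    // Parse whatever flags were passed in (e.g. --objectReuse=true).
    FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
    options.setRunner(FlinkRunner.class);

    // Same effect as passing --objectReuse=true on the command line.
    options.setObjectReuse(true);

    // Confirm the value the runner will see before building the pipeline.
    System.out.println("objectReuse = " + options.getObjectReuse());

    // Build the pipeline with Pipeline.create(options) as usual from here.
  }
}
```

With object reuse enabled, Flink may hand the same object instance to successive operators instead of copying it, so this is only safe if your DoFns do not mutate or hold on to their inputs.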

On Tue, Jun 4, 2019 at 9:24 PM Rui Wang <ruw...@google.com> wrote:

> Sorry I couldn't be more helpful at this moment. Created a JIRA for this
> issue: https://jira.apache.org/jira/browse/BEAM-7489
>
> -Rui
>
> On Mon, Jun 3, 2019 at 8:14 PM dekl...@gmail.com <dekl...@gmail.com> wrote:
>
>> Yes, it works but I think it has the same problem. It's a lot slower so
>> it took me hours of running it, but by the end the memory usage was high
>> and the CPU about 100% so it seems to be the same problem.
>>
>> Worth noting perhaps that when I use the DirectRunner I have to turn
>> enforceImmutability off because of
>> https://issues.apache.org/jira/browse/BEAM-1714
>>
>> On 2019/06/03 17:48:30, Rui Wang <ruw...@google.com> wrote:
>> > Ha sorry I was only reading screenshots but ignored your other
>> > comments. So count fn indeed worked.
>> >
>> > Can I ask if your sql pipeline works on direct runner?
>> >
>> > -Rui
>> >
>> > On Mon, Jun 3, 2019 at 10:39 AM Rui Wang <ruw...@google.com> wrote:
>> >
>> > > BeamSQL actually only converts SELECT COUNT(*) query to the Java
>> > > pipeline that calls Java's builtin Count[1] transform.
>> > >
>> > > Could you implement your pipeline by Count transform to see whether
>> > > this memory issue still exists? By doing so we could narrow down
>> > > problem a bit. If using Java directly without going through SQL code
>> > > path and it works, we will know that BeamSQL does not generate a
>> > > working pipeline for your SELECT COUNT(*) query.
>> > >
>> > > [1]:
>> > > https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java#L54
>> > >
>> > > On Sat, Jun 1, 2019 at 9:30 AM dekl...@gmail.com <dekl...@gmail.com>
>> > > wrote:
>> > >
>> > >> Is using Beam SQL just massively more resource-intensive or is there
>> > >> a bug somewhere here (in my code or elsewhere)? (I'm using Flink
>> > >> runner)
>> > >>
>> > >> Here is my code:
>> > >> https://pastebin.com/nNtc9ZaG
>> > >>
>> > >> Here is the error I get (truncated at the end because it's so long
>> > >> and seemingly repetitive) when I run the SQL transform and my
>> > >> memory/CPU usage skyrockets:
>> > >> https://pastebin.com/mywmkCQi
>> > >>
>> > >> For example,
>> > >>
>> > >> After several early firing triggers, 13-15% CPU, 1.5GB-2GB RAM,
>> > >> everything working fine:
>> > >>
>> > >> `rowStream.apply(Combine.globally(Count.<Row>combineFn()).withoutDefaults()).apply(myPrint());`
>> > >>
>> > >> After a single early firing trigger, CPU usage shoots to 90%+, 4.7GB+
>> > >> RAM, soon crashes:
>> > >>
>> > >> `rowStream.apply(SqlTransform.query("SELECT COUNT(*) FROM PCOLLECTION")).apply(myPrint());`
>> > >>
>> > >> I can't imagine this is expected behavior but maybe I'm just
>> > >> ignorant of how SQL is implemented.
>> > >>
>> > >> Most of this code is just getting Schema Registry Avro Kafka messages
>> > >> into a Row stream. There have been discussions on the mailing list
>> > >> recently about how to do that. This is the best I could do. If that
>> > >> part is incorrect I'd be glad to know.
>> > >>
>> > >> Any help appreciated. Thank you.