Since you are using FlinkRunner, can you try running the pipeline with
`--objectReuse=true` to see whether it makes any noticeable difference? That
is, if you are not using the option already, of course.
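
For reference, here is a minimal sketch of setting the same option in code
instead of on the command line. It assumes the Beam Flink runner artifact is on
the classpath. The class name `ObjectReuseSketch` is made up for illustration,
and the pipeline body is left out.

```java
import org.apache.beam.runners.flink.FlinkPipelineOptions;
import org.apache.beam.runners.flink.FlinkRunner;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;

// Hypothetical class name, used only for this sketch.
public class ObjectReuseSketch {
  public static void main(String[] args) {
    // Parse any command-line flags first, then view them as Flink options.
    FlinkPipelineOptions options =
        PipelineOptionsFactory.fromArgs(args).as(FlinkPipelineOptions.class);
    options.setRunner(FlinkRunner.class);

    // Programmatic equivalent of passing --objectReuse=true.
    options.setObjectReuse(true);

    Pipeline pipeline = Pipeline.create(options);
    // Apply the existing transforms to `pipeline` here, then call
    // pipeline.run() as before.
  }
}
```

As far as I know, object reuse is only safe when the transforms do not mutate
their input or output elements, so treat it as an experiment rather than a fix.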

On Tue, Jun 4, 2019 at 9:24 PM Rui Wang <ruw...@google.com> wrote:

> Sorry I couldn't be more helpful at this moment. Created a JIRA for this
> issue: https://jira.apache.org/jira/browse/BEAM-7489
>
>
> -Rui
>
> On Mon, Jun 3, 2019 at 8:14 PM dekl...@gmail.com <dekl...@gmail.com>
> wrote:
>
>> Yes, it works but I think it has the same problem. It's a lot slower so
>> it took me hours of running it, but by the end the memory usage was high
>> and the CPU about 100% so it seems to be the same problem.
>>
>> Worth noting perhaps that when I use the DirectRunner I have to turn
>> enforceImmutability off because of
>> https://issues.apache.org/jira/browse/BEAM-1714
>>
>> On 2019/06/03 17:48:30, Rui Wang <ruw...@google.com> wrote:
>> > Ha, sorry, I was only reading the screenshots and ignored your other
>> > comments. So the count fn indeed worked.
>> >
>> > Can I ask if your SQL pipeline works on the direct runner?
>> >
>> >
>> > -Rui
>> >
>> > On Mon, Jun 3, 2019 at 10:39 AM Rui Wang <ruw...@google.com> wrote:
>> >
>> > > BeamSQL actually only converts the SELECT COUNT(*) query to a Java
>> > > pipeline that calls Java's built-in Count[1] transform.
>> > >
>> > > Could you implement your pipeline with the Count transform to see
>> > > whether this memory issue still exists? By doing so we could narrow
>> > > down the problem a bit. If using Java directly, without going through
>> > > the SQL code path, works, we will know that BeamSQL does not generate a
>> > > working pipeline for your SELECT COUNT(*) query.
>> > >
>> > >
>> > > [1]:
>> > > https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/Count.java#L54
>> > >
>> > > On Sat, Jun 1, 2019 at 9:30 AM dekl...@gmail.com <dekl...@gmail.com>
>> > > wrote:
>> > >
>> > >> Is using Beam SQL just massively more resource-intensive, or is there
>> > >> a bug somewhere here (in my code or elsewhere)? (I'm using the Flink
>> > >> runner.)
>> > >>
>> > >> Here is my code:
>> > >> https://pastebin.com/nNtc9ZaG
>> > >>
>> > >> Here is the error I get (truncated at the end because it's so long
>> > >> and seemingly repetitive) when I run the SQL transform and my
>> > >> memory/CPU usage skyrockets:
>> > >> https://pastebin.com/mywmkCQi
>> > >>
>> > >> For example,
>> > >>
>> > >> After several early firing triggers, 13-15% CPU, 1.5GB-2GB RAM,
>> > >> everything working fine:
>> > >>
>> > >>
>> > >> `rowStream.apply(Combine.globally(Count.<Row>combineFn()).withoutDefaults()).apply(myPrint());`
>> > >>
>> > >> After a single early firing trigger, CPU usage shoots to 90%+, 4.7GB+
>> > >> RAM, soon crashes:
>> > >> `rowStream.apply(SqlTransform.query("SELECT COUNT(*) FROM PCOLLECTION")).apply(myPrint());`
>> > >>
>> > >> I can't imagine this is expected behavior, but maybe I'm just
>> > >> ignorant of how SQL is implemented.
>> > >>
>> > >> Most of this code is just getting Schema Registry Avro Kafka messages
>> > >> into a Row stream. There have been discussions on the mailing list
>> > >> recently about how to do that. This is the best I could do. If that
>> > >> part is incorrect I'd be glad to know.
>> > >>
>> > >> Any help appreciated. Thank you.
>> > >>
>> > >>
>> >
>>
>
