A problem with nexmark build

2021-05-12 Thread Tao Li
Hi Beam community,

I have been following this nexmark doc: 
https://beam.apache.org/documentation/sdks/java/testing/nexmark/

I ran into a problem with the “Running query 0 on a Spark cluster with Apache 
Hadoop YARN” section.

I followed the instructions by running the “./gradlew 
:sdks:java:testing:nexmark:assemble” command, but did not find the uber jar 
“build/libs/beam-sdks-java-nexmark-2.29.0-spark.jar” that the nexmark doc 
references (it should have been built locally).

Can someone provide some guidance and help? Thanks.
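
For what it's worth, a quick way to see what that task actually produced is to
run it and then list the module's build output (a rough sketch; the exact jar
names depend on the Beam version and build configuration):

    # build the Nexmark artifacts
    ./gradlew :sdks:java:testing:nexmark:assemble

    # list whatever jars the build actually produced
    find sdks/java/testing/nexmark/build -name '*.jar'

Comparing that listing against the jar name in the doc should at least show
whether the artifact is missing or just named/located differently.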




Re: Extremely Slow DirectRunner

2021-05-12 Thread Evan Galpin
Ok, gotcha. In my tests, all SDK versions 2.25.0 and higher exhibit slow
behaviour regardless of use_deprecated_read. Not sure if that points to
something different, then.

Thanks,
Evan

On Wed, May 12, 2021 at 18:16 Steve Niemitz  wrote:

> I think it was only broken in 2.29.
>
> On Wed, May 12, 2021 at 5:53 PM Evan Galpin  wrote:
>
>> Ah ok thanks for that. Do you mean use_deprecated_reads is broken
>> specifically in 2.29.0 (regression) or broken in all versions up to and
>> including 2.29.0 (ie never worked)?
>>
>> Thanks,
>> Evan
>>
>> On Wed, May 12, 2021 at 17:12 Steve Niemitz  wrote:
>>
>>> Yeah, sorry my email was confusing.  use_deprecated_reads is broken on
>>> the DirectRunner in 2.29.
>>>
>>> The behavior you describe is exactly the behavior I ran into as well
>>> when reading from pubsub with the new read method.  I believe that soon the
>>> default is being reverted back to the old read method, not using SDFs,
>>> which will fix your performance issue.
>>>
>>> On Wed, May 12, 2021 at 4:40 PM Boyuan Zhang  wrote:
>>>
 Hi Evan,

 It seems like the slow step is not the read that use_deprecated_read
 targets for. Would you like to share your pipeline code if possible?

 On Wed, May 12, 2021 at 1:35 PM Evan Galpin 
 wrote:

> I just tried with v2.29.0 and use_deprecated_read but unfortunately I
> observed slow behavior again. Is it possible that use_deprecated_read is
> broken in 2.29.0 as well?
>
> Thanks,
> Evan
>
> On Wed, May 12, 2021 at 3:21 PM Steve Niemitz 
> wrote:
>
>> oops sorry I was off by 10...I meant 2.29 not 2.19.
>>
>> On Wed, May 12, 2021 at 2:55 PM Evan Galpin 
>> wrote:
>>
>>> Thanks for the link/info. v2.19.0 and v2.21.0 did exhibit the
>>> "faster" behavior, as did v2.23.0. But that "fast" behavior stopped at
>>> v2.25.0 (for my use case at least) regardless of use_deprecated_read
>>> setting.
>>>
>>> Thanks,
>>> Evan
>>>
>>>
>>> On Wed, May 12, 2021 at 2:47 PM Steve Niemitz 
>>> wrote:
>>>
 use_deprecated_read was broken in 2.19 on the direct runner and
 didn't do anything. [1]  I don't think the fix is in 2.20 either, but 
 will
 be in 2.21.

 [1] https://github.com/apache/beam/pull/14469

 On Wed, May 12, 2021 at 1:41 PM Evan Galpin 
 wrote:

> I forgot to also mention that in all tests I was setting
> --experiments=use_deprecated_read
>
> Thanks,
> Evan
>
> On Wed, May 12, 2021 at 1:39 PM Evan Galpin 
> wrote:
>
>> Hmm, I think I spoke too soon. I'm still seeing an issue of
>> overall DirectRunner slowness, not just pubsub. I have a pipeline 
>> like so:
>>
>> Read pubsub  |  extract GCS glob patterns  |  FileIO.matchAll()
>> |  FileIO.readMatches()  |  Read file contents  |  etc
>>
>> I have temporarily set up a transform between each step to log
>> what's going on and illustrate timing issues.  I ran a series of 
>> tests
>> changing only the SDK version each time since I hadn't noticed this
>> performance issue with 2.19.0 (effectively git-bisect). Before each 
>> test, I
>> seeded the pubsub subscription with the exact same contents.
>>
>> SDK version 2.25.0 (I had a build issue with 2.24.0 that I
>> couldn't seem to resolve) and onward show a significant slowdown.
>>
>> Here is a snippet of logging from v2.25.0:
>>
>> *May 12, 2021 11:11:52 A.M.* com.myOrg.myPipeline.PipelineLeg$1
>> processElement
>> INFO: Got file pattern: gs://my-bucket/my-dir/5004728247517184/**
>> May 12, 2021 11:16:59 A.M.
>> org.apache.beam.sdk.io.FileIO$MatchAll$MatchFn process
>> INFO: Matched 2 files for pattern
>> gs://my-bucket/my-dir/5004728247517184/**
>> May 12, 2021 11:23:32 A.M. com.myOrg.myPipeline.PipelineLeg$3
>> processElement
>> INFO: Got ReadableFile: my-file1.json
>> May 12, 2021 11:23:32 A.M. com.myOrg.myPipeline.PipelineLeg$3
>> processElement
>> INFO: Got ReadableFile: my-file2.json
>> May 12, 2021 11:24:35 A.M. com.myOrg.myPipeline.PipelineLeg$4
>> processElement
>> INFO: Got file contents for document_id my-file1.json
>> *May 12, 2021 11:24:35 A.M*. com.myOrg.myPipeline.PipelineLeg$4
>> processElement
>> INFO: Got file contents for document_id my-file2.json
>>
>> Note that end-to-end, these steps took about *13 minutes*. With
>> SDK 2.23.0 and identical user code, the same section of the pipeline 
>> took *2
>> seconds*:
>>
>> *May 12, 2021 11:03:39 A.M.* com.myOrg.myPipeline.PipelineLeg$1
>> processElement

Re: Extremely Slow DirectRunner

2021-05-12 Thread Steve Niemitz
I think it was only broken in 2.29.

On Wed, May 12, 2021 at 5:53 PM Evan Galpin  wrote:

> Ah ok thanks for that. Do you mean use_deprecated_reads is broken
> specifically in 2.29.0 (regression) or broken in all versions up to and
> including 2.29.0 (ie never worked)?
>
> Thanks,
> Evan
>
> On Wed, May 12, 2021 at 17:12 Steve Niemitz  wrote:
>
>> Yeah, sorry my email was confusing.  use_deprecated_reads is broken on
>> the DirectRunner in 2.29.
>>
>> The behavior you describe is exactly the behavior I ran into as well when
>> reading from pubsub with the new read method.  I believe that soon the
>> default is being reverted back to the old read method, not using SDFs,
>> which will fix your performance issue.
>>
>> On Wed, May 12, 2021 at 4:40 PM Boyuan Zhang  wrote:
>>
>>> Hi Evan,
>>>
>>> It seems like the slow step is not the read that use_deprecated_read
>>> targets for. Would you like to share your pipeline code if possible?
>>>
>>> On Wed, May 12, 2021 at 1:35 PM Evan Galpin 
>>> wrote:
>>>
 I just tried with v2.29.0 and use_deprecated_read but unfortunately I
 observed slow behavior again. Is it possible that use_deprecated_read is
 broken in 2.29.0 as well?

 Thanks,
 Evan

 On Wed, May 12, 2021 at 3:21 PM Steve Niemitz 
 wrote:

> oops sorry I was off by 10...I meant 2.29 not 2.19.
>
> On Wed, May 12, 2021 at 2:55 PM Evan Galpin 
> wrote:
>
>> Thanks for the link/info. v2.19.0 and v2.21.0 did exhibit the
>> "faster" behavior, as did v2.23.0. But that "fast" behavior stopped at
>> v2.25.0 (for my use case at least) regardless of use_deprecated_read
>> setting.
>>
>> Thanks,
>> Evan
>>
>>
>> On Wed, May 12, 2021 at 2:47 PM Steve Niemitz 
>> wrote:
>>
>>> use_deprecated_read was broken in 2.19 on the direct runner and
>>> didn't do anything. [1]  I don't think the fix is in 2.20 either, but 
>>> will
>>> be in 2.21.
>>>
>>> [1] https://github.com/apache/beam/pull/14469
>>>
>>> On Wed, May 12, 2021 at 1:41 PM Evan Galpin 
>>> wrote:
>>>
 I forgot to also mention that in all tests I was setting
 --experiments=use_deprecated_read

 Thanks,
 Evan

 On Wed, May 12, 2021 at 1:39 PM Evan Galpin 
 wrote:

> Hmm, I think I spoke too soon. I'm still seeing an issue of
> overall DirectRunner slowness, not just pubsub. I have a pipeline 
> like so:
>
> Read pubsub  |  extract GCS glob patterns  |  FileIO.matchAll()
> |  FileIO.readMatches()  |  Read file contents  |  etc
>
> I have temporarily set up a transform between each step to log
> what's going on and illustrate timing issues.  I ran a series of tests
> changing only the SDK version each time since I hadn't noticed this
> performance issue with 2.19.0 (effectively git-bisect). Before each 
> test, I
> seeded the pubsub subscription with the exact same contents.
>
> SDK version 2.25.0 (I had a build issue with 2.24.0 that I
> couldn't seem to resolve) and onward show a significant slowdown.
>
> Here is a snippet of logging from v2.25.0:
>
> *May 12, 2021 11:11:52 A.M.* com.myOrg.myPipeline.PipelineLeg$1
> processElement
> INFO: Got file pattern: gs://my-bucket/my-dir/5004728247517184/**
> May 12, 2021 11:16:59 A.M.
> org.apache.beam.sdk.io.FileIO$MatchAll$MatchFn process
> INFO: Matched 2 files for pattern
> gs://my-bucket/my-dir/5004728247517184/**
> May 12, 2021 11:23:32 A.M. com.myOrg.myPipeline.PipelineLeg$3
> processElement
> INFO: Got ReadableFile: my-file1.json
> May 12, 2021 11:23:32 A.M. com.myOrg.myPipeline.PipelineLeg$3
> processElement
> INFO: Got ReadableFile: my-file2.json
> May 12, 2021 11:24:35 A.M. com.myOrg.myPipeline.PipelineLeg$4
> processElement
> INFO: Got file contents for document_id my-file1.json
> *May 12, 2021 11:24:35 A.M*. com.myOrg.myPipeline.PipelineLeg$4
> processElement
> INFO: Got file contents for document_id my-file2.json
>
> Note that end-to-end, these steps took about *13 minutes*. With
> SDK 2.23.0 and identical user code, the same section of the pipeline 
> took *2
> seconds*:
>
> *May 12, 2021 11:03:39 A.M.* com.myOrg.myPipeline.PipelineLeg$1
> processElement
> INFO: Got file pattern: gs://my-bucket/my-dir/5004728247517184/**
> May 12, 2021 11:03:40 A.M.
> org.apache.beam.sdk.io.FileIO$MatchAll$MatchFn process
> INFO: Matched 2 files for pattern
> gs://my-bucket/my-dir/5004728247517184/**
> May 12, 2021 11:03:40 A.M. com.myOrg.myPipeline.PipelineLeg$3
> processElement
> 

Re: Extremely Slow DirectRunner

2021-05-12 Thread Evan Galpin
Ah ok, thanks for that. Do you mean use_deprecated_read is broken
specifically in 2.29.0 (a regression), or broken in all versions up to and
including 2.29.0 (i.e. it never worked)?

Thanks,
Evan

On Wed, May 12, 2021 at 17:12 Steve Niemitz  wrote:

> Yeah, sorry my email was confusing.  use_deprecated_reads is broken on the
> DirectRunner in 2.29.
>
> The behavior you describe is exactly the behavior I ran into as well when
> reading from pubsub with the new read method.  I believe that soon the
> default is being reverted back to the old read method, not using SDFs,
> which will fix your performance issue.
>
> On Wed, May 12, 2021 at 4:40 PM Boyuan Zhang  wrote:
>
>> Hi Evan,
>>
>> It seems like the slow step is not the read that use_deprecated_read
>> targets for. Would you like to share your pipeline code if possible?
>>
>> On Wed, May 12, 2021 at 1:35 PM Evan Galpin 
>> wrote:
>>
>>> I just tried with v2.29.0 and use_deprecated_read but unfortunately I
>>> observed slow behavior again. Is it possible that use_deprecated_read is
>>> broken in 2.29.0 as well?
>>>
>>> Thanks,
>>> Evan
>>>
>>> On Wed, May 12, 2021 at 3:21 PM Steve Niemitz 
>>> wrote:
>>>
 oops sorry I was off by 10...I meant 2.29 not 2.19.

 On Wed, May 12, 2021 at 2:55 PM Evan Galpin 
 wrote:

> Thanks for the link/info. v2.19.0 and v2.21.0 did exhibit the "faster"
> behavior, as did v2.23.0. But that "fast" behavior stopped at v2.25.0 (for
> my use case at least) regardless of use_deprecated_read setting.
>
> Thanks,
> Evan
>
>
> On Wed, May 12, 2021 at 2:47 PM Steve Niemitz 
> wrote:
>
>> use_deprecated_read was broken in 2.19 on the direct runner and
>> didn't do anything. [1]  I don't think the fix is in 2.20 either, but 
>> will
>> be in 2.21.
>>
>> [1] https://github.com/apache/beam/pull/14469
>>
>> On Wed, May 12, 2021 at 1:41 PM Evan Galpin 
>> wrote:
>>
>>> I forgot to also mention that in all tests I was setting
>>> --experiments=use_deprecated_read
>>>
>>> Thanks,
>>> Evan
>>>
>>> On Wed, May 12, 2021 at 1:39 PM Evan Galpin 
>>> wrote:
>>>
 Hmm, I think I spoke too soon. I'm still seeing an issue of overall
 DirectRunner slowness, not just pubsub. I have a pipeline like so:

 Read pubsub  |  extract GCS glob patterns  |  FileIO.matchAll()  |
 FileIO.readMatches()  |  Read file contents  |  etc

 I have temporarily set up a transform between each step to log
 what's going on and illustrate timing issues.  I ran a series of tests
 changing only the SDK version each time since I hadn't noticed this
 performance issue with 2.19.0 (effectively git-bisect). Before each 
 test, I
 seeded the pubsub subscription with the exact same contents.

 SDK version 2.25.0 (I had a build issue with 2.24.0 that I couldn't
 seem to resolve) and onward show a significant slowdown.

 Here is a snippet of logging from v2.25.0:

 *May 12, 2021 11:11:52 A.M.* com.myOrg.myPipeline.PipelineLeg$1
 processElement
 INFO: Got file pattern: gs://my-bucket/my-dir/5004728247517184/**
 May 12, 2021 11:16:59 A.M.
 org.apache.beam.sdk.io.FileIO$MatchAll$MatchFn process
 INFO: Matched 2 files for pattern
 gs://my-bucket/my-dir/5004728247517184/**
 May 12, 2021 11:23:32 A.M. com.myOrg.myPipeline.PipelineLeg$3
 processElement
 INFO: Got ReadableFile: my-file1.json
 May 12, 2021 11:23:32 A.M. com.myOrg.myPipeline.PipelineLeg$3
 processElement
 INFO: Got ReadableFile: my-file2.json
 May 12, 2021 11:24:35 A.M. com.myOrg.myPipeline.PipelineLeg$4
 processElement
 INFO: Got file contents for document_id my-file1.json
 *May 12, 2021 11:24:35 A.M*. com.myOrg.myPipeline.PipelineLeg$4
 processElement
 INFO: Got file contents for document_id my-file2.json

 Note that end-to-end, these steps took about *13 minutes*. With
 SDK 2.23.0 and identical user code, the same section of the pipeline 
 took *2
 seconds*:

 *May 12, 2021 11:03:39 A.M.* com.myOrg.myPipeline.PipelineLeg$1
 processElement
 INFO: Got file pattern: gs://my-bucket/my-dir/5004728247517184/**
 May 12, 2021 11:03:40 A.M.
 org.apache.beam.sdk.io.FileIO$MatchAll$MatchFn process
 INFO: Matched 2 files for pattern
 gs://my-bucket/my-dir/5004728247517184/**
 May 12, 2021 11:03:40 A.M. com.myOrg.myPipeline.PipelineLeg$3
 processElement
 INFO: Got ReadableFile: my-file1.json
 May 12, 2021 11:03:40 A.M. com.myOrg.myPipeline.PipelineLeg$3
 processElement
 INFO: Got ReadableFile: my-file2.json
 May 12, 2021 11:03:41 A.M. 

Re: Extremely Slow DirectRunner

2021-05-12 Thread Steve Niemitz
Yeah, sorry, my email was confusing. use_deprecated_read is broken on the
DirectRunner in 2.29.

The behavior you describe is exactly the behavior I ran into as well when
reading from pubsub with the new read method. I believe the default will soon
be reverted back to the old read method (not using SDFs), which will fix your
performance issue.

On Wed, May 12, 2021 at 4:40 PM Boyuan Zhang  wrote:

> Hi Evan,
>
> It seems like the slow step is not the read that use_deprecated_read
> targets for. Would you like to share your pipeline code if possible?
>
> On Wed, May 12, 2021 at 1:35 PM Evan Galpin  wrote:
>
>> I just tried with v2.29.0 and use_deprecated_read but unfortunately I
>> observed slow behavior again. Is it possible that use_deprecated_read is
>> broken in 2.29.0 as well?
>>
>> Thanks,
>> Evan
>>
>> On Wed, May 12, 2021 at 3:21 PM Steve Niemitz 
>> wrote:
>>
>>> oops sorry I was off by 10...I meant 2.29 not 2.19.
>>>
>>> On Wed, May 12, 2021 at 2:55 PM Evan Galpin 
>>> wrote:
>>>
 Thanks for the link/info. v2.19.0 and v2.21.0 did exhibit the "faster"
 behavior, as did v2.23.0. But that "fast" behavior stopped at v2.25.0 (for
 my use case at least) regardless of use_deprecated_read setting.

 Thanks,
 Evan


 On Wed, May 12, 2021 at 2:47 PM Steve Niemitz 
 wrote:

> use_deprecated_read was broken in 2.19 on the direct runner and didn't
> do anything. [1]  I don't think the fix is in 2.20 either, but will be in
> 2.21.
>
> [1] https://github.com/apache/beam/pull/14469
>
> On Wed, May 12, 2021 at 1:41 PM Evan Galpin 
> wrote:
>
>> I forgot to also mention that in all tests I was setting
>> --experiments=use_deprecated_read
>>
>> Thanks,
>> Evan
>>
>> On Wed, May 12, 2021 at 1:39 PM Evan Galpin 
>> wrote:
>>
>>> Hmm, I think I spoke too soon. I'm still seeing an issue of overall
>>> DirectRunner slowness, not just pubsub. I have a pipeline like so:
>>>
>>> Read pubsub  |  extract GCS glob patterns  |  FileIO.matchAll()  |
>>> FileIO.readMatches()  |  Read file contents  |  etc
>>>
>>> I have temporarily set up a transform between each step to log
>>> what's going on and illustrate timing issues.  I ran a series of tests
>>> changing only the SDK version each time since I hadn't noticed this
>>> performance issue with 2.19.0 (effectively git-bisect). Before each 
>>> test, I
>>> seeded the pubsub subscription with the exact same contents.
>>>
>>> SDK version 2.25.0 (I had a build issue with 2.24.0 that I couldn't
>>> seem to resolve) and onward show a significant slowdown.
>>>
>>> Here is a snippet of logging from v2.25.0:
>>>
>>> *May 12, 2021 11:11:52 A.M.* com.myOrg.myPipeline.PipelineLeg$1
>>> processElement
>>> INFO: Got file pattern: gs://my-bucket/my-dir/5004728247517184/**
>>> May 12, 2021 11:16:59 A.M.
>>> org.apache.beam.sdk.io.FileIO$MatchAll$MatchFn process
>>> INFO: Matched 2 files for pattern
>>> gs://my-bucket/my-dir/5004728247517184/**
>>> May 12, 2021 11:23:32 A.M. com.myOrg.myPipeline.PipelineLeg$3
>>> processElement
>>> INFO: Got ReadableFile: my-file1.json
>>> May 12, 2021 11:23:32 A.M. com.myOrg.myPipeline.PipelineLeg$3
>>> processElement
>>> INFO: Got ReadableFile: my-file2.json
>>> May 12, 2021 11:24:35 A.M. com.myOrg.myPipeline.PipelineLeg$4
>>> processElement
>>> INFO: Got file contents for document_id my-file1.json
>>> *May 12, 2021 11:24:35 A.M*. com.myOrg.myPipeline.PipelineLeg$4
>>> processElement
>>> INFO: Got file contents for document_id my-file2.json
>>>
>>> Note that end-to-end, these steps took about *13 minutes*. With SDK
>>> 2.23.0 and identical user code, the same section of the pipeline took *2
>>> seconds*:
>>>
>>> *May 12, 2021 11:03:39 A.M.* com.myOrg.myPipeline.PipelineLeg$1
>>> processElement
>>> INFO: Got file pattern: gs://my-bucket/my-dir/5004728247517184/**
>>> May 12, 2021 11:03:40 A.M.
>>> org.apache.beam.sdk.io.FileIO$MatchAll$MatchFn process
>>> INFO: Matched 2 files for pattern
>>> gs://my-bucket/my-dir/5004728247517184/**
>>> May 12, 2021 11:03:40 A.M. com.myOrg.myPipeline.PipelineLeg$3
>>> processElement
>>> INFO: Got ReadableFile: my-file1.json
>>> May 12, 2021 11:03:40 A.M. com.myOrg.myPipeline.PipelineLeg$3
>>> processElement
>>> INFO: Got ReadableFile: my-file2.json
>>> May 12, 2021 11:03:41 A.M. com.myOrg.myPipeline.PipelineLeg$4
>>> processElement
>>> INFO: Got file contents for document_id my-file1.json
>>> *May 12, 2021 11:03:41 A.M.* com.myOrg.myPipeline.PipelineLeg$4
>>> processElement
>>> INFO: Got file contents for document_id my-file2.json
>>>
>>> Any thoughts on what could be causing this?
>>>
>>> Thanks,
>>> Evan
>>>
>>> On Wed, May 12, 2021 at 

Re: Extremely Slow DirectRunner

2021-05-12 Thread Evan Galpin
I'd be happy to share what I can. The applicable portion is this `expand`
method of a PTransform (which does nothing more complex than group these
other transforms together for re-use). The input to this PTransform is
pubsub message bodies as strings. I'll paste it as plain-text.

@Override
public PCollection<KV<String, String>> expand(PCollection<String> input) {
  return input
      .apply(
          "Convert OCNs to wildcard paths",
          ParDo.of(new BlobPathToRootPath()))
      .apply(FileIO.matchAll().withEmptyMatchTreatment(EmptyMatchTreatment.ALLOW))
      .apply(FileIO.readMatches().withCompression(Compression.UNCOMPRESSED))
      .apply(
          MapElements
              .into(kvs(strings(), strings()))
              .via(
                  (ReadableFile f) -> {
                    try {
                      String[] blobUri =
                          f.getMetadata().resourceId().toString().split("/");
                      String doc_id = blobUri[4];
                      return KV.of(doc_id, f.readFullyAsUTF8String());
                    } catch (IOException ex) {
                      throw new RuntimeException(
                          "Failed to read the file", ex);
                    }
                  }));
}
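
For context, a sketch of how a transform like this would be wired after the
Pub/Sub read described earlier (the transform name, pipeline variable, and
subscription path here are illustrative placeholders, not from the actual
pipeline):

    PCollection<KV<String, String>> docs =
        pipeline
            .apply("ReadPubsub",
                PubsubIO.readStrings()
                    .fromSubscription("projects/my-project/subscriptions/my-sub"))
            // PTransform whose expand() is shown above
            .apply("ReadMatchedFiles", new ReadMatchedFiles());

As the timestamps quoted in this thread suggest, the slowdown shows up
downstream of the Pub/Sub read, around the FileIO.matchAll()/readMatches()
steps.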

Thanks,
Evan

On Wed, May 12, 2021 at 4:40 PM Boyuan Zhang  wrote:

> Hi Evan,
>
> It seems like the slow step is not the read that use_deprecated_read
> targets for. Would you like to share your pipeline code if possible?
>
> On Wed, May 12, 2021 at 1:35 PM Evan Galpin  wrote:
>
>> I just tried with v2.29.0 and use_deprecated_read but unfortunately I
>> observed slow behavior again. Is it possible that use_deprecated_read is
>> broken in 2.29.0 as well?
>>
>> Thanks,
>> Evan
>>
>> On Wed, May 12, 2021 at 3:21 PM Steve Niemitz 
>> wrote:
>>
>>> oops sorry I was off by 10...I meant 2.29 not 2.19.
>>>
>>> On Wed, May 12, 2021 at 2:55 PM Evan Galpin 
>>> wrote:
>>>
 Thanks for the link/info. v2.19.0 and v2.21.0 did exhibit the "faster"
 behavior, as did v2.23.0. But that "fast" behavior stopped at v2.25.0 (for
 my use case at least) regardless of use_deprecated_read setting.

 Thanks,
 Evan


 On Wed, May 12, 2021 at 2:47 PM Steve Niemitz 
 wrote:

> use_deprecated_read was broken in 2.19 on the direct runner and didn't
> do anything. [1]  I don't think the fix is in 2.20 either, but will be in
> 2.21.
>
> [1] https://github.com/apache/beam/pull/14469
>
> On Wed, May 12, 2021 at 1:41 PM Evan Galpin 
> wrote:
>
>> I forgot to also mention that in all tests I was setting
>> --experiments=use_deprecated_read
>>
>> Thanks,
>> Evan
>>
>> On Wed, May 12, 2021 at 1:39 PM Evan Galpin 
>> wrote:
>>
>>> Hmm, I think I spoke too soon. I'm still seeing an issue of overall
>>> DirectRunner slowness, not just pubsub. I have a pipeline like so:
>>>
>>> Read pubsub  |  extract GCS glob patterns  |  FileIO.matchAll()  |
>>> FileIO.readMatches()  |  Read file contents  |  etc
>>>
>>> I have temporarily set up a transform between each step to log
>>> what's going on and illustrate timing issues.  I ran a series of tests
>>> changing only the SDK version each time since I hadn't noticed this
>>> performance issue with 2.19.0 (effectively git-bisect). Before each 
>>> test, I
>>> seeded the pubsub subscription with the exact same contents.
>>>
>>> SDK version 2.25.0 (I had a build issue with 2.24.0 that I couldn't
>>> seem to resolve) and onward show a significant slowdown.
>>>
>>> Here is a snippet of logging from v2.25.0:
>>>
>>> *May 12, 2021 11:11:52 A.M.* com.myOrg.myPipeline.PipelineLeg$1
>>> processElement
>>> INFO: Got file pattern: gs://my-bucket/my-dir/5004728247517184/**
>>> May 12, 2021 11:16:59 A.M.
>>> org.apache.beam.sdk.io.FileIO$MatchAll$MatchFn process
>>> INFO: Matched 2 files for pattern
>>> gs://my-bucket/my-dir/5004728247517184/**
>>> May 12, 2021 11:23:32 A.M. com.myOrg.myPipeline.PipelineLeg$3
>>> processElement
>>> INFO: Got ReadableFile: my-file1.json
>>> May 12, 2021 11:23:32 A.M. com.myOrg.myPipeline.PipelineLeg$3
>>> processElement
>>> INFO: Got ReadableFile: my-file2.json
>>> May 12, 2021 11:24:35 A.M. com.myOrg.myPipeline.PipelineLeg$4
>>> processElement
>>> INFO: Got file contents for document_id my-file1.json
>>> *May 12, 2021 11:24:35 A.M*. com.myOrg.myPipeline.PipelineLeg$4
>>> processElement
>>> INFO: Got file contents for document_id my-file2.json
>>>
>>> Note that end-to-end, these steps took about *13 minutes*. With SDK
>>> 2.23.0 and identical user code, the same section of the pipeline took *2
>>> seconds*:
>>>
>>> *May 12, 2021 11:03:39 A.M.* com.myOrg.myPipeline.PipelineLeg$1
>>> 

Re: Extremely Slow DirectRunner

2021-05-12 Thread Boyuan Zhang
Hi Evan,

It seems like the slow step is not the read that use_deprecated_read
targets. Would you be able to share your pipeline code, if possible?

On Wed, May 12, 2021 at 1:35 PM Evan Galpin  wrote:

> I just tried with v2.29.0 and use_deprecated_read but unfortunately I
> observed slow behavior again. Is it possible that use_deprecated_read is
> broken in 2.29.0 as well?
>
> Thanks,
> Evan
>
> On Wed, May 12, 2021 at 3:21 PM Steve Niemitz  wrote:
>
>> oops sorry I was off by 10...I meant 2.29 not 2.19.
>>
>> On Wed, May 12, 2021 at 2:55 PM Evan Galpin 
>> wrote:
>>
>>> Thanks for the link/info. v2.19.0 and v2.21.0 did exhibit the "faster"
>>> behavior, as did v2.23.0. But that "fast" behavior stopped at v2.25.0 (for
>>> my use case at least) regardless of use_deprecated_read setting.
>>>
>>> Thanks,
>>> Evan
>>>
>>>
>>> On Wed, May 12, 2021 at 2:47 PM Steve Niemitz 
>>> wrote:
>>>
 use_deprecated_read was broken in 2.19 on the direct runner and didn't
 do anything. [1]  I don't think the fix is in 2.20 either, but will be in
 2.21.

 [1] https://github.com/apache/beam/pull/14469

 On Wed, May 12, 2021 at 1:41 PM Evan Galpin 
 wrote:

> I forgot to also mention that in all tests I was setting
> --experiments=use_deprecated_read
>
> Thanks,
> Evan
>
> On Wed, May 12, 2021 at 1:39 PM Evan Galpin 
> wrote:
>
>> Hmm, I think I spoke too soon. I'm still seeing an issue of overall
>> DirectRunner slowness, not just pubsub. I have a pipeline like so:
>>
>> Read pubsub  |  extract GCS glob patterns  |  FileIO.matchAll()  |
>> FileIO.readMatches()  |  Read file contents  |  etc
>>
>> I have temporarily set up a transform between each step to log what's
>> going on and illustrate timing issues.  I ran a series of tests changing
>> only the SDK version each time since I hadn't noticed this performance
>> issue with 2.19.0 (effectively git-bisect). Before each test, I seeded 
>> the
>> pubsub subscription with the exact same contents.
>>
>> SDK version 2.25.0 (I had a build issue with 2.24.0 that I couldn't
>> seem to resolve) and onward show a significant slowdown.
>>
>> Here is a snippet of logging from v2.25.0:
>>
>> *May 12, 2021 11:11:52 A.M.* com.myOrg.myPipeline.PipelineLeg$1
>> processElement
>> INFO: Got file pattern: gs://my-bucket/my-dir/5004728247517184/**
>> May 12, 2021 11:16:59 A.M.
>> org.apache.beam.sdk.io.FileIO$MatchAll$MatchFn process
>> INFO: Matched 2 files for pattern
>> gs://my-bucket/my-dir/5004728247517184/**
>> May 12, 2021 11:23:32 A.M. com.myOrg.myPipeline.PipelineLeg$3
>> processElement
>> INFO: Got ReadableFile: my-file1.json
>> May 12, 2021 11:23:32 A.M. com.myOrg.myPipeline.PipelineLeg$3
>> processElement
>> INFO: Got ReadableFile: my-file2.json
>> May 12, 2021 11:24:35 A.M. com.myOrg.myPipeline.PipelineLeg$4
>> processElement
>> INFO: Got file contents for document_id my-file1.json
>> *May 12, 2021 11:24:35 A.M*. com.myOrg.myPipeline.PipelineLeg$4
>> processElement
>> INFO: Got file contents for document_id my-file2.json
>>
>> Note that end-to-end, these steps took about *13 minutes*. With SDK
>> 2.23.0 and identical user code, the same section of the pipeline took *2
>> seconds*:
>>
>> *May 12, 2021 11:03:39 A.M.* com.myOrg.myPipeline.PipelineLeg$1
>> processElement
>> INFO: Got file pattern: gs://my-bucket/my-dir/5004728247517184/**
>> May 12, 2021 11:03:40 A.M.
>> org.apache.beam.sdk.io.FileIO$MatchAll$MatchFn process
>> INFO: Matched 2 files for pattern
>> gs://my-bucket/my-dir/5004728247517184/**
>> May 12, 2021 11:03:40 A.M. com.myOrg.myPipeline.PipelineLeg$3
>> processElement
>> INFO: Got ReadableFile: my-file1.json
>> May 12, 2021 11:03:40 A.M. com.myOrg.myPipeline.PipelineLeg$3
>> processElement
>> INFO: Got ReadableFile: my-file2.json
>> May 12, 2021 11:03:41 A.M. com.myOrg.myPipeline.PipelineLeg$4
>> processElement
>> INFO: Got file contents for document_id my-file1.json
>> *May 12, 2021 11:03:41 A.M.* com.myOrg.myPipeline.PipelineLeg$4
>> processElement
>> INFO: Got file contents for document_id my-file2.json
>>
>> Any thoughts on what could be causing this?
>>
>> Thanks,
>> Evan
>>
>> On Wed, May 12, 2021 at 9:53 AM Evan Galpin 
>> wrote:
>>
>>>
>>>
>>> On Mon, May 10, 2021 at 2:09 PM Boyuan Zhang 
>>> wrote:
>>>
 Hi Evan,

 What do you mean startup delay? Is it the time that from you start
 the pipeline to the time that you notice the first output record from
 PubSub?

>>>
>>> Yes that's what I meant, the seemingly idle system waiting for
>>> pubsub output despite data being in the subscription at pipeline start
>>> time.
>>>
>>> On 

Re: Extremely Slow DirectRunner

2021-05-12 Thread Evan Galpin
I just tried with v2.29.0 and use_deprecated_read but unfortunately I
observed slow behavior again. Is it possible that use_deprecated_read is
broken in 2.29.0 as well?

Thanks,
Evan

On Wed, May 12, 2021 at 3:21 PM Steve Niemitz  wrote:

> oops sorry I was off by 10...I meant 2.29 not 2.19.
>
> On Wed, May 12, 2021 at 2:55 PM Evan Galpin  wrote:
>
>> Thanks for the link/info. v2.19.0 and v2.21.0 did exhibit the "faster"
>> behavior, as did v2.23.0. But that "fast" behavior stopped at v2.25.0 (for
>> my use case at least) regardless of use_deprecated_read setting.
>>
>> Thanks,
>> Evan
>>
>>
>> On Wed, May 12, 2021 at 2:47 PM Steve Niemitz 
>> wrote:
>>
>>> use_deprecated_read was broken in 2.19 on the direct runner and didn't
>>> do anything. [1]  I don't think the fix is in 2.20 either, but will be in
>>> 2.21.
>>>
>>> [1] https://github.com/apache/beam/pull/14469
>>>
>>> On Wed, May 12, 2021 at 1:41 PM Evan Galpin 
>>> wrote:
>>>
 I forgot to also mention that in all tests I was setting
 --experiments=use_deprecated_read

 Thanks,
 Evan

 On Wed, May 12, 2021 at 1:39 PM Evan Galpin 
 wrote:

> Hmm, I think I spoke too soon. I'm still seeing an issue of overall
> DirectRunner slowness, not just pubsub. I have a pipeline like so:
>
> Read pubsub  |  extract GCS glob patterns  |  FileIO.matchAll()  |
> FileIO.readMatches()  |  Read file contents  |  etc
>
> I have temporarily set up a transform between each step to log what's
> going on and illustrate timing issues.  I ran a series of tests changing
> only the SDK version each time since I hadn't noticed this performance
> issue with 2.19.0 (effectively git-bisect). Before each test, I seeded the
> pubsub subscription with the exact same contents.
>
> SDK version 2.25.0 (I had a build issue with 2.24.0 that I couldn't
> seem to resolve) and onward show a significant slowdown.
>
> Here is a snippet of logging from v2.25.0:
>
> *May 12, 2021 11:11:52 A.M.* com.myOrg.myPipeline.PipelineLeg$1
> processElement
> INFO: Got file pattern: gs://my-bucket/my-dir/5004728247517184/**
> May 12, 2021 11:16:59 A.M.
> org.apache.beam.sdk.io.FileIO$MatchAll$MatchFn process
> INFO: Matched 2 files for pattern
> gs://my-bucket/my-dir/5004728247517184/**
> May 12, 2021 11:23:32 A.M. com.myOrg.myPipeline.PipelineLeg$3
> processElement
> INFO: Got ReadableFile: my-file1.json
> May 12, 2021 11:23:32 A.M. com.myOrg.myPipeline.PipelineLeg$3
> processElement
> INFO: Got ReadableFile: my-file2.json
> May 12, 2021 11:24:35 A.M. com.myOrg.myPipeline.PipelineLeg$4
> processElement
> INFO: Got file contents for document_id my-file1.json
> *May 12, 2021 11:24:35 A.M*. com.myOrg.myPipeline.PipelineLeg$4
> processElement
> INFO: Got file contents for document_id my-file2.json
>
> Note that end-to-end, these steps took about *13 minutes*. With SDK
> 2.23.0 and identical user code, the same section of the pipeline took *2
> seconds*:
>
> *May 12, 2021 11:03:39 A.M.* com.myOrg.myPipeline.PipelineLeg$1
> processElement
> INFO: Got file pattern: gs://my-bucket/my-dir/5004728247517184/**
> May 12, 2021 11:03:40 A.M.
> org.apache.beam.sdk.io.FileIO$MatchAll$MatchFn process
> INFO: Matched 2 files for pattern
> gs://my-bucket/my-dir/5004728247517184/**
> May 12, 2021 11:03:40 A.M. com.myOrg.myPipeline.PipelineLeg$3
> processElement
> INFO: Got ReadableFile: my-file1.json
> May 12, 2021 11:03:40 A.M. com.myOrg.myPipeline.PipelineLeg$3
> processElement
> INFO: Got ReadableFile: my-file2.json
> May 12, 2021 11:03:41 A.M. com.myOrg.myPipeline.PipelineLeg$4
> processElement
> INFO: Got file contents for document_id my-file1.json
> *May 12, 2021 11:03:41 A.M.* com.myOrg.myPipeline.PipelineLeg$4
> processElement
> INFO: Got file contents for document_id my-file2.json
>
> Any thoughts on what could be causing this?
>
> Thanks,
> Evan
>
> On Wed, May 12, 2021 at 9:53 AM Evan Galpin 
> wrote:
>
>>
>>
>> On Mon, May 10, 2021 at 2:09 PM Boyuan Zhang 
>> wrote:
>>
>>> Hi Evan,
>>>
>>> What do you mean startup delay? Is it the time that from you start
>>> the pipeline to the time that you notice the first output record from
>>> PubSub?
>>>
>>
>> Yes that's what I meant, the seemingly idle system waiting for pubsub
>> output despite data being in the subscription at pipeline start time.
>>
>> On Sat, May 8, 2021 at 12:50 AM Ismaël Mejía 
>>> wrote:
>>>
 Can you try running direct runner with the option
 `--experiments=use_deprecated_read`

>>>
>> This seems to work for me, thanks for this! 
>>
>>
 Seems like an instance of
 

Re: Extremely Slow DirectRunner

2021-05-12 Thread Steve Niemitz
Oops, sorry, I was off by 10... I meant 2.29, not 2.19.

On Wed, May 12, 2021 at 2:55 PM Evan Galpin  wrote:

> Thanks for the link/info. v2.19.0 and v2.21.0 did exhibit the "faster"
> behavior, as did v2.23.0. But that "fast" behavior stopped at v2.25.0 (for
> my use case at least) regardless of use_deprecated_read setting.
>
> Thanks,
> Evan
>
>
> On Wed, May 12, 2021 at 2:47 PM Steve Niemitz  wrote:
>
>> use_deprecated_read was broken in 2.19 on the direct runner and didn't do
>> anything. [1]  I don't think the fix is in 2.20 either, but will be in 2.21.
>>
>> [1] https://github.com/apache/beam/pull/14469
>>
>> On Wed, May 12, 2021 at 1:41 PM Evan Galpin 
>> wrote:
>>
>>> I forgot to also mention that in all tests I was setting
>>> --experiments=use_deprecated_read
>>>
>>> Thanks,
>>> Evan
>>>
>>> On Wed, May 12, 2021 at 1:39 PM Evan Galpin 
>>> wrote:
>>>
 Hmm, I think I spoke too soon. I'm still seeing an issue of overall
 DirectRunner slowness, not just pubsub. I have a pipeline like so:

 Read pubsub  |  extract GCS glob patterns  |  FileIO.matchAll()  |
 FileIO.readMatches()  |  Read file contents  |  etc

 I have temporarily set up a transform between each step to log what's
 going on and illustrate timing issues.  I ran a series of tests changing
 only the SDK version each time since I hadn't noticed this performance
 issue with 2.19.0 (effectively git-bisect). Before each test, I seeded the
 pubsub subscription with the exact same contents.

 SDK version 2.25.0 (I had a build issue with 2.24.0 that I couldn't
 seem to resolve) and onward show a significant slowdown.

 Here is a snippet of logging from v2.25.0:

 *May 12, 2021 11:11:52 A.M.* com.myOrg.myPipeline.PipelineLeg$1
 processElement
 INFO: Got file pattern: gs://my-bucket/my-dir/5004728247517184/**
 May 12, 2021 11:16:59 A.M.
 org.apache.beam.sdk.io.FileIO$MatchAll$MatchFn process
 INFO: Matched 2 files for pattern
 gs://my-bucket/my-dir/5004728247517184/**
 May 12, 2021 11:23:32 A.M. com.myOrg.myPipeline.PipelineLeg$3
 processElement
 INFO: Got ReadableFile: my-file1.json
 May 12, 2021 11:23:32 A.M. com.myOrg.myPipeline.PipelineLeg$3
 processElement
 INFO: Got ReadableFile: my-file2.json
 May 12, 2021 11:24:35 A.M. com.myOrg.myPipeline.PipelineLeg$4
 processElement
 INFO: Got file contents for document_id my-file1.json
 *May 12, 2021 11:24:35 A.M*. com.myOrg.myPipeline.PipelineLeg$4
 processElement
 INFO: Got file contents for document_id my-file2.json

 Note that end-to-end, these steps took about *13 minutes*. With SDK
 2.23.0 and identical user code, the same section of the pipeline took *2
 seconds*:

 *May 12, 2021 11:03:39 A.M.* com.myOrg.myPipeline.PipelineLeg$1
 processElement
 INFO: Got file pattern: gs://my-bucket/my-dir/5004728247517184/**
 May 12, 2021 11:03:40 A.M.
 org.apache.beam.sdk.io.FileIO$MatchAll$MatchFn process
 INFO: Matched 2 files for pattern
 gs://my-bucket/my-dir/5004728247517184/**
 May 12, 2021 11:03:40 A.M. com.myOrg.myPipeline.PipelineLeg$3
 processElement
 INFO: Got ReadableFile: my-file1.json
 May 12, 2021 11:03:40 A.M. com.myOrg.myPipeline.PipelineLeg$3
 processElement
 INFO: Got ReadableFile: my-file2.json
 May 12, 2021 11:03:41 A.M. com.myOrg.myPipeline.PipelineLeg$4
 processElement
 INFO: Got file contents for document_id my-file1.json
 *May 12, 2021 11:03:41 A.M.* com.myOrg.myPipeline.PipelineLeg$4
 processElement
 INFO: Got file contents for document_id my-file2.json

 Any thoughts on what could be causing this?

 Thanks,
 Evan

 On Wed, May 12, 2021 at 9:53 AM Evan Galpin 
 wrote:

>
>
> On Mon, May 10, 2021 at 2:09 PM Boyuan Zhang 
> wrote:
>
>> Hi Evan,
>>
>> What do you mean startup delay? Is it the time that from you start
>> the pipeline to the time that you notice the first output record from
>> PubSub?
>>
>
> Yes that's what I meant, the seemingly idle system waiting for pubsub
> output despite data being in the subscription at pipeline start time.
>
> On Sat, May 8, 2021 at 12:50 AM Ismaël Mejía 
>> wrote:
>>
>>> Can you try running direct runner with the option
>>> `--experiments=use_deprecated_read`
>>>
>>
> This seems to work for me, thanks for this! 
>
>
>>> Seems like an instance of
>>> https://issues.apache.org/jira/browse/BEAM-10670?focusedCommentId=17316858=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17316858
>>> also reported in
>>> https://lists.apache.org/thread.html/re6b0941a8b4951293a0327ce9b25e607cafd6e45b69783f65290edee%40%3Cdev.beam.apache.org%3E
>>>
>>> We should rollback using the SDF wrapper by default because of the
>>> usability and performance 

Re: Extremely Slow DirectRunner

2021-05-12 Thread Evan Galpin
Thanks for the link/info. v2.19.0 and v2.21.0 did exhibit the "faster"
behavior, as did v2.23.0. But that "fast" behavior stopped at v2.25.0 (for
my use case at least), regardless of the use_deprecated_read setting.

Thanks,
Evan


On Wed, May 12, 2021 at 2:47 PM Steve Niemitz  wrote:

> use_deprecated_read was broken in 2.19 on the direct runner and didn't do
> anything. [1]  I don't think the fix is in 2.20 either, but will be in 2.21.
>
> [1] https://github.com/apache/beam/pull/14469
>
> On Wed, May 12, 2021 at 1:41 PM Evan Galpin  wrote:
>
>> I forgot to also mention that in all tests I was setting
>> --experiments=use_deprecated_read
>>
>> Thanks,
>> Evan
>>
>> On Wed, May 12, 2021 at 1:39 PM Evan Galpin 
>> wrote:
>>
>>> Hmm, I think I spoke too soon. I'm still seeing an issue of overall
>>> DirectRunner slowness, not just pubsub. I have a pipeline like so:
>>>
>>> Read pubsub  |  extract GCS glob patterns  |  FileIO.matchAll()  |
>>> FileIO.readMatches()  |  Read file contents  |  etc
>>>
>>> I have temporarily set up a transform between each step to log what's
>>> going on and illustrate timing issues.  I ran a series of tests changing
>>> only the SDK version each time since I hadn't noticed this performance
>>> issue with 2.19.0 (effectively git-bisect). Before each test, I seeded the
>>> pubsub subscription with the exact same contents.
>>>
>>> SDK version 2.25.0 (I had a build issue with 2.24.0 that I couldn't seem
>>> to resolve) and onward show a significant slowdown.
>>>
>>> Here is a snippet of logging from v2.25.0:
>>>
>>> *May 12, 2021 11:11:52 A.M.* com.myOrg.myPipeline.PipelineLeg$1
>>> processElement
>>> INFO: Got file pattern: gs://my-bucket/my-dir/5004728247517184/**
>>> May 12, 2021 11:16:59 A.M.
>>> org.apache.beam.sdk.io.FileIO$MatchAll$MatchFn process
>>> INFO: Matched 2 files for pattern
>>> gs://my-bucket/my-dir/5004728247517184/**
>>> May 12, 2021 11:23:32 A.M. com.myOrg.myPipeline.PipelineLeg$3
>>> processElement
>>> INFO: Got ReadableFile: my-file1.json
>>> May 12, 2021 11:23:32 A.M. com.myOrg.myPipeline.PipelineLeg$3
>>> processElement
>>> INFO: Got ReadableFile: my-file2.json
>>> May 12, 2021 11:24:35 A.M. com.myOrg.myPipeline.PipelineLeg$4
>>> processElement
>>> INFO: Got file contents for document_id my-file1.json
>>> *May 12, 2021 11:24:35 A.M*. com.myOrg.myPipeline.PipelineLeg$4
>>> processElement
>>> INFO: Got file contents for document_id my-file2.json
>>>
>>> Note that end-to-end, these steps took about *13 minutes*. With SDK
>>> 2.23.0 and identical user code, the same section of the pipeline took *2
>>> seconds*:
>>>
>>> *May 12, 2021 11:03:39 A.M.* com.myOrg.myPipeline.PipelineLeg$1
>>> processElement
>>> INFO: Got file pattern: gs://my-bucket/my-dir/5004728247517184/**
>>> May 12, 2021 11:03:40 A.M.
>>> org.apache.beam.sdk.io.FileIO$MatchAll$MatchFn process
>>> INFO: Matched 2 files for pattern
>>> gs://my-bucket/my-dir/5004728247517184/**
>>> May 12, 2021 11:03:40 A.M. com.myOrg.myPipeline.PipelineLeg$3
>>> processElement
>>> INFO: Got ReadableFile: my-file1.json
>>> May 12, 2021 11:03:40 A.M. com.myOrg.myPipeline.PipelineLeg$3
>>> processElement
>>> INFO: Got ReadableFile: my-file2.json
>>> May 12, 2021 11:03:41 A.M. com.myOrg.myPipeline.PipelineLeg$4
>>> processElement
>>> INFO: Got file contents for document_id my-file1.json
>>> *May 12, 2021 11:03:41 A.M.* com.myOrg.myPipeline.PipelineLeg$4
>>> processElement
>>> INFO: Got file contents for document_id my-file2.json
>>>
>>> Any thoughts on what could be causing this?
>>>
>>> Thanks,
>>> Evan
>>>
>>> On Wed, May 12, 2021 at 9:53 AM Evan Galpin 
>>> wrote:
>>>


 On Mon, May 10, 2021 at 2:09 PM Boyuan Zhang 
 wrote:

> Hi Evan,
>
> What do you mean startup delay? Is it the time that from you start the
> pipeline to the time that you notice the first output record from PubSub?
>

 Yes that's what I meant, the seemingly idle system waiting for pubsub
 output despite data being in the subscription at pipeline start time.

 On Sat, May 8, 2021 at 12:50 AM Ismaël Mejía  wrote:
>
>> Can you try running direct runner with the option
>> `--experiments=use_deprecated_read`
>>
>
 This seems to work for me, thanks for this! 


>> Seems like an instance of
>> https://issues.apache.org/jira/browse/BEAM-10670?focusedCommentId=17316858=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17316858
>> also reported in
>> https://lists.apache.org/thread.html/re6b0941a8b4951293a0327ce9b25e607cafd6e45b69783f65290edee%40%3Cdev.beam.apache.org%3E
>>
>> We should rollback using the SDF wrapper by default because of the
>> usability and performance issues reported.
>>
>>
>> On Sat, May 8, 2021 at 12:57 AM Evan Galpin 
>> wrote:
>>
>>> Hi all,
>>>
>>> I’m experiencing very slow performance and startup delay when
>>> testing a pipeline locally. I’m reading data 

Re: Extremely Slow DirectRunner

2021-05-12 Thread Steve Niemitz
use_deprecated_read was broken in 2.19 on the direct runner and didn't do
anything [1]. I don't think the fix is in 2.20 either, but it will be in 2.21.

[1] https://github.com/apache/beam/pull/14469

On Wed, May 12, 2021 at 1:41 PM Evan Galpin  wrote:

> I forgot to also mention that in all tests I was setting
> --experiments=use_deprecated_read
>
> Thanks,
> Evan
>
> On Wed, May 12, 2021 at 1:39 PM Evan Galpin  wrote:
>
>> Hmm, I think I spoke too soon. I'm still seeing an issue of overall
>> DirectRunner slowness, not just pubsub. I have a pipeline like so:
>>
>> Read pubsub  |  extract GCS glob patterns  |  FileIO.matchAll()  |
>> FileIO.readMatches()  |  Read file contents  |  etc
>>
>> I have temporarily set up a transform between each step to log what's
>> going on and illustrate timing issues.  I ran a series of tests changing
>> only the SDK version each time since I hadn't noticed this performance
>> issue with 2.19.0 (effectively git-bisect). Before each test, I seeded the
>> pubsub subscription with the exact same contents.
>>
>> SDK version 2.25.0 (I had a build issue with 2.24.0 that I couldn't seem
>> to resolve) and onward show a significant slowdown.
>>
>> Here is a snippet of logging from v2.25.0:
>>
>> *May 12, 2021 11:11:52 A.M.* com.myOrg.myPipeline.PipelineLeg$1
>> processElement
>> INFO: Got file pattern: gs://my-bucket/my-dir/5004728247517184/**
>> May 12, 2021 11:16:59 A.M. org.apache.beam.sdk.io.FileIO$MatchAll$MatchFn
>> process
>> INFO: Matched 2 files for pattern
>> gs://my-bucket/my-dir/5004728247517184/**
>> May 12, 2021 11:23:32 A.M. com.myOrg.myPipeline.PipelineLeg$3
>> processElement
>> INFO: Got ReadableFile: my-file1.json
>> May 12, 2021 11:23:32 A.M. com.myOrg.myPipeline.PipelineLeg$3
>> processElement
>> INFO: Got ReadableFile: my-file2.json
>> May 12, 2021 11:24:35 A.M. com.myOrg.myPipeline.PipelineLeg$4
>> processElement
>> INFO: Got file contents for document_id my-file1.json
>> *May 12, 2021 11:24:35 A.M*. com.myOrg.myPipeline.PipelineLeg$4
>> processElement
>> INFO: Got file contents for document_id my-file2.json
>>
>> Note that end-to-end, these steps took about *13 minutes*. With SDK
>> 2.23.0 and identical user code, the same section of the pipeline took *2
>> seconds*:
>>
>> *May 12, 2021 11:03:39 A.M.* com.myOrg.myPipeline.PipelineLeg$1
>> processElement
>> INFO: Got file pattern: gs://my-bucket/my-dir/5004728247517184/**
>> May 12, 2021 11:03:40 A.M. org.apache.beam.sdk.io.FileIO$MatchAll$MatchFn
>> process
>> INFO: Matched 2 files for pattern
>> gs://my-bucket/my-dir/5004728247517184/**
>> May 12, 2021 11:03:40 A.M. com.myOrg.myPipeline.PipelineLeg$3
>> processElement
>> INFO: Got ReadableFile: my-file1.json
>> May 12, 2021 11:03:40 A.M. com.myOrg.myPipeline.PipelineLeg$3
>> processElement
>> INFO: Got ReadableFile: my-file2.json
>> May 12, 2021 11:03:41 A.M. com.myOrg.myPipeline.PipelineLeg$4
>> processElement
>> INFO: Got file contents for document_id my-file1.json
>> *May 12, 2021 11:03:41 A.M.* com.myOrg.myPipeline.PipelineLeg$4
>> processElement
>> INFO: Got file contents for document_id my-file2.json
>>
>> Any thoughts on what could be causing this?
>>
>> Thanks,
>> Evan
>>
>> On Wed, May 12, 2021 at 9:53 AM Evan Galpin 
>> wrote:
>>
>>>
>>>
>>> On Mon, May 10, 2021 at 2:09 PM Boyuan Zhang  wrote:
>>>
 Hi Evan,

 What do you mean startup delay? Is it the time that from you start the
 pipeline to the time that you notice the first output record from PubSub?

>>>
>>> Yes that's what I meant, the seemingly idle system waiting for pubsub
>>> output despite data being in the subscription at pipeline start time.
>>>
>>> On Sat, May 8, 2021 at 12:50 AM Ismaël Mejía  wrote:

> Can you try running direct runner with the option
> `--experiments=use_deprecated_read`
>

>>> This seems to work for me, thanks for this! 
>>>
>>>
> Seems like an instance of
> https://issues.apache.org/jira/browse/BEAM-10670?focusedCommentId=17316858=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17316858
> also reported in
> https://lists.apache.org/thread.html/re6b0941a8b4951293a0327ce9b25e607cafd6e45b69783f65290edee%40%3Cdev.beam.apache.org%3E
>
> We should rollback using the SDF wrapper by default because of the
> usability and performance issues reported.
>
>
> On Sat, May 8, 2021 at 12:57 AM Evan Galpin 
> wrote:
>
>> Hi all,
>>
>> I’m experiencing very slow performance and startup delay when testing
>> a pipeline locally. I’m reading data from a Google PubSub subscription as
>> the data source, and before each pipeline execution I ensure that data is
>> present in the subscription (readable from GCP console).
>>
>> I’m seeing startup delay on the order of minutes with DirectRunner
>> (5-10 min). Is that expected? I did find a Jira ticket[1] that at first
>> seemed related, but I think it has more to do with BQ than 

Re: Beam/Dataflow pipeline backfill via JDBC

2021-05-12 Thread Raman Gupta
The pipeline works fine with the Direct Runner. The issue appears to be
specific to streaming mode on Dataflow. I've updated my pipeline to use
Pub/Sub as an input instead, and digging into the Dataflow console, it
looks like execution of a particular GroupByKey is moving extremely slowly
-- the watermark for the prior step is caught up to real time, but the
GroupByKey step data watermark is currently June 4, 2020 and takes about 10
minutes to advance a single day, which is ridiculously slow. In batch mode,
the whole backlog is processed in about 30 minutes.

It is a reasonably large and complex pipeline, so I'm not sure where I would
even start with code snippets. The particular GroupByKey that seems to be
running extremely slowly is "FixedTickGroupByTeam", and it sits in a part of
the pipeline that looks like this -- it really should be processing only a
few events per day with an event time of June 4th, 2020, so something is
definitely not right:

val scores = timestampedCheckins
  .apply(
    "FixedTickWindows",
    Window.into(FixedWindows.of(5.minutes.asJoda()))
      // NOTE that we use the default timestamp combiner (end of window) to avoid
      // https://issues.apache.org/jira/browse/BEAM-2262
      .triggering(
        AfterWatermark.pastEndOfWindow()
          .withLateFirings(AfterPane.elementCountAtLeast(1))
      )
      .withAllowedLateness(3.days.asJoda(), Window.ClosingBehavior.FIRE_IF_NON_EMPTY)
      .withOnTimeBehavior(Window.OnTimeBehavior.FIRE_ALWAYS)
      .withTimestampCombiner(TimestampCombiner.LATEST)
      .discardingFiredPanes()
  )
  .apply("FixedTickKeyByTeam", WithKeys.of { it.teamId })
  .apply("FixedTickGroupByTeam", GroupByKey.create())
  .apply("GlobalWindowsLoopingStatefulTimer", Window.into(GlobalWindows()))
  .apply("LoopingStatefulTimer",
    ParDo.of(LoopingStatefulTimer(5.minutes, (options.timerTimeoutDays ?: 30).days))
  )
  // this window and subsequent group by and flatten combines the
  // empty iterable timer output with any actual check-ins
  .apply("FixedTickWindowsPostTimer",
    Window.into(FixedWindows.of(5.minutes.asJoda()))
      .applyWindowingOptions()
  )
  .apply("FixedTickGroupByTeamPostTimer", GroupByKey.create())
  .apply("FixedTickFlattenPostTimer", flattenValues())
  // convert to an explicit PeriodContent type representing either an
  // empty period or a period with check-ins
  // this allows us to carry forward the timestamp of an empty period,
  // without it being flattened into a single empty
  //.apply("MapToPeriodContent", ParDo.of(MapToPeriodContentFn()))
  .apply("CheckinTimingScoreFn",
    ParDo.of(CheckinTimingScoreFn(scoreCalculationServiceBuilder, checkinStateView))
      .withSideInputs(checkinStateView)
  )






On Wed, May 12, 2021 at 12:43 PM Kenneth Knowles  wrote:

> Can you share some more details, such as code? We may identify something
> that relies upon assumptions from batch execution style.
>
> Also notably the Java DirectRunner does not have separate batch/streaming
> mode. It always executes in a "streaming" sort of way. It is also simpler
> in some ways so if you can reproduce it on the DirectRunner that might help.
>
> Kenn
>
> On Tue, May 11, 2021 at 3:41 PM Raman Gupta  wrote:
>
>> I have a Dataflow pipeline that reads data from JDBC and Pub/Sub. My
>> ideal pipeline backfills its state and output from historical data via the
>> JDBC input, and then continues processing new elements arriving via
>> pub/sub. Conceptually, this seems easy to do with a filter on each source
>> before/after some specific cutoff instant.
>>
>> However, when I add pub/sub into the pipeline, it runs in streaming mode,
>> and the pipeline does not produce the expected results -- all of the
>> results that would be produced based on looping timers seem to be missing.
>>
>> I thought this might be related to the post-inputs Flatten, but I've
>> taken pub/sub out of the equation, and run the same exact JDBC-based
>> pipeline in batch vs streaming mode, and the JDBC-only pipeline in
>> streaming mode produces the same partial results.
>>
>> What could be happening?
>>
>> Regards,
>> Raman
>>
>>
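
The cutoff-filter idea described in the quoted question would look roughly
like this (a minimal sketch; the Event type, its timestamp accessor, the
source PCollections, and the cutoff value are illustrative placeholders, not
from the actual pipeline):

    Instant cutoff = Instant.parse("2021-05-01T00:00:00Z");

    // keep only historical rows from the JDBC side
    PCollection<Event> historical =
        jdbcEvents.apply("KeepBeforeCutoff",
            Filter.by((Event e) -> e.getTimestamp().isBefore(cutoff)));

    // keep only new messages from the Pub/Sub side
    PCollection<Event> live =
        pubsubEvents.apply("KeepFromCutoff",
            Filter.by((Event e) -> !e.getTimestamp().isBefore(cutoff)));

    // merge the two branches into one stream for the rest of the pipeline
    PCollection<Event> all =
        PCollectionList.of(historical).and(live).apply(Flatten.pCollections());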


Re: Extremely Slow DirectRunner

2021-05-12 Thread Evan Galpin
I forgot to also mention that in all tests I was setting
--experiments=use_deprecated_read

Thanks,
Evan
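
For anyone reproducing this, the experiment above is passed as a regular
pipeline option. A minimal launch sketch (the class name is illustrative; the
flag itself is the one used throughout this thread):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptions;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;

    public class Launch {
      public static void main(String[] args) {
        // e.g. run with: --runner=DirectRunner --experiments=use_deprecated_read
        PipelineOptions options =
            PipelineOptionsFactory.fromArgs(args).withValidation().create();
        Pipeline pipeline = Pipeline.create(options);
        // ... build the pipeline under test here ...
        pipeline.run().waitUntilFinish();
      }
    }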

On Wed, May 12, 2021 at 1:39 PM Evan Galpin  wrote:

> Hmm, I think I spoke too soon. I'm still seeing an issue of overall
> DirectRunner slowness, not just pubsub. I have a pipeline like so:
>
> Read pubsub  |  extract GCS glob patterns  |  FileIO.matchAll()  |
> FileIO.readMatches()  |  Read file contents  |  etc
>
> I have temporarily set up a transform between each step to log what's
> going on and illustrate timing issues.  I ran a series of tests changing
> only the SDK version each time since I hadn't noticed this performance
> issue with 2.19.0 (effectively git-bisect). Before each test, I seeded the
> pubsub subscription with the exact same contents.
>
> SDK version 2.25.0 (I had a build issue with 2.24.0 that I couldn't seem
> to resolve) and onward show a significant slowdown.
>
> Here is a snippet of logging from v2.25.0:
>
> *May 12, 2021 11:11:52 A.M.* com.myOrg.myPipeline.PipelineLeg$1
> processElement
> INFO: Got file pattern: gs://my-bucket/my-dir/5004728247517184/**
> May 12, 2021 11:16:59 A.M. org.apache.beam.sdk.io.FileIO$MatchAll$MatchFn
> process
> INFO: Matched 2 files for pattern gs://my-bucket/my-dir/5004728247517184/**
> May 12, 2021 11:23:32 A.M. com.myOrg.myPipeline.PipelineLeg$3
> processElement
> INFO: Got ReadableFile: my-file1.json
> May 12, 2021 11:23:32 A.M. com.myOrg.myPipeline.PipelineLeg$3
> processElement
> INFO: Got ReadableFile: my-file2.json
> May 12, 2021 11:24:35 A.M. com.myOrg.myPipeline.PipelineLeg$4
> processElement
> INFO: Got file contents for document_id my-file1.json
> *May 12, 2021 11:24:35 A.M*. com.myOrg.myPipeline.PipelineLeg$4
> processElement
> INFO: Got file contents for document_id my-file2.json
>
> Note that end-to-end, these steps took about *13 minutes*. With SDK
> 2.23.0 and identical user code, the same section of the pipeline took *2
> seconds*:
>
> *May 12, 2021 11:03:39 A.M.* com.myOrg.myPipeline.PipelineLeg$1
> processElement
> INFO: Got file pattern: gs://my-bucket/my-dir/5004728247517184/**
> May 12, 2021 11:03:40 A.M. org.apache.beam.sdk.io.FileIO$MatchAll$MatchFn
> process
> INFO: Matched 2 files for pattern gs://my-bucket/my-dir/5004728247517184/**
> May 12, 2021 11:03:40 A.M. com.myOrg.myPipeline.PipelineLeg$3
> processElement
> INFO: Got ReadableFile: my-file1.json
> May 12, 2021 11:03:40 A.M. com.myOrg.myPipeline.PipelineLeg$3
> processElement
> INFO: Got ReadableFile: my-file2.json
> May 12, 2021 11:03:41 A.M. com.myOrg.myPipeline.PipelineLeg$4
> processElement
> INFO: Got file contents for document_id my-file1.json
> *May 12, 2021 11:03:41 A.M.* com.myOrg.myPipeline.PipelineLeg$4
> processElement
> INFO: Got file contents for document_id my-file2.json
>
> Any thoughts on what could be causing this?
>
> Thanks,
> Evan
>
> On Wed, May 12, 2021 at 9:53 AM Evan Galpin  wrote:
>
>>
>>
>> On Mon, May 10, 2021 at 2:09 PM Boyuan Zhang  wrote:
>>
>>> Hi Evan,
>>>
>>> What do you mean startup delay? Is it the time that from you start the
>>> pipeline to the time that you notice the first output record from PubSub?
>>>
>>
>> Yes that's what I meant, the seemingly idle system waiting for pubsub
>> output despite data being in the subscription at pipeline start time.
>>
>> On Sat, May 8, 2021 at 12:50 AM Ismaël Mejía  wrote:
>>>
 Can you try running direct runner with the option
 `--experiments=use_deprecated_read`

>>>
>> This seems to work for me, thanks for this! 
>>
>>
 Seems like an instance of
 https://issues.apache.org/jira/browse/BEAM-10670?focusedCommentId=17316858=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17316858
 also reported in
 https://lists.apache.org/thread.html/re6b0941a8b4951293a0327ce9b25e607cafd6e45b69783f65290edee%40%3Cdev.beam.apache.org%3E

 We should rollback using the SDF wrapper by default because of the
 usability and performance issues reported.


 On Sat, May 8, 2021 at 12:57 AM Evan Galpin 
 wrote:

> Hi all,
>
> I’m experiencing very slow performance and startup delay when testing
> a pipeline locally. I’m reading data from a Google PubSub subscription as
> the data source, and before each pipeline execution I ensure that data is
> present in the subscription (readable from GCP console).
>
> I’m seeing startup delay on the order of minutes with DirectRunner
> (5-10 min). Is that expected? I did find a Jira ticket[1] that at first
> seemed related, but I think it has more to do with BQ than DirectRunner.
>
> I’ve run the pipeline with a debugger connected and confirmed that
> it’s minutes before the first DoFn in my pipeline receives any data. Is
> there a way I can profile the direct runner to see what it’s churning on?
>
> Thanks,
> Evan
>
> [1]
> https://issues.apache.org/jira/plugins/servlet/mobile#issue/BEAM-4548
>



Re: Extremely Slow DirectRunner

2021-05-12 Thread Evan Galpin
Hmm, I think I spoke too soon. I'm still seeing an issue of overall
DirectRunner slowness, not just pubsub. I have a pipeline like so:

Read pubsub  |  extract GCS glob patterns  |  FileIO.matchAll()  |
FileIO.readMatches()  |  Read file contents  |  etc

I have temporarily set up a transform between each step to log what's going
on and illustrate timing issues.  I ran a series of tests changing only the
SDK version each time since I hadn't noticed this performance issue with
2.19.0 (effectively git-bisect). Before each test, I seeded the pubsub
subscription with the exact same contents.
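
A pass-through logging step of the kind described above can be as simple as
the following (a simplified sketch, not the exact code from this pipeline;
the class name and label are illustrative):

    static class LogStrings extends DoFn<String, String> {
      private static final Logger LOG = Logger.getLogger(LogStrings.class.getName());
      private final String label;

      LogStrings(String label) {
        this.label = label;
      }

      @ProcessElement
      public void processElement(@Element String element, OutputReceiver<String> out) {
        // the log timestamp is what reveals where the pipeline spends its time
        LOG.info(label + ": " + element);
        out.output(element);
      }
    }

    // e.g. patterns.apply("LogPatterns", ParDo.of(new LogStrings("Got file pattern")))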

SDK version 2.25.0 (I had a build issue with 2.24.0 that I couldn't seem to
resolve) and onward show a significant slowdown.

Here is a snippet of logging from v2.25.0:

*May 12, 2021 11:11:52 A.M.* com.myOrg.myPipeline.PipelineLeg$1
processElement
INFO: Got file pattern: gs://my-bucket/my-dir/5004728247517184/**
May 12, 2021 11:16:59 A.M. org.apache.beam.sdk.io.FileIO$MatchAll$MatchFn
process
INFO: Matched 2 files for pattern gs://my-bucket/my-dir/5004728247517184/**
May 12, 2021 11:23:32 A.M. com.myOrg.myPipeline.PipelineLeg$3 processElement
INFO: Got ReadableFile: my-file1.json
May 12, 2021 11:23:32 A.M. com.myOrg.myPipeline.PipelineLeg$3 processElement
INFO: Got ReadableFile: my-file2.json
May 12, 2021 11:24:35 A.M. com.myOrg.myPipeline.PipelineLeg$4 processElement
INFO: Got file contents for document_id my-file1.json
*May 12, 2021 11:24:35 A.M*. com.myOrg.myPipeline.PipelineLeg$4
processElement
INFO: Got file contents for document_id my-file2.json

Note that end-to-end, these steps took about *13 minutes*. With SDK 2.23.0
and identical user code, the same section of the pipeline took *2 seconds*:

*May 12, 2021 11:03:39 A.M.* com.myOrg.myPipeline.PipelineLeg$1
processElement
INFO: Got file pattern: gs://my-bucket/my-dir/5004728247517184/**
May 12, 2021 11:03:40 A.M. org.apache.beam.sdk.io.FileIO$MatchAll$MatchFn
process
INFO: Matched 2 files for pattern gs://my-bucket/my-dir/5004728247517184/**
May 12, 2021 11:03:40 A.M. com.myOrg.myPipeline.PipelineLeg$3 processElement
INFO: Got ReadableFile: my-file1.json
May 12, 2021 11:03:40 A.M. com.myOrg.myPipeline.PipelineLeg$3 processElement
INFO: Got ReadableFile: my-file2.json
May 12, 2021 11:03:41 A.M. com.myOrg.myPipeline.PipelineLeg$4 processElement
INFO: Got file contents for document_id my-file1.json
*May 12, 2021 11:03:41 A.M.* com.myOrg.myPipeline.PipelineLeg$4
processElement
INFO: Got file contents for document_id my-file2.json

Any thoughts on what could be causing this?

Thanks,
Evan

On Wed, May 12, 2021 at 9:53 AM Evan Galpin  wrote:

>
>
> On Mon, May 10, 2021 at 2:09 PM Boyuan Zhang  wrote:
>
>> Hi Evan,
>>
>> What do you mean by startup delay? Is it the time from when you start the
>> pipeline to the time that you notice the first output record from PubSub?
>>
>
> Yes that's what I meant, the seemingly idle system waiting for pubsub
> output despite data being in the subscription at pipeline start time.
>
> On Sat, May 8, 2021 at 12:50 AM Ismaël Mejía  wrote:
>>
>>> Can you try running direct runner with the option
>>> `--experiments=use_deprecated_read`
>>>
>>
> This seems to work for me, thanks for this! 
>
>
>>> Seems like an instance of
>>> https://issues.apache.org/jira/browse/BEAM-10670?focusedCommentId=17316858=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17316858
>>> also reported in
>>> https://lists.apache.org/thread.html/re6b0941a8b4951293a0327ce9b25e607cafd6e45b69783f65290edee%40%3Cdev.beam.apache.org%3E
>>>
>>> We should rollback using the SDF wrapper by default because of the
>>> usability and performance issues reported.
>>>
>>>
>>> On Sat, May 8, 2021 at 12:57 AM Evan Galpin 
>>> wrote:
>>>
 Hi all,

 I’m experiencing very slow performance and startup delay when testing a
 pipeline locally. I’m reading data from a Google PubSub subscription as the
 data source, and before each pipeline execution I ensure that data is
 present in the subscription (readable from GCP console).

 I’m seeing startup delay on the order of minutes with DirectRunner
 (5-10 min). Is that expected? I did find a Jira ticket[1] that at first
 seemed related, but I think it has more to do with BQ than DirectRunner.

 I’ve run the pipeline with a debugger connected and confirmed that it’s
 minutes before the first DoFn in my pipeline receives any data. Is there a
 way I can profile the direct runner to see what it’s churning on?

 Thanks,
 Evan

 [1]
 https://issues.apache.org/jira/plugins/servlet/mobile#issue/BEAM-4548

>>>


Re: DirectRunner, Fusion, and Triggers

2021-05-12 Thread Bashir Sadjad
Thanks Kenn.

On Wed, May 12, 2021 at 12:14 PM Kenneth Knowles  wrote:

>
> On Sat, May 8, 2021 at 12:00 AM Bashir Sadjad  wrote:
>
>> However, if I add a dummy S2' after S2 (i.e., S1->S2->S2'->S3) which only
>> prints some log messages for each record and passes the record to output,
>> then it seems S2 and S2' are fused. Because the log messages are
>> interleaved with fetches.
>>
>
>> *Q1*: Does DirectRunner do any fusion optimization (e.g., like
>> DataflowRunner)? If not by default, is there any way to enable it?
>>
>
> The Java DirectRunner does not do any fusion optimization. There's no code
> to enable :-). It should affect performance only, not semantics. The
> DirectRunner is known to have poor performance, but mostly no one is
> working on speeding it up because it is really just for small-scale testing.
>

Here is a minimal pipeline (with no windowing) that demonstrates what I
mean; maybe I am using the wrong terminology, but when I run this pipeline
with DirectRunner (and with `--targetParallelism=1`) the `DEBUG INPUT` and
`DEBUG NEXT` messages are interleaved, whereas if there were no fusion I
would have expected to see all `DEBUG INPUT` messages first and then all of
the `DEBUG NEXT` messages:

Pipeline pipeline = Pipeline.create(options);
PCollection<String> lines =
pipeline.apply(TextIO.read().from(options.getInputFile()));

PCollection<String> linesDelayed = lines.apply("Sleep", ParDo.of(new
DoFn<String, String>() {
  @StartBundle
  public void startBundle() {
log.info("INPUT: Started a new bundle");
  }
  @ProcessElement
  public void ProcessElement(@Element String line, OutputReceiver<String>
out) throws InterruptedException {
log.info(String.format("DEBUG INPUT %s", line));
Thread.sleep(3000);
out.output(line);
  }
}));

PCollection<String> linesDebug = linesDelayed.apply("Next", ParDo.of(new
DoFn<String, String>() {
  @StartBundle
  public void startBundle() {
log.info("NEXT: Started a new bundle");
  }
  @ProcessElement
  public void ProcessElement(@Element String line, OutputReceiver<String>
out) {
log.info(String.format("DEBUG NEXT %s", line));
out.output(line);
  }
}));

linesDebug.apply(TextIO.write().to(options.getOutputFile()).withNumShards(1));

PipelineResult result = pipeline.run();
result.waitUntilFinish();

It seems that a few bundles are processed by the `Sleep` transform, then they
all go through `Next`. Then a few more bundles go through `Sleep`, then
`Next`, and so on.


>
>
>
>> The other issue is with triggers and creating panes. I have an extended
>> version of this pipeline where a simplified view of it is:
>> S1->S2A->GBK->S2B->S3
>>
>> S1: Like before
>> S2A: Add a key to the output of S1
>> GBK: Groups output of S2A to remove duplicate keys
>> S2B: Similar to S2 above, i.e., fetch deduped URLs and create Avro records
>> S3: Same as before
>>
>> *Q2*: In this case, if I add a dummy S2B' after S2', the log messages
>> are *not* interleaved with resource fetches, i.e., no fusion is
>> happening. Why? What is different here?
>>
>
> I don't quite understand what the problem is here.
>

The same log-message interleaving does not happen in this case. Going back to
my original example sketch: the log messages of S2' are interleaved with S2
(which I thought was due to fusion), but all of the log messages of S2B'
are printed only after all messages of S2B.


>
>
>
>> *Q3*: Even if I add a similar trigger to the output of S2B, the Parquet
>> file generation does not start until all of the fetches are done. Again,
>> what is different here and why intermediate panes are not fired while the
>> output of S2B is being generated?
>>
>
> I think it would help to see how you have configured the ParquetIO write
> transform.
>

I think this is related to the difference between the behaviour of the two
examples above (i.e., S2' vs. S2B'). If it turns out that is not the case,
I will create a minimal example including ParquetIO too.
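
In the meantime, here is a minimal sketch of the kind of ParquetIO write I am
using (simplified; the Avro schema and the output path option are placeholders,
not my exact configuration):

records.apply("WriteParquet",
    FileIO.<GenericRecord>write()
        .via(ParquetIO.sink(avroSchema))   // avroSchema: the records' Avro schema
        .to(options.getOutputDir())        // placeholder output directory option
        .withSuffix(".parquet")
        .withNumShards(1));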

Thanks again

-B


>
> Kenn
>
>
>>
>> Thanks
>>
>> -B
>> P.S. I need this pipeline to work both on a distributed runner and also
>> on a local machine with many cores. That's why the performance of
>> DirectRunner is important to me.
>>
>


BeamSQL: Error when using WHERE statements with OVER windows

2021-05-12 Thread Burkay Gur
Hi folks,

When we try to run the following query on BeamSQL:

SELECT item, purchases, category, sum(purchases) over (PARTITION BY
category ORDER BY purchases ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT
ROW) as total_purchases  FROM PCOLLECTION WHERE purchases > 3

We are getting the following error:

Unable to convert query
org.apache.beam.sdk.extensions.sql.impl.SqlConversionException: Unable to
convert query SELECT item, purchases, category, sum(purchases) over
(PARTITION BY category ORDER BY purchases ROWS BETWEEN UNBOUNDED PRECEDING
AND CURRENT ROW) as total_purchases  FROM PCOLLECTION WHERE purchases > 3
at org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner.convertToBeamRel(CalciteQueryPlanner.java:212)
at org.apache.beam.sdk.extensions.sql.impl.BeamSqlEnv.parseQuery(BeamSqlEnv.java:111)
at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:171)
at org.apache.beam.sdk.extensions.sql.SqlTransform.expand(SqlTransform.java:109)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:548)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:499)
at org.apache.beam.sdk.values.PCollection.apply(PCollection.java:370) at

We traced the issue back to this PR:
https://github.com/apache/beam/pull/11975 specifically this line:
https://github.com/apache/beam/pull/11975/files#diff-919be1e4bcc11c17b725cbf04168b583886ffe16286f9291893247954128ad81R43

What are the plans for wider support of analytic functions? If I want
to contribute, what are the best resources for learning how the BeamSQL
/ Calcite integration is set up?
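
While we wait, one workaround we are considering is splitting the filter into a
separate SqlTransform stage ahead of the analytic query. This is an untested
sketch; `input` stands for our PCollection of Rows:

// Stage 1: apply the WHERE filter on its own.
PCollection<Row> filtered = input.apply("FilterPurchases",
    SqlTransform.query(
        "SELECT item, purchases, category FROM PCOLLECTION WHERE purchases > 3"));

// Stage 2: run the analytic function over the pre-filtered rows.
PCollection<Row> withTotals = filtered.apply("RunningTotal",
    SqlTransform.query(
        "SELECT item, purchases, category, "
            + "sum(purchases) OVER (PARTITION BY category ORDER BY purchases "
            + "ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW) AS total_purchases "
            + "FROM PCOLLECTION"));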

Best,
Burkay


Re: Beam/Dataflow pipeline backfill via JDBC

2021-05-12 Thread Kenneth Knowles
Can you share some more details, such as code? We may identify something
that relies upon assumptions from batch execution style.

Also, notably, the Java DirectRunner does not have separate batch/streaming
modes; it always executes in a "streaming" sort of way. It is also simpler in
some ways, so if you can reproduce the issue on the DirectRunner, that might help.
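
To make sure I understand the shape you are describing, here is a rough sketch
(the Event type, the cutoff, and the JDBC/Pub/Sub read configuration are
placeholders, not your code):

// 'historical' and 'live' stand for the JDBC and Pub/Sub reads, already parsed
// into a common Event type that exposes an event timestamp.
final Instant cutoff = Instant.parse("2021-05-01T00:00:00Z");

PCollection<Event> backfill = historical
    .apply("KeepBeforeCutoff", Filter.by((Event e) -> e.getTimestamp().isBefore(cutoff)));

PCollection<Event> streaming = live
    .apply("KeepFromCutoff", Filter.by((Event e) -> !e.getTimestamp().isBefore(cutoff)));

PCollection<Event> unified = PCollectionList.of(backfill).and(streaming)
    .apply("FlattenSources", Flatten.pCollections());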

Kenn

On Tue, May 11, 2021 at 3:41 PM Raman Gupta  wrote:

> I have a Dataflow pipeline that reads data from JDBC and Pub/Sub. My ideal
> pipeline backfills its state and output from historical data via the JDBC
> input, and then continues processing new elements arriving via pub/sub.
> Conceptually, this seems easy to do with a filter on each source
> before/after some specific cutoff instant.
>
> However, when I add pub/sub into the pipeline, it runs in streaming mode,
> and the pipeline does not produce the expected results -- all of the
> results that would be produced based on looping timers seem to be missing.
>
> I thought this might be related to the post-inputs Flatten, but I've taken
> pub/sub out of the equation, and run the same exact JDBC-based pipeline in
> batch vs streaming mode, and the JDBC-only pipeline in streaming mode
> produces the same partial results.
>
> What could be happening?
>
> Regards,
> Raman
>
>


Re: A problem with calcite sql

2021-05-12 Thread Tao Li
Andrew,

I tried the last query you recommended and I am seeing this error:

Caused by: org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.tools.ValidationException:
org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.runtime.CalciteContextException:
From line 1, column 34 to line 1, column 44: Table 'PCOLLECTION' not found
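
We are also going to try the comma-join form of UNNEST (untested on our side,
just a guess based on standard Calcite syntax), in case the table has to appear
in the FROM clause before its array column can be referenced:

SELECT manager.email
FROM PCOLLECTION, UNNEST(PCOLLECTION.market_transactionManagement_transactionManagers) AS manager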



From: Andrew Pilloud 
Date: Tuesday, May 11, 2021 at 10:38 PM
To: Tao Li 
Cc: "user@beam.apache.org" , Yuan Feng 

Subject: Re: A problem with calcite sql

If the type were just a nested row, this would work:
SELECT `market_transactionManagement_transactionManagers`.`email` FROM PCOLLECTION
or this:
SELECT market_transactionManagement_transactionManagers.email FROM PCOLLECTION

If you have exactly one element in the array, something like this should work:
SELECT market_transactionManagement_transactionManagers[1].email FROM PCOLLECTION

If you want to extract the array, try something like this:
SELECT manager.email FROM UNNEST(PCOLLECTION.market_transactionManagement_transactionManagers) AS manager

On Tue, May 11, 2021 at 10:22 PM Tao Li <t...@zillow.com> wrote:
Thanks Andrew. With the `id` syntax I am not seeing the “Unhandled logical type
SqlCharType” error any more. This is great progress!

However, I am still seeing an issue when querying a composite field. Below is the
schema of the array-type field:

Field{name=market_transactionManagement_transactionManagers, description=, 
type=ARRAY>, options={{}}}

My SQL query selects a nested field: SELECT
`market_transactionManagement_transactionManagers.email` FROM PCOLLECTION

Error:

Caused by: org.apache.beam.vendor.calcite.v1_20_0.org.apache.calcite.sql.validate.SqlValidatorException:
Column 'market_transactionManagement_transactionManagers.email' not found in any table

So what would be the right syntax? Thanks!

From: Andrew Pilloud <apill...@google.com>
Date: Tuesday, May 11, 2021 at 11:51 AM
To: Tao Li <t...@zillow.com>
Cc: "user@beam.apache.org" <user@beam.apache.org>, Yuan Feng <yua...@zillowgroup.com>
Subject: Re: A problem with calcite sql

SELECT CAST('CAST(id AS VARCHAR)' AS VARCHAR) FROM PCOLLECTION works for me, 
but I don't think that is what you wanted. Note that ' is for string literals 
and ` is for escaping names in Beam SQL's default dialect config.

Try:
SELECT `id` FROM PCOLLECTION

On Tue, May 11, 2021 at 10:58 AM Tao Li <t...@zillow.com> wrote:
@Andrew Pilloud thanks for your suggestions. I 
tried CAST and TRIM but it did not work:

The SQL statement I am using: SELECT 'CAST(id AS VARCHAR)' FROM PCOLLECTION

Logs:

[main] INFO org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner - SQL:
SELECT 'CAST(id AS VARCHAR)'
FROM `beam`.`PCOLLECTION` AS `PCOLLECTION`
[main] INFO org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner - 
SQLPlan>
LogicalProject(EXPR$0=['CAST(id AS VARCHAR)'])
  BeamIOSourceRel(table=[[beam, PCOLLECTION]])

[main] INFO org.apache.beam.sdk.extensions.sql.impl.CalciteQueryPlanner - 
BEAMPlan>
BeamCalcRel(expr#0..44=[{inputs}], expr#45=['CAST(id AS VARCHAR)'], 
EXPR$0=[$t45])
  BeamIOSourceRel(table=[[beam, PCOLLECTION]])

Exception in thread "main" java.lang.RuntimeException: Unhandled logical type SqlCharType
at org.apache.beam.sdk.schemas.utils.AvroUtils.getFieldSchema(AvroUtils.java:911)
at org.apache.beam.sdk.schemas.utils.AvroUtils.toAvroField(AvroUtils.java:306)
at org.apache.beam.sdk.schemas.utils.AvroUtils.toAvroSchema(AvroUtils.java:341)
at org.apache.beam.sdk.schemas.utils.AvroUtils.toAvroSchema(AvroUtils.java:348)

From: Andrew Pilloud <apill...@google.com>
Reply-To: "user@beam.apache.org" <user@beam.apache.org>
Date: Monday, May 10, 2021 at 7:46 PM
To: user <user@beam.apache.org>
Cc: Yuan Feng <yua...@zillowgroup.com>
Subject: Re: A problem with calcite sql

For the first one you have 
https://issues.apache.org/jira/browse/BEAM-5251
For the second, I opened a new issue for you: 
https://issues.apache.org/jira/browse/BEAM-12323

Your second issue is because our 

Re: DirectRunner, Fusion, and Triggers

2021-05-12 Thread Kenneth Knowles
On Sat, May 8, 2021 at 12:00 AM Bashir Sadjad  wrote:

> Hi Beam-users,
>
> *TL;DR;* I wonder if DirectRunner does any fusion optimization
> and whether this has any impact on triggers/panes?
>

> *Details* (the context for everything below is *DirectRunner* and this is
> a *batch* job):
> I have a batch pipeline that roughly looks like this: S1->S2->S3
>
> S1: Create URLs (from DB)
> S2: Fetch those URLs (output of S1) and create Avro records
> S3: Write those records to Parquet files
>
> S2 and S3 can be fused to generate Parquet files while the records are
> fetched/created. However, it does not seem to be the case, because there is
> no [temp] file while the resources are being fetched and the writer log
> messages appear only after all fetches are done.
>
> If I add a trigger to the output PCollection of S2 (i.e., `records`
> below), then I get intermediate Parquet output:
> ```
> records.apply(Window. into(new GlobalWindows())
>.triggering(Repeatedly.forever(
>AfterProcessingTime.pastFirstElementInPane().plusDelayOf(5
>.discardingFiredPanes());
> ```
>
> However, if I add a dummy S2' after S2 (i.e., S1->S2->S2'->S3) which only
> prints some log messages for each record and passes the record to output,
> then it seems S2 and S2' are fused. Because the log messages are
> interleaved with fetches.
>
> *Q1*: Does DirectRunner do any fusion optimization (e.g., like
> DataflowRunner)? If not by default, is there any way to enable it?
>

The Java DirectRunner does not do any fusion optimization. There's no code
to enable :-). It should affect performance only, not semantics. The
DirectRunner is known to have poor performance, but mostly no one is
working on speeding it up because it is really just for small-scale testing.



> The other issue is with triggers and creating panes. I have an extended
> version of this pipeline where a simplified view of it is:
> S1->S2A->GBK->S2B->S3
>
> S1: Like before
> S2A: Add a key to the output of S1
> GBK: Groups output of S2A to remove duplicate keys
> S2B: Similar to S2 above, i.e., fetch deduped URLs and create Avro records
> S3: Same as before
>
> *Q2*: In this case, if I add a dummy S2B' after S2', the log messages are
> *not* interleaved with resource fetches, i.e., no fusion is happening.
> Why? What is different here?
>

I don't quite understand what the problem is here.



> *Q3*: Even if I add a similar trigger to the output of S2B, the Parquet
> file generation does not start until all of the fetches are done. Again,
> what is different here and why intermediate panes are not fired while the
> output of S2B is being generated?
>

I think it would help to see how you have configured the ParquetIO write
transform.

Kenn


>
> Thanks
>
> -B
> P.S. I need this pipeline to work both on a distributed runner and also on
> a local machine with many cores. That's why the performance of DirectRunner
> is important to me.
>


Re: Extremely Slow DirectRunner

2021-05-12 Thread Evan Galpin
On Mon, May 10, 2021 at 2:09 PM Boyuan Zhang  wrote:

> Hi Evan,
>
> What do you mean by startup delay? Is it the time from when you start the
> pipeline to the time that you notice the first output record from PubSub?
>

Yes that's what I meant, the seemingly idle system waiting for pubsub
output despite data being in the subscription at pipeline start time.

On Sat, May 8, 2021 at 12:50 AM Ismaël Mejía  wrote:
>
>> Can you try running direct runner with the option
>> `--experiments=use_deprecated_read`
>>
>
This seems to work for me, thanks for this! 
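
For anyone else following along, we pass the flag on the command line as
--experiments=use_deprecated_read; setting it programmatically should also work
along these lines (untested sketch):

PipelineOptions options = PipelineOptionsFactory.fromArgs(args).withValidation().create();
ExperimentalOptions.addExperiment(options.as(ExperimentalOptions.class), "use_deprecated_read");
Pipeline p = Pipeline.create(options);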


>> Seems like an instance of
>> https://issues.apache.org/jira/browse/BEAM-10670?focusedCommentId=17316858=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel#comment-17316858
>> also reported in
>> https://lists.apache.org/thread.html/re6b0941a8b4951293a0327ce9b25e607cafd6e45b69783f65290edee%40%3Cdev.beam.apache.org%3E
>>
>> We should rollback using the SDF wrapper by default because of the
>> usability and performance issues reported.
>>
>>
>> On Sat, May 8, 2021 at 12:57 AM Evan Galpin 
>> wrote:
>>
>>> Hi all,
>>>
>>> I’m experiencing very slow performance and startup delay when testing a
>>> pipeline locally. I’m reading data from a Google PubSub subscription as the
>>> data source, and before each pipeline execution I ensure that data is
>>> present in the subscription (readable from GCP console).
>>>
>>> I’m seeing startup delay on the order of minutes with DirectRunner (5-10
>>> min). Is that expected? I did find a Jira ticket[1] that at first seemed
>>> related, but I think it has more to do with BQ than DirectRunner.
>>>
>>> I’ve run the pipeline with a debugger connected and confirmed that it’s
>>> minutes before the first DoFn in my pipeline receives any data. Is there a
>>> way I can profile the direct runner to see what it’s churning on?
>>>
>>> Thanks,
>>> Evan
>>>
>>> [1]
>>> https://issues.apache.org/jira/plugins/servlet/mobile#issue/BEAM-4548
>>>
>>


Is there a way (settings) to limit the number of elements per worker machine

2021-05-12 Thread Eila Oriel Research
Hi,
I am running out of resources on the worker machines.
The reasons are:
1. Every PCollection element is a reference to a LARGE file that is copied onto
the worker
2. The worker runs calculations on the copied file using a software
library that consumes memory / storage / compute resources

I have increased the workers' CPUs and memory size, but at some point I run
out of resources with this approach as well.
I am looking to limit the number of PCollection elements that are
processed in parallel on each worker at a time.
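
The closest setting I have found so far (not yet verified, and assuming the
Java Dataflow runner) is the worker harness thread count, which should cap how
many elements are processed concurrently on each worker:

--numberOfWorkerHarnessThreads=1

or, as a sketch in code:

DataflowPipelineOptions options =
    PipelineOptionsFactory.fromArgs(args).withValidation().as(DataflowPipelineOptions.class);
// Limit each worker to a single concurrent processing thread.
options.as(DataflowPipelineDebugOptions.class).setNumberOfWorkerHarnessThreads(1);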

Many thanks for any advice,
Best wishes,
-- 
Eila

Meetup