Re: Python SDF for unbound source

2022-03-11 Thread Sam Bourne
I did a few experiments and am still a little confused about what’s
happening using either of the DirectRunner multiprocessing modes.

I’m running the following code:

import argparse
import sys

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.transforms.core import RestrictionProvider
class InfRestrictionProvider(RestrictionProvider):
    """
    An infinite restriction provider
    """

    def initial_restriction(self, element):
        from apache_beam.io.restriction_trackers import OffsetRange
        return OffsetRange(0, float('inf'))

    def create_tracker(self, restriction):
        from apache_beam.io.restriction_trackers import OffsetRestrictionTracker
        return OffsetRestrictionTracker(restriction)

    def restriction_size(self, element, restriction):
        return 1

class _EventReadSDF(beam.DoFn):
    """
    An SDF for subscribing to custom events.
    """

    restriction_tracker = beam.DoFn.RestrictionParam(InfRestrictionProvider())

    def __init__(self):
        super(_EventReadSDF, self).__init__()
        # number of events received
        self._counter = 0

    @beam.DoFn.unbounded_per_element()
    def process(self, _, restriction_tracker=restriction_tracker):
        import random
        import time

        print('Attempting to claim {!r}'.format(self._counter))
        if not restriction_tracker.try_claim(self._counter):
            print('Failed to claim {!r}'.format(self._counter))
            return

        try:
            n = random.randrange(0, 5)
            time.sleep(n)
            yield n
        finally:
            self._counter += 1
            restriction_tracker.defer_remainder()

def main(argv=None):
    parser = argparse.ArgumentParser()
    known_args, pipeline_args = parser.parse_known_args(argv)
    pipe = beam.Pipeline(options=PipelineOptions(pipeline_args))

    (
        pipe
        | beam.Impulse()
        | beam.ParDo(_EventReadSDF())
        | beam.Map(print)
    )

    result = pipe.run()
    result.wait_until_finish()


if __name__ == '__main__':
    main(sys.argv)

If I run this pipeline with more than one worker (e.g.
--direct_num_workers=2), the SDF only produces one element. This happens
with direct_running_mode set to either multi_processing or multi_threading.
I tested this on Python 2 with Beam 2.24.0 and on Python 3 with Beam 2.29.0.

Interestingly, if I run this with --direct_num_workers=1 it generates
elements forever, as intended. It would be great to understand what’s going
on here. Is this the right technique for the RestrictionProvider?
Does anyone have other sample code for doing something similar?

Thanks,
-Sam
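
[Editor's note: for reference, a minimal sketch (not part of the original
message) of selecting the two DirectRunner modes programmatically instead of
via command-line flags. direct_num_workers and direct_running_mode are the
standard DirectRunner pipeline options discussed above; _EventReadSDF is the
DoFn defined in the code earlier in this message.]

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Equivalent to --direct_num_workers=2 --direct_running_mode=multi_processing.
options = PipelineOptions(
    runner='DirectRunner',
    direct_num_workers=2,
    direct_running_mode='multi_processing',  # or 'multi_threading' / 'in_memory'
)

with beam.Pipeline(options=options) as pipe:
    (
        pipe
        | beam.Impulse()
        | beam.ParDo(_EventReadSDF())
        | beam.Map(print)
    )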

On Wed, Mar 9, 2022 at 8:22 PM Ankur Goenka  wrote:

> This might be an issue with the multi_processing implementation of the
> DirectRunner.
>
> I experimented with the following code and got the following error. You can
> try using --direct_running_mode='multi_threading'
>
> --
>  File
> "/usr/local/google/home/goenka/d/work/tmp/t1/venv/lib/python3.9/site-packages/apache_beam/runners/worker/bundle_processor.py",
> line 1425, in process
> initial_restriction = self.restriction_provider.initial_restriction(
>   File "/usr/local/google/home/goenka/d/work/tmp/t1/test.py", line 15, in
> initial_restriction
> return OffsetRange(0, float('inf'))
> NameError: name 'OffsetRange' is not defined
>
>
> --
>
>
> import apache_beam as beam
> import sys
> import argparse
> from apache_beam.io.restriction_trackers import OffsetRange, OffsetRestrictionTracker
> from apache_beam.options.pipeline_options import PipelineOptions
> from apache_beam.transforms.core import RestrictionProvider
>
>
> class InfRestrictionProvider(RestrictionProvider):
>     """
>     An infinite restriction provider
>     """
>
>     def initial_restriction(self, element):
>         return OffsetRange(0, float('inf'))
>
>     def create_tracker(self, restriction):
>         return OffsetRestrictionTracker(restriction)
>
>     def restriction_size(self, element, restriction):
>         return 1
>
>
> @beam.typehints.with_output_types(int)
> class _EventReadSDF(beam.DoFn):
>     """
>     An SDF for subscribing to custom events.
>     """
>
>     restriction_tracker = beam.DoFn.RestrictionParam(InfRestrictionProvider())
>
>     def __init__(self):
>         # type: () -> None
>         self._counter = 0
>
>     def process(self, _, restriction_tracker=restriction_tracker):
>         # type: (Any, beam.DoFn.RestrictionParam) -> Iterator[Event]
>
>         if not restriction_tracker.try_claim(self._counter):
>             return
>
>         i = 0
>         for _ in range(100):
>             try:
>                 # Blocks until the next event is received.
>                 i += 1
>                 yield i
>             finally:
>                 self._counter += 1
>                 restriction_tracker.defer_remainder()
>
>
> def run(a

Re: Re: Why is portable python runner trying to stage a flink job server jar?

2022-03-11 Thread Deepak Nagaraj
Hi Cham,

Thanks. I tried adding my jar to the classpath in the Java SDK
container, but it made no difference.

I then saw that the loader program for the Java SDK harness hard-codes
a few jars to be in the classpath [1].

It also tacks on any jars specified to be staged, but in any case, it
does not seem to include every jar in the /opt/apache/beam/jars directory.

Regards,
Deepak

[1] 
https://github.com/apache/beam/blob/c1db5fe0afabc6639ea09ca36ef13ebb63b283de/sdks/java/container/boot.go#L135
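
[Editor's note: for context, the pipelines in this thread read from Kafka
through a cross-language transform, so the jars that end up on the Java SDK
harness classpath come from whatever the expansion service stages. Below is a
minimal sketch of such a pipeline; the broker address, topic, expansion
service endpoint, and environment settings are placeholders, not values from
this thread.]

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions

# Placeholder endpoints; substitute the actual Flink job service,
# external SDK harness, and expansion service addresses.
options = PipelineOptions([
    '--runner=PortableRunner',
    '--job_endpoint=localhost:8099',
    '--environment_type=EXTERNAL',
    '--environment_config=localhost:50000',
])

with beam.Pipeline(options=options) as p:
    (
        p
        | ReadFromKafka(
            consumer_config={'bootstrap.servers': 'kafka:9092'},
            topics=['events'],
            # Jars on this expansion service's classpath (or in its
            # filesToStage setting) determine what is staged for the
            # Java SDK harness.
            expansion_service='localhost:8097',
        )
        | beam.Map(print)
    )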

On Tue, Mar 8, 2022 at 11:07 AM Chamikara Jayalath  wrote:
>
>
>
> On Fri, Mar 4, 2022 at 3:16 PM Deepak Nagaraj  wrote:
>>
>> Hi Cham,
>>
>> Thanks, we set filesToStage to be empty within expansion service 
>> configuration.
>>
>> Now when we run a Python pipeline that reads from Kafka, we see an exception 
>> about a missing class. [1]
>>
>> This class is present in the jar that was getting staged earlier, so in a 
>> way the failure is expected.
>>
>> However, the same jar is also present within the classpath of the 
>> taskmanager [2], yet it does not seem to be considered by the function 
>> harness.
>
>
> Hi Deepak,
>
> In general, when "filesToStage" is excluded, everything in the CLASSPATH
> should be staged for the SDK Harness. I'm not too familiar with the Flink
> dependency setup, but is it possible that the relevant jar does not get staged
> for the SDK harness somehow?
>
> Thanks,
> Cham
>
>>
>>
>> Is there a way we can let the function harness consider the available jar?
>>
>> Also, is our understanding correct that the harness only looks at jars 
>> previously uploaded to the staging service?
>>
>> Thanks,
>> Deepak
>>
>> [1] Exception about missing class:
>>
>> 2022-03-03 17:37:29,531 ERROR
>> org.apache.beam.fn.harness.control.BeamFnControlClient   [] -
>> Error thrown when handling InstructionRequest 1
>> java.lang.NoClassDefFoundError:
>> org/springframework/expression/EvaluationContext
>>at 
>> org.apache.beam.sdk.io.kafka.KafkaRecordCoder.toHeaders(KafkaRecordCoder.java:86)
>>at 
>> org.apache.beam.sdk.io.kafka.KafkaRecordCoder.decode(KafkaRecordCoder.java:81)
>>at 
>> org.apache.beam.sdk.io.kafka.KafkaRecordCoder.decode(KafkaRecordCoder.java:40)
>>at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
>>at 
>> org.apache.beam.sdk.coders.LengthPrefixCoder.decode(LengthPrefixCoder.java:64)
>>at org.apache.beam.sdk.coders.Coder.decode(Coder.java:159)
>>at 
>> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:621)
>>at 
>> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:612)
>>at 
>> org.apache.beam.sdk.util.WindowedValue$FullWindowedValueCoder.decode(WindowedValue.java:558)
>>at 
>> org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.awaitCompletion(BeamFnDataInboundObserver2.java:126)
>>at 
>> org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:467)
>>at 
>> org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:151)
>>at 
>> org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:116)
>>at 
>> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>>at 
>> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>>at java.base/java.lang.Thread.run(Thread.java:829)
>> Caused by: java.lang.ClassNotFoundException:
>> org.springframework.expression.EvaluationContext
>>at 
>> java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:581)
>>at 
>> java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
>>at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
>>... 16 more
>>
>> [2] Classpath of task manager (the missing class is present in 
>> beam-runners-flink-job-server-no-slf4j.jar, which apparently is not 
>> considered by the harness):
>>
>> 2022-03-03 07:18:42,146 INFO
>> org.apache.flink.runtime.taskexecutor.TaskManagerRunner  [] -
>> Classpath: /opt/flink/lib/beam-runners-flink-job-server-no-slf4j.jar:/op
>> t/flink/lib/flink-csv-1.13.5.jar:/opt/flink/lib/flink-json-1.13.5.jar:/opt/flink/lib/flink-shaded-zookeeper-3.4.14.jar:/opt/flink/lib/flink-table-blink_2.12-1.13.5.jar:/
>> opt/flink/lib/flink-table_2.12-1.13.5.jar:/opt/flink/lib/log4j-1.2-api-2.16.0.jar:/opt/flink/lib/log4j-api-2.16.0.jar:/opt/flink/lib/log4j-core-2.16.0.jar:/opt/flink/lib
>> /log4j-slf4j-impl-2.16.0.jar:/opt/flink/lib/flink-dist_2.12-1.13.5.jar:::
>>
>> On 2022/02/11 22:22:16 Chamikara Jayalath wrote:
>> > Hi Jeremy,
>> >
>> > By default we stage all jars in the CLASSPATH of the expansion service. You can
>> > override this by setting the filesToStage option when starting up the
>>