Re: Beam portable runner setup for Flink + Python on Kubernetes

2024-02-23 Thread Sam Bourne
Hey Jaehyeon,

Docker is the default environment type
<https://github.com/apache/beam/blob/ae8bbf86c9c5951b2685b8400d6ae3fefe678a9a/sdks/python/apache_beam/options/pipeline_options.py#L1481>
when using the PortableRunner. I included those options just for reference
because we found it useful to override the default SDK container with our own.

It is pretty complicated, especially to debug at times, but we had good
success running some simple pipelines in production for around a year. I was
more wary of maintaining my own Flink cluster, so eventually we decided to
shed the technical debt and pay for Dataflow. Runners already rely on Docker
to support the portability framework
<https://beam.apache.org/roadmap/portability/> so I don't think that is much
of a concern.
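
For reference, overriding the default SDK container boils down to a couple of
pipeline options. A minimal sketch (the job endpoint and image name are
placeholders, not the values from our setup):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    '--runner=PortableRunner',
    # Placeholder: wherever your Beam job server is reachable.
    '--job_endpoint=localhost:8099',
    '--environment_type=DOCKER',
    # Placeholder: your own image, typically built FROM apache/beam_python3.8_sdk.
    '--environment_config=my-registry/my-beam-sdk:latest',
])

with beam.Pipeline(options=options) as p:
    _ = p | beam.Create(['hello', 'world']) | beam.Map(print)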

On Thu, Feb 22, 2024 at 7:49 PM Jaehyeon Kim  wrote:

> Hi Sam
>
> Thanks for the GitHub repo link. In your example, the environment type is
> set to DOCKER and it requires a docker container running together with the
> task manager. Do you think that is acceptable in a production environment?
>
> Cheers,
> Jaehyeon
>
> On Fri, 23 Feb 2024 at 13:57, Sam Bourne  wrote:
>
>> I made this a few years ago to help people like yourself.
>>
>> https://github.com/sambvfx/beam-flink-k8s
>>
>> Hopefully it's insightful and I'm happy to accept any MRs to update any
>> outdated information or to flesh it out more.
>>
>> On Thu, Feb 22, 2024 at 3:48 PM Jaehyeon Kim  wrote:
>>
>>> Hello,
>>>
>>> I'm playing with the beam portable runner to read/write data from Kafka.
>>> I see a spark runner example on Kubernetes (
>>> https://beam.apache.org/documentation/runners/spark/#kubernetes) but
>>> the flink runner section doesn't include such an example.
>>>
>>> Is there a resource I can learn from? Ideally it would be good if this
>>> were added to the documentation.
>>>
>>> Cheers,
>>> Jaehyeon
>>>
>>


Re: Beam portable runner setup for Flink + Python on Kubernetes

2024-02-22 Thread Sam Bourne
I made this a few years ago to help people like yourself.

https://github.com/sambvfx/beam-flink-k8s

Hopefully it's insightful and I'm happy to accept any MRs to update any
outdated information or to flesh it out more.

On Thu, Feb 22, 2024 at 3:48 PM Jaehyeon Kim  wrote:

> Hello,
>
> I'm playing with the beam portable runner to read/write data from Kafka. I
> see a spark runner example on Kubernetes (
> https://beam.apache.org/documentation/runners/spark/#kubernetes) but the
> flink runner section doesn't include such an example.
>
> Is there a resource I can learn from? Ideally it would be good if this
> were added to the documentation.
>
> Cheers,
> Jaehyeon
>


Re: Seeking Assistance to Resolve Issues/bug with Flink Runner on Kubernetes

2023-08-14 Thread Sam Bourne
Hey Kapil,

I grappled with a similar deployment and created this repo [1] to try to
provide others with some nuggets of useful information. We were running
cross-language pipelines on Flink connecting PubsubIO [2] to other misc
Python transforms. No promises it will help, but feel free to take a look,
as it's a close approximation to the setup that we had working.

Your particular error seems related to the Kafka transform. Does a pure
python pipeline execute as expected?

[1] https://github.com/sambvfx/beam-flink-k8s
[2]
https://github.com/apache/beam/blob/v2.48.0/sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/pubsub/PubsubIO.java#L171
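
If it helps, a pure Python sanity check can be as small as the sketch below
(the Flink master address and worker-pool endpoint are placeholders for your
setup, not values I know to be correct for it):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions


def run():
    options = PipelineOptions([
        '--runner=FlinkRunner',
        '--flink_master=flink-jobmanager:8081',  # placeholder address
        '--environment_type=EXTERNAL',
        '--environment_config=localhost:50000',  # beam worker pool sidecar
    ])
    with beam.Pipeline(options=options) as p:
        (
            p
            | beam.Create(list(range(10)))
            | beam.Map(lambda x: x * x)
            | beam.Map(print)
        )


if __name__ == '__main__':
    run()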


On Mon, Aug 14, 2023 at 11:05 AM Daniel Chen via user 
wrote:

> Not the OP, but is it possible to join the slack channel without an
> apache.org email address? I tried joining slack previously for support
> and gave up because it looked like it wasn't possible.
>
> On Mon, Aug 14, 2023 at 10:58 AM Kenneth Knowles  wrote:
>
>> There is a slack channel linked from
>> https://beam.apache.org/community/contact-us/; it is #beam on
>> the-asf.slack.com
>>
>> (you find this via beam.apache.org -> Community -> Contact Us)
>>
>> It sounds like an issue with running a multi-language pipeline on the
>> portable flink runner. (something which I am not equipped to help with in
>> detail)
>>
>> Kenn
>>
>> On Wed, Aug 9, 2023 at 2:51 PM kapil singh 
>> wrote:
>>
>>> Hey,
>>>
>>> I've been grappling with this issue for the past five days and, despite
>>> my continuous efforts, I haven't found a resolution. Additionally, I've
>>> been unable to locate a Slack channel for Beam where I might seek
>>> assistance.
>>>
>>> issue
>>>
>>> RuntimeError: Pipeline construction environment and pipeline runtime
>>> environment are not compatible. If you use a custom container image, check
>>> that the Python interpreter minor version and the Apache Beam version in
>>> your image match the versions used at pipeline construction time.
>>> Submission environment: beam:version:sdk_base:apache/beam_java8_sdk:2.48.0.
>>> Runtime environment: beam:version:sdk_base:apache/beam_python3.8_sdk:2.48.0.
>>>
>>>
>>> Here is what I am trying to do:
>>>
>>> I am running the job from a Kubernetes container that hits the job server,
>>> which then goes to the job manager and task manager. The task manager and
>>> job manager are in one container.
>>>
>>> Here is my custom Dockerfile (name: custom-flink):
>>>
>>> # Starting with the base Flink image
>>> FROM apache/flink:1.16-java11
>>> ARG FLINK_VERSION=1.16
>>> ARG KAFKA_VERSION=2.8.0
>>>
>>> # Install python3.8 and its associated dependencies, followed by pyflink
>>> RUN set -ex; \
>>>     apt-get update && \
>>>     apt-get install -y build-essential libssl-dev zlib1g-dev libbz2-dev \
>>>         libffi-dev lzma liblzma-dev && \
>>>     wget https://www.python.org/ftp/python/3.8.0/Python-3.8.0.tgz && \
>>>     tar -xvf Python-3.8.0.tgz && \
>>>     cd Python-3.8.0 && \
>>>     ./configure --without-tests --enable-shared && \
>>>     make -j4 && \
>>>     make install && \
>>>     ldconfig /usr/local/lib && \
>>>     cd .. && rm -f Python-3.8.0.tgz && rm -rf Python-3.8.0 && \
>>>     ln -s /usr/local/bin/python3.8 /usr/local/bin/python && \
>>>     ln -s /usr/local/bin/pip3.8 /usr/local/bin/pip && \
>>>     apt-get clean && \
>>>     rm -rf /var/lib/apt/lists/* && \
>>>     python -m pip install --upgrade pip; \
>>>     pip install apache-flink==${FLINK_VERSION}; \
>>>     pip install kafka-python
>>>
>>> RUN pip install --no-cache-dir apache-beam[gcp]==2.48.0
>>>
>>> # Copy files from official SDK image, including script/dependencies.
>>> COPY --from=apache/beam_python3.8_sdk:2.48.0 /opt/apache/beam/ /opt/apache/beam/
>>>
>>> # java SDK
>>> COPY --from=apache/beam_java11_sdk:2.48.0 /opt/apache/beam/ /opt/apache/beam_java/
>>>
>>> RUN apt-get update && apt-get install -y python3-venv && rm -rf /var/lib/apt/lists/*
>>>
>>> # Give permissions to the /opt/apache/beam-venv directory
>>> RUN mkdir -p /opt/apache/beam-venv && chown -R : /opt/apache/beam-venv
>>>
>>> Here is my deployment file for the job manager, task manager plus
>>> worker pool, and job server:
>>>
>>>
>>> apiVersion: v1
>>> kind: Service
>>> metadata:
>>>   name: flink-jobmanager
>>>   namespace: flink
>>> spec:
>>>   type: ClusterIP
>>>   ports:
>>>     - name: rpc
>>>       port: 6123
>>>     - name: blob-server
>>>       port: 6124
>>>     - name: webui
>>>       port: 8081
>>>   selector:
>>>     app: flink
>>>     component: jobmanager
>>> ---
>>> apiVersion: v1
>>> kind: Service
>>> metadata:
>>>   name: beam-worker-pool
>>>   namespace: flink
>>> spec:
>>>   selector:
>>>     app: flink
>>>     component: taskmanager
>>>   ports:
>>>     - protocol: TCP
>>>       port: 50000
>>>       targetPort: 50000
>>>       name: pool
>>> ---
>>> apiVersion: apps/v1
>>> kind: Deployment
>>> metadata:
>>>   name: flink-jobmanager
>>>   namespace: flink
>>> spec:
>>> 

Re: Best patterns for a polling transform

2023-06-22 Thread Sam Bourne
The streaming support in Python direct runner is currently rather limited

In this experiment I was running a batch pipeline instead of a streaming
one. Are there any known issues using timers with a batch pipeline?

It sounds like we should identify whether this is a problem in the SDK or
in the DirectRunner implementation

I’m trying to understand how things are intended to work so I know whether
I’ve encountered a bug. For example, when setting a timer, is it reasonable
to expect that timer to fire even after all elements in the pipeline have
finished processing? I can see the argument either way depending on the use
case, for example batching RPCs vs. garbage collection. Could the timer API
benefit from being able to specify whether or not the timer is guaranteed to
run?

This polling problem doesn’t seem very unique, so I’m surprised it hasn’t
come up before. I’m interested in better techniques for doing this, so if
anyone has any ideas I’m open to them!
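
For concreteness, the REAL_TIME variant from my original message is
essentially the same DoFn with a different time domain. A trimmed-down
sketch (not the exact code I ran):

import random

import apache_beam as beam
import apache_beam.coders as coders
import apache_beam.transforms.userstate as userstate
import apache_beam.utils.timestamp as timestamp


class EagerProcessRealTime(beam.DoFn):
    BUFFER_STATE = userstate.BagStateSpec('buffer', coders.PickleCoder())
    # The only difference from the WATERMARK version further down the thread.
    POLL_TIMER = userstate.TimerSpec('timer', beam.TimeDomain.REAL_TIME)

    def process(
        self,
        element,
        buffer=beam.DoFn.StateParam(BUFFER_STATE),
        timer=beam.DoFn.TimerParam(POLL_TIMER),
    ):
        _, item = element
        for i in range(item):
            buffer.add(i)
        # With REAL_TIME this callback never fired for us on the DirectRunner.
        timer.set(timestamp.Timestamp.now() + timestamp.Duration(seconds=10))

    @userstate.on_timer(POLL_TIMER)
    def flush(self, buffer=beam.DoFn.StateParam(BUFFER_STATE)):
        cache = list(buffer.read())
        buffer.clear()
        for item in cache:
            if random.random() < 0.1:
                yield item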

On Thu, Jun 22, 2023 at 7:32 AM Valentyn Tymofieiev via dev <
d...@beam.apache.org> wrote:

> > The below code runs fine with a single worker but with multiple workers
> there are duplicate values.
> > I’m using TimeDomain.WATERMARK here due to it simply not working when
> using REAL_TIME. The docs seem to suggest REAL_TIME would be the way to do
> this, however there seems to be no guarantee that a REAL_TIME callback will
> run.
>
> It seems that you are using Python direct runner for experimentation. The
> streaming support in Python direct runner is currently rather limited:
> https://github.com/apache/beam/issues/21987 , it is possible that direct
> runner doesn't correctly implement the streaming semantics. It sounds like
> we should identify whether this is a problem in the SDK or in the
> DirectRunner implementation, and file issues accordingly. Streaming direct
> runner issues use this umbrella issue:
> https://github.com/apache/beam/issues/21987. I would also experiment with
> FlinkRunner or DataflowRunner. Also, the streaming semantics should be
> consistent across SDKs, so different behavior between the Python and Java
> SDKs would indicate an SDK bug.
>
>
> On Thu, Jun 22, 2023 at 10:00 AM Chad Dombrova  wrote:
>
>> I’m also interested in the answer to this.  This is essential for reading
>> from many types of data sources.
>>
>>
>> On Tue, Jun 20, 2023 at 2:57 PM Sam Bourne  wrote:
>>
>>> +dev to see if anyone has any suggestions.
>>>
>>> On Fri, Jun 16, 2023 at 5:46 PM Sam Bourne  wrote:
>>>
>>>> Hello beam community!
>>>>
>>>> I’m having trouble coming up with the best pattern to *eagerly* poll.
>>>> By eagerly, I mean that elements should be consumed and yielded as soon as
>>>> possible. There are a handful of experiments that I’ve tried and my latest
>>>> attempt using the timer API seems quite promising, but is operating in a
>>>> way that I find rather unintuitive. My solution was to create a sort of
>>>> recursive timer callback - which I found one example
>>>> <https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/transforms/userstate_test.py#L797>
>>>> of within the beam test code.
>>>>
>>>> I have a few questions:
>>>>
>>>> 1) The below code runs fine with a single worker but with multiple
>>>> workers there are duplicate values. It seems that the callback and snapshot
>>>> of the state is provided to multiple workers and the number of duplications
>>>> increases with the number of workers. Is this due to the values being
>>>> provided to timer.set?
>>>>
>>>> 2) I’m using TimeDomain.WATERMARK here due to it simply not working
>>>> when using REAL_TIME. The docs
>>>> <https://beam.apache.org/documentation/programming-guide/#state-and-timers>
>>>> seem to suggest REAL_TIME would be the way to do this, however there
>>>> seems to be no guarantee that a REAL_TIME callback will run. In this
>>>> sample setting the timer to REAL_TIME will simply not ever fire the
>>>> callback. Interestingly, if you call timer.set with any value less
>>>> than the current time.time(), then the callback will run, however it
>>>> seems to fire immediately regardless of the value (and in this sample will
>>>> actually raise an AssertionError
>>>> <https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/runners/direct/transform_evaluator.py#L943>
>>>> ).
>>>>
>>>> I’m happy for suggestions!
>>>> -Sam
>>>>
> import random
> import threading

Re: Best patterns for a polling transform

2023-06-20 Thread Sam Bourne
+dev to see if anyone has any suggestions.

On Fri, Jun 16, 2023 at 5:46 PM Sam Bourne  wrote:

> Hello beam community!
>
> I’m having trouble coming up with the best pattern to *eagerly* poll. By
> eagerly, I mean that elements should be consumed and yielded as soon as
> possible. There are a handful of experiments that I’ve tried and my latest
> attempt using the timer API seems quite promising, but is operating in a
> way that I find rather unintuitive. My solution was to create a sort of
> recursive timer callback - which I found one example
> <https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/transforms/userstate_test.py#L797>
> of within the beam test code.
>
> I have a few questions:
>
> 1) The below code runs fine with a single worker but with multiple workers
> there are duplicate values. It seems that the callback and snapshot of the
> state is provided to multiple workers and the number of duplications
> increases with the number of workers. Is this due to the values being
> provided to timer.set?
>
> 2) I’m using TimeDomain.WATERMARK here due to it simply not working when
> using REAL_TIME. The docs
> <https://beam.apache.org/documentation/programming-guide/#state-and-timers>
> seem to suggest REAL_TIME would be the way to do this, however there
> seems to be no guarantee that a REAL_TIME callback will run. In this
> sample setting the timer to REAL_TIME will simply not ever fire the
> callback. Interestingly, if you call timer.set with any value less than
> the current time.time(), then the callback will run, however it seems to
> fire immediately regardless of the value (and in this sample will actually
> raise an AssertionError
> <https://github.com/apache/beam/blob/v2.48.0/sdks/python/apache_beam/runners/direct/transform_evaluator.py#L943>
> ).
>
> I’m happy for suggestions!
> -Sam
>
> import random
> import threading
>
> import apache_beam as beam
> import apache_beam.coders as coders
> import apache_beam.transforms.combiners as combiners
> import apache_beam.transforms.userstate as userstate
> import apache_beam.utils.timestamp as timestamp
> from apache_beam.options.pipeline_options import PipelineOptions
>
>
> class Log(beam.PTransform):
>
>     lock = threading.Lock()
>
>     @classmethod
>     def _log(cls, element, label):
>         with cls.lock:
>             # This just colors the print in terminal
>             print('\033[1m\033[92m{}\033[0m : {!r}'.format(label, element))
>         return element
>
>     def expand(self, pcoll):
>         return pcoll | beam.Map(self._log, self.label)
>
>
> class EagerProcess(beam.DoFn):
>
>     BUFFER_STATE = userstate.BagStateSpec('buffer', coders.PickleCoder())
>     POLL_TIMER = userstate.TimerSpec('timer', beam.TimeDomain.WATERMARK)
>
>     def process(
>         self,
>         element,
>         buffer=beam.DoFn.StateParam(BUFFER_STATE),
>         timer=beam.DoFn.TimerParam(POLL_TIMER),
>     ):
>         _, item = element
>
>         for i in range(item):
>             buffer.add(i)
>
>         timer.set(timestamp.Timestamp.now() + timestamp.Duration(seconds=10))
>
>     @userstate.on_timer(POLL_TIMER)
>     def flush(
>         self,
>         buffer=beam.DoFn.StateParam(BUFFER_STATE),
>         timer=beam.DoFn.TimerParam(POLL_TIMER),
>     ):
>         cache = buffer.read()
>         buffer.clear()
>
>         requeue = False
>         for item in cache:
>             if random.random() < 0.1:
>                 yield item
>             else:
>                 buffer.add(item)
>                 requeue = True
>
>         if requeue:
>             timer.set(timestamp.Timestamp.now() + timestamp.Duration(seconds=10))
>
>
> def main():
>     options = PipelineOptions.from_dictionary({
>         'direct_num_workers': 3,
>         'direct_running_mode': 'multi_threading',
>     })
>
>     pipe = beam.Pipeline(options=options)
>     (
>         pipe
>         | beam.Create([10])
>         | 'Init' >> Log()
>         | beam.Reify.Timestamp()
>         | 'PairWithKey' >> beam.Map(lambda x: (hash(x), x))
>         | beam.ParDo(EagerProcess())
>         | 'Complete' >> Log()
>         | beam.transforms.combiners.Count.Globally()
>         | 'Count' >> Log()
>     )
>     result = pipe.run()
>     result.wait_until_finish()
>
>
> if __name__ == '__main__':
>     main()
>
>


Best patterns for a polling transform

2023-06-16 Thread Sam Bourne
Hello beam community!

I’m having trouble coming up with the best pattern to *eagerly* poll. By
eagerly, I mean that elements should be consumed and yielded as soon as
possible. There are a handful of experiments that I’ve tried and my latest
attempt using the timer API seems quite promising, but is operating in a
way that I find rather unintuitive. My solution was to create a sort of
recursive timer callback - which I found one example

of within the beam test code.

I have a few questions:

1) The below code runs fine with a single worker but with multiple workers
there are duplicate values. It seems that the callback and snapshot of the
state is provided to multiple workers and the number of duplications
increases with the number of workers. Is this due to the values being
provided to timer.set?

2) I’m using TimeDomain.WATERMARK here due to it simply not working when
using REAL_TIME. The docs

seem to suggest REAL_TIME would be the way to do this, however there seems
to be no guarantee that a REAL_TIME callback will run. In this sample
setting the timer to REAL_TIME will simply not ever fire the callback.
Interestingly, if you call timer.set with any value less than the current
time.time(), then the callback will run, however it seems to fire
immediately regardless of the value (and in this sample will actually raise
an AssertionError

).

I’m happy for suggestions!
-Sam

import random
import threading

import apache_beam as beam
import apache_beam.coders as coders
import apache_beam.transforms.combiners as combiners
import apache_beam.transforms.userstate as userstate
import apache_beam.utils.timestamp as timestamp
from apache_beam.options.pipeline_options import PipelineOptions


class Log(beam.PTransform):

    lock = threading.Lock()

    @classmethod
    def _log(cls, element, label):
        with cls.lock:
            # This just colors the print in terminal
            print('\033[1m\033[92m{}\033[0m : {!r}'.format(label, element))
        return element

    def expand(self, pcoll):
        return pcoll | beam.Map(self._log, self.label)


class EagerProcess(beam.DoFn):

    BUFFER_STATE = userstate.BagStateSpec('buffer', coders.PickleCoder())
    POLL_TIMER = userstate.TimerSpec('timer', beam.TimeDomain.WATERMARK)

    def process(
        self,
        element,
        buffer=beam.DoFn.StateParam(BUFFER_STATE),
        timer=beam.DoFn.TimerParam(POLL_TIMER),
    ):
        _, item = element

        for i in range(item):
            buffer.add(i)

        timer.set(timestamp.Timestamp.now() + timestamp.Duration(seconds=10))

    @userstate.on_timer(POLL_TIMER)
    def flush(
        self,
        buffer=beam.DoFn.StateParam(BUFFER_STATE),
        timer=beam.DoFn.TimerParam(POLL_TIMER),
    ):
        cache = buffer.read()
        buffer.clear()

        requeue = False
        for item in cache:
            if random.random() < 0.1:
                yield item
            else:
                buffer.add(item)
                requeue = True

        if requeue:
            timer.set(timestamp.Timestamp.now() + timestamp.Duration(seconds=10))


def main():
    options = PipelineOptions.from_dictionary({
        'direct_num_workers': 3,
        'direct_running_mode': 'multi_threading',
    })

    pipe = beam.Pipeline(options=options)
    (
        pipe
        | beam.Create([10])
        | 'Init' >> Log()
        | beam.Reify.Timestamp()
        | 'PairWithKey' >> beam.Map(lambda x: (hash(x), x))
        | beam.ParDo(EagerProcess())
        | 'Complete' >> Log()
        | beam.transforms.combiners.Count.Globally()
        | 'Count' >> Log()
    )
    result = pipe.run()
    result.wait_until_finish()


if __name__ == '__main__':
    main()


Re: How to run Beam pipeline in Flink [Python]?

2022-06-20 Thread Sam Bourne
Hi Mike,

I’m not an expert, but I have some experience running beam pipelines in
Flink that require access to something on disk. When the Flink taskmanager
executes the WriteToText transform, it spins up a beam python SDK docker
container to perform the work*. At the moment there is not a way to mount a
directory into the SDK docker container, but there is an open ticket
 [1] to provide a way to
specify a directory to mount.

I was running a fork for a little while with these changes
 [2] that allowed us to pass
along some docker run options for how the SDK container was started (so we
could include a mount). One thing to note is that the flink taskmanager
workers need access to whatever directory you’re specifying (the E:\ drive
in your example). I also created a quick sample of how to deploy Flink in
Kubernetes here  [3] which
solved some problems we were running into dealing with the Flink job server
sharing the same disk staging area.

Hopefully some of that helps,
-Sam

*There’s an exception to this where some transforms are replaced with
something runner-specific. For example, the
apache_beam.io.gcp.pubsub.ReadFromPubSub transform gets “swapped out” for the
Java implementation, which executes directly on the Flink taskmanager worker
and not within the SDK container.

[1] https://github.com/apache/beam/issues/19240
[2] https://github.com/apache/beam/pull/8982
[3] https://github.com/sambvfx/beam-flink-k8s

On Sat, Jun 18, 2022 at 11:39 AM  wrote:

>
> I try again maybe someone can help me with this?
>
> How to run Beam on Flink?
>
> I have code:
>
> def run():
>   import apache_beam as beam
>   from apache_beam.options.pipeline_options import PipelineOptions
>
>   options = PipelineOptions([
>       "--runner=FlinkRunner",
>       "--flink_version=1.14",
>       "--flink_master=localhost:8081",
>       "--environment_config=localhost:50000"
>   ])
>   output_file = 'E:\\directory\\output.txt'
>   with beam.Pipeline(options=options) as p:
>     (p | 'Create file lines' >> beam.Create([
>            'Each element must be a string.',
>            'It writes one element per line.',
>            'There are no guarantees on the line order.',
>            'The data might be written into multiple files.',
>          ])
>        | 'Write to files' >> beam.io.WriteToText(output_file)
>     )
>
> if __name__ == "__main__":
>     run()
>
> It should work, but for some reason Flink is not able to save to the file:
>
> CHAIN MapPartition (MapPartition at [2]Write to
> files/Write/WriteImpl/DoOnce/{FlatMap(), Map(decode)}) -> FlatMap
> (FlatMap at ExtractOutput[0]) FAILED
>
> I get the same problem if I try to open a file.
> What is wrong here? I tried several example scripts and none of them works.
> I would appreciate it if you could help me take my first steps with Beam and Flink.
> Regards
> Mike
>


Re: Python SDF for unbound source

2022-03-11 Thread Sam Bourne
>     if not restriction_tracker.try_claim(self._counter):
>       return
>
>     i = 0
>     for _ in range(100):
>       try:
>         # Blocks until the next event is received.
>         i += 1
>         yield i
>       finally:
>         self._counter += 1
>         restriction_tracker.defer_remainder()
>
>
> def run(argv=None):
>   parser = argparse.ArgumentParser()
>   known_args, pipeline_args = parser.parse_known_args(argv)
>
>   pipeline_options = PipelineOptions(pipeline_args)
>
>   with beam.Pipeline(options=pipeline_options) as pipe:
>     (
>       pipe
>       | beam.Impulse()
>       | beam.ParDo(_EventReadSDF())
>       | beam.Map(print)
>     )
>
> if __name__ == '__main__':
>   run(sys.argv)
>
> On Wed, Mar 9, 2022 at 4:15 PM Sam Bourne  wrote:
>
>> Thanks Ankur for the feedback.
>>
>> Does anyone have any examples they could share writing a SDF for an
>> unbound source (ideally in python)?
>>
>> The SDF I wrote looks something like this. It works fine using the
>> DirectRunner with the PipelineOption --direct_running_mode='in_memory',
>> but doesn’t seem to operate properly using
>> --direct_running_mode='multi_processing'. I’d like to run this in the
>> DataflowRunner, but I wanted to make sure the SDF was operating
>> correctly first.
>>
>> Any tips would be greatly appreciated!
>>
>> import apache_beam as beam
>> from apache_beam.io.restriction_trackers import OffsetRange, OffsetRestrictionTracker
>> from apache_beam.transforms.core import RestrictionProvider
>>
>> from myeventlibrary import Event, Subscriber
>>
>>
>> class InfRestrictionProvider(RestrictionProvider):
>>   """
>>   An infinite restriction provider
>>   """
>>
>>   def initial_restriction(self, element):
>>     return OffsetRange(0, float('inf'))
>>
>>   def create_tracker(self, restriction):
>>     return OffsetRestrictionTracker(restriction)
>>
>>   def restriction_size(self, element, restriction):
>>     return 1
>>
>>
>> @beam.typehints.with_output_types(Event)
>> class _EventReadSDF(beam.DoFn):
>>   """
>>   An SDF for subscribing to custom events.
>>   """
>>
>>   restriction_tracker = beam.DoFn.RestrictionParam(InfRestrictionProvider())
>>
>>   def __init__(self):
>>     # type: () -> None
>>     super(_EventReadSDF, self).__init__()
>>     # number of events received
>>     self._counter = 0
>>
>>   def process(self, _, restriction_tracker=restriction_tracker):
>>     # type: (Any, beam.DoFn.RestrictionParam) -> Iterator[Event]
>>
>>     if not restriction_tracker.try_claim(self._counter):
>>       return
>>
>>     subscriber = Subscriber()
>>     try:
>>       # Blocks until the next event is received.
>>       yield subscriber.get()
>>     finally:
>>       self._counter += 1
>>       restriction_tracker.defer_remainder()
>>
>>
>> On Wed, Mar 9, 2022 at 11:47 AM Ankur Goenka  wrote:
>>
>>> Hi Sam,
>>>
>>> SDF can reject split requests so a SDF can be made to run a single
>>> instance.
>>> DoFn.unbounded_per_element Let the Beam model know that this is an
>>> unbounded source. It also tries to infer it
>>> https://github.com/apache/beam/blob/a126adbc6aa73f1e30adfa65a3710f7f69a7ba89/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L695
>>>
>>> As this will eventually run on a cluster, I would recommend going via
>>> the SDF route so that Watermarks and checkpoints can happen appropriately
>>> not just for this tranform but down stream tranforms.
>>>
>>> On Wed, Mar 9, 2022 at 10:54 AM Sam Bourne  wrote:
>>>
>>>> Hello! I’m looking for some help writing a custom transform that reads
>>>> from an unbound source. A basic version of this would look something like
>>>> this:
>>>>
>>>> import apache_beam as beam
>>>> import myeventlibrary
>>>> class _ReadEventsFn(beam.DoFn):
>>>>   def process(self, unused_element):
>>>>     subscriber = myeventlibrary.Subscriber()
>>>>     while True:
>>>>       # blocks until an event is received
>>>>       event = subscriber.get()
>>>>       yield event
>>>>
>>>> with beam.Pipeline() as pipe:
>>>>   (
>>>>     pipe
>>>>     | beam.Impulse()
>>>>     | beam.ParDo(_ReadEventsFn())
>>>>     | beam.Map(print)
>>>>   )
>>>>
>>>> I have a few questions:
>>>>
>>>>- When executing this in the DirectRunner with the default number
>>>>of workers (1), is it correct to assume the process would be constantly
>>>>blocking and rarely processing other parts of the pipeline?
>>>>- I noticed there is a decorator DoFn.unbounded_per_element which
>>>>seems useful, but it’s a bit unclear what it does. I also noticed in the
>>>>java docs it is an error to apply this decorator if the DoFn is not a 
>>>> SDF.
>>>>- I took a stab at writing this as a SDF. The incoming events are
>>>>not really something that can be read in parallel and they trickle in
>>>>slowly. In this scenario is there anything to gain by using a SDF?
>>>>
>>>> SDF can reject the split request so an SDF can be made to run as a
>>> single instance.
>>>
>>>>
>>>>
>>>> Thanks!
>>>> -Sam
>>>>
>>>


Re: Python SDF for unbound source

2022-03-09 Thread Sam Bourne
Thanks Ankur for the feedback.

Does anyone have any examples they could share writing a SDF for an unbound
source (ideally in python)?

The SDF I wrote looks something like this. It works fine using the
DirectRunner with the PipelineOption --direct_running_mode='in_memory', but
doesn’t seem to operate properly using
--direct_running_mode='multi_processing'. I’d like to run this in the
DataflowRunner, but I wanted to make sure the SDF was operating correctly
first.

Any tips would be greatly appreciated!

import apache_beam as beam
from apache_beam.io.restriction_trackers import OffsetRange, OffsetRestrictionTracker
from apache_beam.transforms.core import RestrictionProvider

from myeventlibrary import Event, Subscriber


class InfRestrictionProvider(RestrictionProvider):
  """
  An infinite restriction provider
  """

  def initial_restriction(self, element):
    return OffsetRange(0, float('inf'))

  def create_tracker(self, restriction):
    return OffsetRestrictionTracker(restriction)

  def restriction_size(self, element, restriction):
    return 1


@beam.typehints.with_output_types(Event)
class _EventReadSDF(beam.DoFn):
  """
  An SDF for subscribing to custom events.
  """

  restriction_tracker = beam.DoFn.RestrictionParam(InfRestrictionProvider())

  def __init__(self):
    # type: () -> None
    super(_EventReadSDF, self).__init__()
    # number of events received
    self._counter = 0

  def process(self, _, restriction_tracker=restriction_tracker):
    # type: (Any, beam.DoFn.RestrictionParam) -> Iterator[Event]

    if not restriction_tracker.try_claim(self._counter):
      return

    subscriber = Subscriber()
    try:
      # Blocks until the next event is received.
      yield subscriber.get()
    finally:
      self._counter += 1
      restriction_tracker.defer_remainder()


On Wed, Mar 9, 2022 at 11:47 AM Ankur Goenka  wrote:

> Hi Sam,
>
> SDF can reject split requests so a SDF can be made to run a single
> instance.
> DoFn.unbounded_per_element Let the Beam model know that this is an
> unbounded source. It also tries to infer it
> https://github.com/apache/beam/blob/a126adbc6aa73f1e30adfa65a3710f7f69a7ba89/sdks/java/core/src/main/java/org/apache/beam/sdk/transforms/DoFn.java#L695
>
> As this will eventually run on a cluster, I would recommend going via the
> SDF route so that Watermarks and checkpoints can happen appropriately not
> just for this tranform but down stream tranforms.
>
> On Wed, Mar 9, 2022 at 10:54 AM Sam Bourne  wrote:
>
>> Hello! I’m looking for some help writing a custom transform that reads
>> from an unbound source. A basic version of this would look something like
>> this:
>>
>> import apache_beam as beam
>> import myeventlibrary
>> class _ReadEventsFn(beam.DoFn):
>>   def process(self, unused_element):
>>     subscriber = myeventlibrary.Subscriber()
>>     while True:
>>       # blocks until an event is received
>>       event = subscriber.get()
>>       yield event
>>
>> with beam.Pipeline() as pipe:
>>   (
>>     pipe
>>     | beam.Impulse()
>>     | beam.ParDo(_ReadEventsFn())
>>     | beam.Map(print)
>>   )
>>
>> I have a few questions:
>>
>>- When executing this in the DirectRunner with the default number of
>>workers (1), is it correct to assume the process would be constantly
>>blocking and rarely processing other parts of the pipeline?
>>- I noticed there is a decorator DoFn.unbounded_per_element which
>>seems useful, but it’s a bit unclear what it does. I also noticed in the
>>java docs it is an error to apply this decorator if the DoFn is not a SDF.
>>- I took a stab at writing this as a SDF. The incoming events are not
>>really something that can be read in parallel and they trickle in slowly.
>>In this scenario is there anything to gain by using a SDF?
>>
>> SDF can reject the split request so an SDF can be made to run as a single
> instance.
>
>>
>>
>> Thanks!
>> -Sam
>>
>


Python SDF for unbound source

2022-03-09 Thread Sam Bourne
Hello! I’m looking for some help writing a custom transform that reads from
an unbound source. A basic version of this would look something like this:

import apache_beam as beam
import myeventlibrary
class _ReadEventsFn(beam.DoFn):
  def process(self, unused_element):
    subscriber = myeventlibrary.Subscriber()
    while True:
      # blocks until an event is received
      event = subscriber.get()
      yield event

with beam.Pipeline() as pipe:
  (
    pipe
    | beam.Impulse()
    | beam.ParDo(_ReadEventsFn())
    | beam.Map(print)
  )

I have a few questions:

   - When executing this in the DirectRunner with the default number of
   workers (1), is it correct to assume the process would be constantly
   blocking and rarely processing other parts of the pipeline?
   - I noticed there is a decorator DoFn.unbounded_per_element which seems
   useful, but it’s a bit unclear what it does. I also noticed in the java
   docs it is an error to apply this decorator if the DoFn is not a SDF.
   - I took a stab at writing this as a SDF. The incoming events are not
   really something that can be read in parallel and they trickle in slowly.
   In this scenario is there anything to gain by using a SDF?

Thanks!
-Sam


Re: [Question] Beam+Python+Flink

2021-10-28 Thread Sam Bourne
Hey Chiara,

I went through a lot of the same struggles a while back and made this repo
to showcase how I accomplished something similar.
https://github.com/sambvfx/beam-flink-k8s

It shouldn't be hard to convert to a docker-compose setup (I actually had
it like this originally while testing before porting to kubernetes).
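
If you do go the docker-compose route, the client side of what Jan describes
further down the thread looks roughly like this in Python. This is only a
sketch: it assumes the jobmanager is exposed on localhost:8081 and that a
beam_python3.8_sdk --worker_pool container runs next to each taskmanager.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    '--runner=FlinkRunner',
    '--flink_master=localhost:8081',
    '--environment_type=EXTERNAL',
    '--environment_config=localhost:50000',  # the worker_pool sidecar
])

with beam.Pipeline(options=options) as p:
    _ = p | beam.Create(['hello', 'world']) | beam.Map(print)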


On Thu, Oct 28, 2021 at 10:41 AM Kyle Weaver  wrote:

> > I still cannot figure out how to make sure that the worker_pool is
> accessible via  'localhost' hostname.
>
> In Kubernetes, we make the worker pool a sidecar of the Flink task manager
> container. Perhaps there is a similar feature available in docker compose?
>
>
> https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/blob/0310df76d6e2128cd5d2bc51fae4e842d370c463/examples/beam/without_job_server/beam_flink_cluster.yaml#L17-L20
>
> On Thu, Oct 28, 2021 at 10:30 AM Chiara Troiani <
> t.chiara.for@gmail.com> wrote:
>
>> Hi Jan,
>>
>> Thank you very much for your answer, and sorry for the late reply.
>>
>> I can see the job in flink UI, but it still fails, with this
>> exception among others:
>>
>> Caused by:
>> org.apache.beam.vendor.grpc.v1p36p0.io.netty.channel.AbstractChannel$AnnotatedConnectException:
>> finishConnect(..) failed: Connection refused: localhost/127.0.0.1:50000
>>
>> Caused by: java.net.ConnectException: finishConnect(..) failed:
>> Connection refused
>>
>>
>> I guess it is a Docker problem now.
>> I still cannot figure out how to make sure that the worker_pool is
>> accessible via  'localhost' hostname.
>>
>> Many thanks,
>> Chiara
>>
>> On Mon, Oct 18, 2021 at 11:56 AM Jan Lukavský  wrote:
>>
>>> Hi Chiara,
>>>
>>> environment_type LOOPBACK is meant for local execution only. The default
>>> is docker, which is not ideal when you use docker-compose (docker in
>>> docker), so the other option is to use the EXTERNAL environment. With this
>>> environment, you must manually start the Python SDK harness as a separate
>>> container using the apache/beam_python3.8_sdk docker image with args set to
>>> '--worker_pool'. That runs a container that will take care of running the
>>> Python harness processes. It will by default listen on port 50000; it must
>>> be accessible from the taskmanager container via localhost:50000, and you
>>> then pass it via environment_config, e.g.:
>>>
>>>  --environment_type=EXTERNAL --environment_config=localhost:50000
>>>
>>> That should do the trick. Because of limitations of the 'worker_pool',
>>> you must make sure that it is accessible via the 'localhost' hostname. For
>>> more information see [1].
>>>
>>>  Jan
>>>
>>> [1] https://beam.apache.org/documentation/runtime/sdk-harness-config/
>>> On 10/18/21 11:43, Chiara Troiani wrote:
>>>
>>> Hi,
>>>
>>>
>>> I am trying to follow these tutorials
>>>
>>> http://beam.apache.org/documentation/runners/flink/
>>>
>>> For the Portable (Python)
>>>
>>>
>>> I am not able to execute a Beam pipeline on a Flink cluster.
>>>
>>> I am running a Flink Session Cluster with docker-compose,
>>>
>>>
>>> This is my docker-compose file:
>>>
>>>
>>> ——
>>>
>>> version: "2.2"
>>>
>>> services:
>>>   jobmanager:
>>>     image: flink:1.13.2-scala_2.11
>>>     ports:
>>>       - "8081:8081"
>>>     command: jobmanager
>>>     environment:
>>>       - |
>>>         FLINK_PROPERTIES=
>>>         jobmanager.rpc.address: jobmanager
>>>
>>>   taskmanager:
>>>     image: flink:1.13.2-scala_2.11
>>>     depends_on:
>>>       - jobmanager
>>>     command: taskmanager
>>>     scale: 1
>>>     environment:
>>>       - |
>>>         FLINK_PROPERTIES=
>>>         jobmanager.rpc.address: jobmanager
>>>         taskmanager.numberOfTaskSlots: 2
>>>
>>> —
>>>
>>>
>>> I run the examples from a virtual environment, python3.8,
>>> apache-beam==2.32.0
>>>
>>> macOS Catalina 10.15.7
>>>
>>> Docker desktop 4.1.1
>>>
>>>
>>> When I run:
>>>
>>> python -m apache_beam.examples.wordcount --input=text.txt
>>> --output=out.txt --runner=FlinkRunner --flink_master=localhost:8081
>>> --environment_type=LOOPBACK
>>>
>>>
>>> I get this error:
>>>
>>> org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
>>>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
>>>     at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
>>>     at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:216)
>>>     at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:206)
>>>     at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:197)
>>>     at

Re: using python sdk+kafka under k8s

2021-02-24 Thread Sam Bourne
Hi Yilun!

I made a quick proof of concept repo showcasing how to run a beam pipeline
in flink on k8s. It may be useful for you as reference.

https://github.com/sambvfx/beam-flink-k8s
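
In case it's useful, the overall shape of the pipeline being discussed is
roughly this (a sketch only; the broker, topic, and endpoints are
placeholders, and the Kafka read is a cross-language transform, so it still
needs the Java expansion service you mention):

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    '--runner=FlinkRunner',
    '--flink_master=flink-job-manager:8081',
    '--flink_submit_uber_jar',
    '--environment_type=EXTERNAL',
    '--environment_config=localhost:50000',
    '--streaming',
])

with beam.Pipeline(options=options) as p:
    (
        p
        | ReadFromKafka(
            consumer_config={'bootstrap.servers': 'kafka:9092'},  # placeholder
            topics=['my-topic'],  # placeholder
        )
        | beam.Map(print)
    )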

On Wed, Feb 24, 2021, 8:13 AM yilun zhang  wrote:

> Hey,
>
> Our team is trying to use beam with connector Kafka and runner flink to
> gather information and process data. We adopt python sdk and build in java
> 11 in python 3.7 sdk image as java runtime for kafka expansion service.
>  so :
> image: beam python 3.7 docker image + build in java 11
> connector: kafka
> runner: flink
> container: kubernetes
>
> We encounter an docker not found error when running:
>  python3 -m kafka_test --runner=FlinkRunner
> --flink_master=flink-job-manager:8081 --flink_submit_uber_jar
> --environment_type=EXTERNAL --environment_config=localhost:50000
>
> We noticed that https://beam.apache.org/roadmap/portability/ mentions that
> the prerequisites also include Docker. We wonder what the Docker usage is
> here. Is there any suggested way to run Docker in a k8s container
> (something maybe like sysbox for docker-in-docker)?
>
> Or maybe we should not use beam sdk+runner in k8s?
>
> Thanks,
> Yilun
>


Re: Deploying with Flink and Beam Python Worker

2021-01-14 Thread Sam Bourne
Thanks! I actually managed to have my own deployment for running it locally
and it works well for local file, but I get this weird error while trying
to run the word count example and I’m trying to understand what can be the
cause of it?

If you’re referring to this error:

2021/01/14 14:11:45 Failed to retrieve staged files: failed to retrieve
/tmp/staged in 3 attempts: failed to retrieve chunk for
/tmp/staged/pickled_main_session

Then that is likely because the flink taskmanager cannot find the staged
artifacts on disk. If you’re using the FlinkRunner you use the flag
--flink_submit_uber_jar which will stuff all the artifacts into a jar file
you submit. If you’re using the PortableRunner, then you need to share the
staging volume between your artifact service (typically running wherever
your job service is) and the flink taskworker like I have configured here
<https://github.com/sambvfx/beam-flink-k8s/tree/master/k8s/with_job_server>.
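
For the FlinkRunner route, the options end up looking roughly like this (a
sketch only; the Flink master address is a placeholder and the environment
flags are omitted):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    '--runner=FlinkRunner',
    '--flink_master=flink-jobmanager:8081',  # placeholder
    # Bundles the pipeline artifacts into the jar submitted to Flink, so no
    # shared staging volume is needed.
    '--flink_submit_uber_jar',
])

with beam.Pipeline(options=options) as p:
    _ = p | beam.Create(['hello', 'world']) | beam.Map(print)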

BTW - is this something we may want to add to the official repo? (I can do
it ofc). It took me a while to find how to set up a local deployment for
Flink and the docs weren’t super detailed.

More documentation is certainly helpful. It took me quite some time to figure
out what I have now, so I would +1 any effort to save the next person some
time.

On Thu, Jan 14, 2021 at 11:42 AM Nir Gazit  wrote:

> BTW - is this something we may want to add to the official repo? (I can do
> it ofc). It took me a while to find how to set up a local deployment for
> Flink and the docs weren't super detailed.
>
> On Thu, Jan 14, 2021 at 9:39 PM Nir Gazit  wrote:
>
>> Thanks! I actually managed to have my own deployment for running it
>> locally and it works well for local file, but I get this weird error while
>> trying to run the word count example and I'm trying to understand what can
>> be the cause of it?
>>
>> On Thu, Jan 14, 2021 at 7:29 PM Sam Bourne  wrote:
>>
>>> Hi Nir,
>>>
>>> I have a simple repo where I have a proof of concept deployment setup
>>> for doing this.
>>>
>>> https://github.com/sambvfx/beam-flink-k8s
>>>
>>> Depending on the type of runner you're using there are a few
>>> explanations. That repo should hopefully point you in the right direction.
>>>
>>> On Thu, Jan 14, 2021 at 6:16 AM Nir Gazit  wrote:
>>>
>>>> Hi,
>>>> I'm trying to deploy the word count job on a Flink cluster in
>>>> Kubernetes. However, when trying to run the job (with python workers as a
>>>> side car to the Flink task masters), I get the following error:
>>>>
>>>> 2021/01/14 14:11:36 Initializing python harness: /opt/apache/beam/boot
>>>> --id=1-1 --logging_endpoint=localhost:39233
>>>> --artifact_endpoint=localhost:34123 --provision_endpoint=localhost:33519
>>>> --control_endpoint=localhost:44987
>>>> 2021/01/14 14:11:45 Failed to retrieve staged files: failed to retrieve
>>>> /tmp/staged in 3 attempts: failed to retrieve chunk for
>>>> /tmp/staged/pickled_main_session
>>>> caused by:
>>>> rpc error: code = Unknown desc = ; failed to retrieve chunk for
>>>> /tmp/staged/pickled_main_session
>>>> caused by:
>>>> rpc error: code = Unknown desc = ; failed to retrieve chunk for
>>>> /tmp/staged/pickled_main_session
>>>> caused by:
>>>> rpc error: code = Unknown desc = ; failed to retrieve chunk for
>>>> /tmp/staged/pickled_main_session
>>>> caused by:
>>>> rpc error: code = Unknown desc =
>>>>
>>>> Anyone knows what could be the reason?
>>>>
>>>


Re: Deploying with Flink and Beam Python Worker

2021-01-14 Thread Sam Bourne
Hi Nir,

I have a simple repo where I have a proof of concept deployment setup for
doing this.

https://github.com/sambvfx/beam-flink-k8s

Depending on the type of runner you're using there are a few explanations.
That repo should hopefully point you in the right direction.

On Thu, Jan 14, 2021 at 6:16 AM Nir Gazit  wrote:

> Hi,
> I'm trying to deploy the word count job on a Flink cluster in Kubernetes.
> However, when trying to run the job (with python workers as a side car to
> the Flink task masters), I get the following error:
>
> 2021/01/14 14:11:36 Initializing python harness: /opt/apache/beam/boot
> --id=1-1 --logging_endpoint=localhost:39233
> --artifact_endpoint=localhost:34123 --provision_endpoint=localhost:33519
> --control_endpoint=localhost:44987
> 2021/01/14 14:11:45 Failed to retrieve staged files: failed to retrieve
> /tmp/staged in 3 attempts: failed to retrieve chunk for
> /tmp/staged/pickled_main_session
> caused by:
> rpc error: code = Unknown desc = ; failed to retrieve chunk for
> /tmp/staged/pickled_main_session
> caused by:
> rpc error: code = Unknown desc = ; failed to retrieve chunk for
> /tmp/staged/pickled_main_session
> caused by:
> rpc error: code = Unknown desc = ; failed to retrieve chunk for
> /tmp/staged/pickled_main_session
> caused by:
> rpc error: code = Unknown desc =
>
> Anyone knows what could be the reason?
>


Re: Issues with python's external ReadFromPubSub

2020-10-28 Thread Sam Bourne
Yeah, I’m able to run that. The apache_beam.io.ReadFromPubSub transform works
just fine, but only on the DirectRunner in Python. On Flink we’re using the
Java implementation via the external transform
apache_beam.io.external.gcp.pubsub.ReadFromPubSub.

Is there a different way to do this?

On Wed, Oct 28, 2020 at 10:47 AM Kyle Weaver  wrote:

> Are you able to run streaming word count on the same setup?
>
> On Tue, Oct 27, 2020 at 5:39 PM Sam Bourne  wrote:
>
>> We updated from beam 2.18.0 to 2.24.0 and have been having issues using
>> the python ReadFromPubSub external transform in flink 1.10. It seems
>> like it starts up just fine, but it doesn’t consume any messages.
>>
>> I tried to reduce it to a simple example and tested back to beam 2.22.0
>> but have gotten the same results (of no messages being read).
>>
>> I have a hard time believing that it’s been broken for so many versions
>> but I can’t seem to identify what I’ve done wrong.
>>
>> Steps to test:
>>
>> 1) Spin up the expansion service
>>
>> docker run -d --rm --network host apache/beam_flink1.10_job_server:2.24.0
>>
>> 2) Create a simple pipeline using
>> apache_beam.io.external.gcp.pubsub.ReadFromPubSub
>> Here’s mine:
>> https://gist.github.com/sambvfx/a8582f4805e468a97331b0eb13911ebf
>>
>> 3) Run the pipeline
>>
>> python -m pubsub_example --runner=FlinkRunner --save_main_session 
>> --flink_submit_uber_jar --environment_type=DOCKER 
>> --environment_config=apache/beam_python3.7_sdk:2.24.0 
>> --checkpointing_interval=1 --streaming
>>
>> 4) Emit a few pubsub messages
>>
>> python -m pubsub_example --msg hello
>> python -m pubsub_example --msg world
>>
>> What am I missing here?
>> -Sam
>> --
>>
>> Some debugging things I’ve tried:
>>
>>    - I can run ReadFromPubSub (the DirectRunner version) just fine.
>>- I confirmed that the gcloud credentials make it into the java sdk
>>container that spins up. Without these you get credential type errors.
>>- I modified the java DockerEnvironmentFactory to instead mount
>>GOOGLE_APPLICATION_CREDENTIALS service account .json and set the env
>>var.
>>- I’ve tried a variety of different flink flags.
>>
>>


Issues with python's external ReadFromPubSub

2020-10-27 Thread Sam Bourne
We updated from beam 2.18.0 to 2.24.0 and have been having issues using the
python ReadFromPubSub external transform in flink 1.10. It seems like it
starts up just fine, but it doesn’t consume any messages.

I tried to reduce it to a simple example and tested back to beam 2.22.0 but
have gotten the same results (of no messages being read).

I have a hard time believing that it’s been broken for so many versions but
I can’t seem to identify what I’ve done wrong.

Steps to test:

1) Spin up the expansion service

docker run -d --rm --network host apache/beam_flink1.10_job_server:2.24.0

2) Create a simple pipeline using
apache_beam.io.external.gcp.pubsub.ReadFromPubSub
Here’s mine:
https://gist.github.com/sambvfx/a8582f4805e468a97331b0eb13911ebf
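
(For the archives: the gist is essentially a stripped-down pipeline along
these lines. This is an approximation, not the exact file, and the topic is a
placeholder.)

import apache_beam as beam
from apache_beam.io.external.gcp.pubsub import ReadFromPubSub
from apache_beam.options.pipeline_options import PipelineOptions


def run(argv=None):
    options = PipelineOptions(argv, streaming=True, save_main_session=True)
    with beam.Pipeline(options=options) as p:
        (
            p
            | ReadFromPubSub(topic='projects/my-project/topics/my-topic')
            | beam.Map(print)
        )


if __name__ == '__main__':
    run()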

3) Run the pipeline

python -m pubsub_example --runner=FlinkRunner --save_main_session
--flink_submit_uber_jar --environment_type=DOCKER
--environment_config=apache/beam_python3.7_sdk:2.24.0
--checkpointing_interval=1 --streaming

4) Emit a few pubsub messages

python -m pubsub_example --msg hello
python -m pubsub_example --msg world

What am I missing here?
-Sam
--

Some debugging things I’ve tried:

   - I can run ReadFromPubSub (the DirectRunner version) just fine.
   - I confirmed that the gcloud credentials make it into the java sdk
   container that spins up. Without these you get credential type errors.
   - I modified the java DockerEnvironmentFactory to instead mount
   GOOGLE_APPLICATION_CREDENTIALS service account .json and set the env var.
   - I’ve tried a variety of different flink flags.


Re: Flink JobService on k8s

2020-09-24 Thread Sam Bourne
Thank you Kyle for clarifying things for me. I've confirmed it works by
simply sharing the artifact staging volume between the jobserver and
taskmanager pods. This works fine with the dind setup using the docker
environment.

Thanks again,
Sam
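
For anyone who lands on this thread later, the client side of the working
setup looks roughly like this (a sketch; the job and artifact endpoints are
assumptions based on our Service name, not something to copy verbatim):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    '--runner=PortableRunner',
    # Assumed endpoints for the flink-beam-jobserver Service; adjust for your cluster.
    '--job_endpoint=flink-beam-jobserver:8099',
    '--artifact_endpoint=flink-beam-jobserver:8098',
    '--environment_type=DOCKER',
])

with beam.Pipeline(options=options) as p:
    _ = p | beam.Create(['hello', 'world']) | beam.Map(print)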

On Tue, Sep 22, 2020 at 4:37 PM Kyle Weaver  wrote:

> > It was my understanding that the client first uploads the artifacts to
> the jobserver and then the SDK harness will pull in these artifacts from
> the jobserver over a gRPC port.
> Not quite. The artifact endpoint is for communicating which artifacts are
> needed, and where to find them. But the SDK harness pulls the actual
> artifacts itself.
>
> > Do the jobserver and the taskmanager need to share the artifact staging
> volume.
>
> More precisely, the job server and the SDK harness need to share the
> artifact staging volume (which is why we generally recommend using a
> distributed filesystem for this purpose if possible).
>
> General note: there is never any direct communication between the job
> server and the SDK harness. Usually it goes Beam job server -> Flink job
> manager -> Flink task manager -> Beam SDK harness.
>
> On Tue, Sep 22, 2020 at 4:20 PM Sam Bourne  wrote:
>
>> It was my understanding that the client first uploads the artifacts to
>> the jobserver and then the SDK harness will pull in these artifacts from
>> the jobserver over a gRPC port.
>>
>> I see the artifacts on the jobserver while the job is attempting to run:
>>
>> root@flink-beam-jobserver-9fccb99b8-6mhtq:/tmp/beam-artifact-staging/3024e5d862fef831e830945b2d3e4e9511e0423bfb9c48de75aa2b3b67decce4
>>
>> Do the jobserver and the taskmanager need to share the artifact staging
>> volume?
>>
>> On Tue, Sep 22, 2020 at 4:04 PM Kyle Weaver  wrote:
>>
>>> > rpc error: code = Unknown desc = ; failed to retrieve chunk for
>>> /tmp/staged/pickled_main_session
>>>
>>> Are you sure that's due to a networking issue, and not a problem with
>>> the filesystem / volume mounting?
>>>
>>> On Tue, Sep 22, 2020 at 3:55 PM Sam Bourne  wrote:
>>>
>>>> I would not be surprised if there was something weird going on with
>>>>> Docker in Docker. The defaults mostly work fine when an external SDK
>>>>> harness is used [1].
>>>>>
>>>> Can you provide more information on the exception you got? (I'm
>>>>> particularly interested in the line number).
>>>>>
>>>> The actual error is a bit tricky to find but if you monitor the docker
>>>> logs from within the taskmanager pod you can find it failing when the SDK
>>>> harness boot.go attempts to pull the artifacts from the artifact
>>>> endpoint [1]
>>>> [1]
>>>> https://github.com/apache/beam/blob/master/sdks/python/container/boot.go#L139
>>>>
>>>> 2020/09/22 22:07:51 Initializing python harness: /opt/apache/beam/boot 
>>>> --id=1-1 --provision_endpoint=localhost:45775
>>>> 2020/09/22 22:07:59 Failed to retrieve staged files: failed to retrieve 
>>>> /tmp/staged in 3 attempts: failed to retrieve chunk for 
>>>> /tmp/staged/pickled_main_session
>>>> caused by:
>>>> rpc error: code = Unknown desc = ; failed to retrieve chunk for 
>>>> /tmp/staged/pickled_main_session
>>>>
>>>> I can hit the jobserver fine from my taskmanager pod, as well as from
>>>> within a SDK container I spin up manually (with —network host):
>>>>
>>>> root@flink-taskmanager-56c6bdb6dd-49md7:/opt/apache/beam# ping 
>>>> flink-beam-jobserver
>>>> PING flink-beam-jobserver.default.svc.cluster.local (10.103.16.129) 56(84) 
>>>> bytes of data.
>>>>
>>>> I don’t see how this would work if the endpoint hostname is localhost.
>>>> I’ll explore how this is working in the flink-on-k8s-operator.
>>>>
>>>> Thanks for taking a look!
>>>> Sam
>>>>
>>>> On Tue, Sep 22, 2020 at 2:48 PM Kyle Weaver 
>>>> wrote:
>>>>
>>>>> > The issue is that the jobserver does not provide the proper
>>>>> endpoints to the SDK harness when it submits the job to flink.
>>>>>
>>>>> I would not be surprised if there was something weird going on with
>>>>> Docker in Docker. The defaults mostly work fine when an external SDK
>>>>> harness is used [1].
>>>>>
>>>>> Can you provide more information on the exception you got? (

Re: Flink JobService on k8s

2020-09-22 Thread Sam Bourne
It was my understanding that the client first uploads the artifacts to the
jobserver and then the SDK harness will pull in these artifacts from the
jobserver over a gRPC port.

I see the artifacts on the jobserver while the job is attempting to run:

root@flink-beam-jobserver-9fccb99b8-6mhtq:/tmp/beam-artifact-staging/3024e5d862fef831e830945b2d3e4e9511e0423bfb9c48de75aa2b3b67decce4

Do the jobserver and the taskmanager need to share the artifact staging
volume?

On Tue, Sep 22, 2020 at 4:04 PM Kyle Weaver  wrote:

> > rpc error: code = Unknown desc = ; failed to retrieve chunk for
> /tmp/staged/pickled_main_session
>
> Are you sure that's due to a networking issue, and not a problem with the
> filesystem / volume mounting?
>
> On Tue, Sep 22, 2020 at 3:55 PM Sam Bourne  wrote:
>
>> I would not be surprised if there was something weird going on with
>>> Docker in Docker. The defaults mostly work fine when an external SDK
>>> harness is used [1].
>>>
>> Can you provide more information on the exception you got? (I'm
>>> particularly interested in the line number).
>>>
>> The actual error is a bit tricky to find but if you monitor the docker
>> logs from within the taskmanager pod you can find it failing when the SDK
>> harness boot.go attempts to pull the artifacts from the artifact
>> endpoint [1]
>> [1]
>> https://github.com/apache/beam/blob/master/sdks/python/container/boot.go#L139
>>
>> 2020/09/22 22:07:51 Initializing python harness: /opt/apache/beam/boot 
>> --id=1-1 --provision_endpoint=localhost:45775
>> 2020/09/22 22:07:59 Failed to retrieve staged files: failed to retrieve 
>> /tmp/staged in 3 attempts: failed to retrieve chunk for 
>> /tmp/staged/pickled_main_session
>> caused by:
>> rpc error: code = Unknown desc = ; failed to retrieve chunk for 
>> /tmp/staged/pickled_main_session
>>
>> I can hit the jobserver fine from my taskmanager pod, as well as from
>> within a SDK container I spin up manually (with —network host):
>>
>> root@flink-taskmanager-56c6bdb6dd-49md7:/opt/apache/beam# ping 
>> flink-beam-jobserver
>> PING flink-beam-jobserver.default.svc.cluster.local (10.103.16.129) 56(84) 
>> bytes of data.
>>
>> I don’t see how this would work if the endpoint hostname is localhost.
>> I’ll explore how this is working in the flink-on-k8s-operator.
>>
>> Thanks for taking a look!
>> Sam
>>
>> On Tue, Sep 22, 2020 at 2:48 PM Kyle Weaver  wrote:
>>
>>> > The issue is that the jobserver does not provide the proper endpoints
>>> to the SDK harness when it submits the job to flink.
>>>
>>> I would not be surprised if there was something weird going on with
>>> Docker in Docker. The defaults mostly work fine when an external SDK
>>> harness is used [1].
>>>
>>> Can you provide more information on the exception you got? (I'm
>>> particularly interested in the line number).
>>>
>>> > The issue is that the jobserver does not provide the proper endpoints
>>> to the SDK harness when it submits the job to flink.
>>>
>>> More information about this failure mode would be helpful as well.
>>>
>>> [1]
>>> https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/blob/master/examples/beam/with_job_server/beam_job_server.yaml
>>>
>>>
>>> On Tue, Sep 22, 2020 at 10:36 AM Sam Bourne  wrote:
>>>
>>>> Hello beam community!
>>>>
>>>> I’m looking for some help solving an issue running a beam job on flink
>>>> using --environment_type DOCKER.
>>>>
>>>> I have a flink cluster running in kubernetes configured so the
>>>> taskworkers can run docker [1]. I’m attempting to deploy a flink jobserver
>>>> in the cluster. The issue is that the jobserver does not provide the proper
>>>> endpoints to the SDK harness when it submits the job to flink. It typically
>>>> provides something like localhost:34567 using the hostname the grpc
>>>> server was bound to. There is a jobserver flag --job-host that will
>>>> bind the grpc server to this provided hostname, but I cannot seem to get it
>>>> to bind to the k8s jobservice Service name [2]. I’ve tried different
>>>> flavors of FQDNs but haven’t had any luck.
>>>>
>>>> [main] INFO org.apache.beam.runners.jobsubmission.JobServerDriver - 
>>>> ArtifactStagingService started on flink-beam-jobserver:8098
>>>> [main] WARN org.apache.beam.runners.jobsubmission.JobServerDriver - 
>>>> Exception during job server creation
>>>> java.io.IOException: Failed to bind
>>>> ...
>>>>
>>>> Does anyone have some experience with this that could help provide some
>>>> guidance?
>>>>
>>>> Cheers,
>>>> Sam
>>>>
>>>> [1] https://github.com/sambvfx/beam-flink-k8s
>>>> [2]
>>>> https://github.com/sambvfx/beam-flink-k8s/blob/master/k8s/beam-flink-jobserver.yaml#L29
>>>>
>>>


Re: Flink JobService on k8s

2020-09-22 Thread Sam Bourne
I would not be surprised if there was something weird going on with Docker
> in Docker. The defaults mostly work fine when an external SDK harness is
> used [1].
>
Can you provide more information on the exception you got? (I'm
> particularly interested in the line number).
>
The actual error is a bit tricky to find but if you monitor the docker logs
from within the taskmanager pod you can find it failing when the SDK
harness boot.go attempts to pull the artifacts from the artifact
endpoint [1]
[1]
https://github.com/apache/beam/blob/master/sdks/python/container/boot.go#L139

2020/09/22 22:07:51 Initializing python harness: /opt/apache/beam/boot
--id=1-1 --provision_endpoint=localhost:45775
2020/09/22 22:07:59 Failed to retrieve staged files: failed to
retrieve /tmp/staged in 3 attempts: failed to retrieve chunk for
/tmp/staged/pickled_main_session
caused by:
rpc error: code = Unknown desc = ; failed to retrieve chunk for
/tmp/staged/pickled_main_session

I can hit the jobserver fine from my taskmanager pod, as well as from within
an SDK container I spin up manually (with --network host):

root@flink-taskmanager-56c6bdb6dd-49md7:/opt/apache/beam# ping
flink-beam-jobserver
PING flink-beam-jobserver.default.svc.cluster.local (10.103.16.129)
56(84) bytes of data.

I don’t see how this would work if the endpoint hostname is localhost. I’ll
explore how this is working in the flink-on-k8s-operator.
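
For what it's worth, a quick way I sanity check whether an endpoint is
reachable from inside a container is a bare gRPC connect from python. Just a
sketch -- the flink-beam-jobserver:8098 target here is my artifact service
and should be swapped for whatever endpoint shows up in the boot.go log:

import grpc

channel = grpc.insecure_channel("flink-beam-jobserver:8098")
try:
    # blocks until the channel connects or the timeout expires
    grpc.channel_ready_future(channel).result(timeout=5)
    print("endpoint reachable")
except grpc.FutureTimeoutError:
    print("endpoint NOT reachable")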

Thanks for taking a look!
Sam

On Tue, Sep 22, 2020 at 2:48 PM Kyle Weaver  wrote:

> > The issue is that the jobserver does not provide the proper endpoints to
> the SDK harness when it submits the job to flink.
>
> I would not be surprised if there was something weird going on with Docker
> in Docker. The defaults mostly work fine when an external SDK harness is
> used [1].
>
> Can you provide more information on the exception you got? (I'm
> particularly interested in the line number).
>
> > The issue is that the jobserver does not provide the proper endpoints to
> the SDK harness when it submits the job to flink.
>
> More information about this failure mode would be helpful as well.
>
> [1]
> https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/blob/master/examples/beam/with_job_server/beam_job_server.yaml
>
>
> On Tue, Sep 22, 2020 at 10:36 AM Sam Bourne  wrote:
>
>> Hello beam community!
>>
>> I’m looking for some help solving an issue running a beam job on flink
>> using --environment_type DOCKER.
>>
>> I have a flink cluster running in kubernetes configured so the
>> taskworkers can run docker [1]. I’m attempting to deploy a flink jobserver
>> in the cluster. The issue is that the jobserver does not provide the proper
>> endpoints to the SDK harness when it submits the job to flink. It typically
>> provides something like localhost:34567 using the hostname the grpc
>> server was bound to. There is a jobserver flag --job-host that will bind
>> the grpc server to this provided hostname, but I cannot seem to get it to
>> bind to the k8s jobservice Service name [2]. I’ve tried different flavors
>> of FQDNs but haven’t had any luck.
>>
>> [main] INFO org.apache.beam.runners.jobsubmission.JobServerDriver - 
>> ArtifactStagingService started on flink-beam-jobserver:8098
>> [main] WARN org.apache.beam.runners.jobsubmission.JobServerDriver - 
>> Exception during job server creation
>> java.io.IOException: Failed to bind
>> ...
>>
>> Does anyone have some experience with this that could help provide some
>> guidance?
>>
>> Cheers,
>> Sam
>>
>> [1] https://github.com/sambvfx/beam-flink-k8s
>> [2]
>> https://github.com/sambvfx/beam-flink-k8s/blob/master/k8s/beam-flink-jobserver.yaml#L29
>>
>


Re: Getting Beam(Python)-on-Flink-on-k8s to work

2020-08-30 Thread Sam Bourne
On Sat, Aug 29, 2020 at 10:59 AM Eugene Kirpichov 
wrote:


>
> On Fri, Aug 28, 2020 at 6:52 PM Sam Bourne  wrote:
>
>> Hi Eugene,
>>
>> Glad that helped you out and thanks for the PR tweaking it for GCP.
>>
>> To fetch the containers from GCR, I had to log into Docker inside the
>> Flink nodes, specifically inside the taskmanager container, using something
>> like “kubectl exec pod/flink-taskmanager-blahblah -c taskmanager — docker
>> login -u oauth2accesstoken —password $(gcloud auth print-access-token)”
>>
>> Ouch that seems painful. I find this “precaching” step pretty silly and
>> have considered making the DockerEnvironmentFactory a little more
>> intelligent about how it deals with timeouts (e.g. no activity). It doesn’t
>> seem like it would be too difficult to also add first-class support for
>> pulling images from protected repositories. Extending the DockerPayload
>> protobuf to pass along the additional information and tweaking the
>> DockerEnvironmentFactory? I’m not a java expert but that might be worth
>> exploring if this continues to be problematic.
>>
> Yeah it makes sense to have some first-class support for Docker
> credentials. It's kind of a no-brainer that it's necessary with custom
> containers, many companies probably wouldn't want to push their custom
> containers to a public repo.
> I was thinking of embedding the credentials JSON file into the
> taskmanager container through its Dockerfile, that's workable but also
> pretty silly - having to rebuild this container just for the sake of
> putting in the credentials.
> DockerPayload might be the right place to put credentials, but I wonder if
> there's a way to do something more secure, with k8s secrets. I'm not too
> well-versed in credential management.
>
Using k8s secrets you could mount your credentials into the container and
tweak the pull/run command
<https://github.com/apache/beam/blob/master/runners/java-fn-execution/src/main/java/org/apache/beam/runners/fnexecution/environment/DockerCommand.java#L77>
to first log in using a pattern like cat /tmp/password.txt | docker login
--username foo --password-stdin. Maybe the DockerPayload protobuf could
include the password as raw text or an absolute filepath and switch the
login command depending on which is provided.

>> I found the time it takes to pull can be dramatically improved if you store
>> everything in memory
>> <https://github.com/sambvfx/beam-flink-k8s/blob/master/k8s/flink.yaml#L186>.
>>
>> In the end, I got my pipeline to start, create the uber jar (about 240MB
>> in size), take a few minutes to transmit it to Flink
>>
>> You could explore spinning up the beam-flink-job-server
>> <https://github.com/sambvfx/beam-flink-k8s/blob/master/k8s/beam-flink-jobserver.yaml>
>> and using the PortableRunner. In theory that should reduce the amount of
>> data you’re syncing to the cluster. It does require exposing at least two
>> ingress points (8099 and 8098) so you can hit the job and artifact services
>> respectively.
>>
> Right, good idea! Haven't tried spinning up the job server. Exposing the
> job and artifact services seems pretty easy; but would also need to replace
> the jobserver image "apache/beam_flink1.10_job_server:2.23.0" with a
> custom-built one with the Beam 2.24 snapshot we're using.
>

This may be necessary anyway, depending on how you handle the docker login
stuff. In any case, good luck!


>
>
>> Cheers,
>> Sam
>>
>> On Fri, Aug 28, 2020 at 5:50 PM Eugene Kirpichov 
>> wrote:
>>
>>> Woohoo thanks Kyle, adding --save_main_session made it work!!!
>>>
>>> On Fri, Aug 28, 2020 at 5:02 PM Kyle Weaver  wrote:
>>>
>>>> > rpc error: code = Unimplemented desc = Method not found:
>>>> org.apache.beam.model.job_management.v1.LegacyArtifactRetrievalService/GetManifest
>>>>
>>>> This is a known issue: https://issues.apache.org/jira/browse/BEAM-10762
>>>>
>>>> On Fri, Aug 28, 2020 at 4:57 PM Eugene Kirpichov 
>>>> wrote:
>>>>
>>>>> P.S. Ironic how back in 2018 I was TL-ing the portable runners effort
>>>>> for a few months on Google side, and now I need community help to get it 
>>>>> to
>>>>> work at all.
>>>>> Still pretty miraculous how far Beam's portability has come since
>>>>> then, even if it has a steep learning curve.
>>>>>
>>>>> On Fri, Aug 28, 2020 at 4:54 PM Eugene Kirpichov 
>>>>> wrote:
>>>>>
>>>>>> Hi Sam,
>>>>>>

Re: Getting Beam(Python)-on-Flink-on-k8s to work

2020-08-27 Thread Sam Bourne
Hi Eugene!

I’m struggling to find complete documentation on how to do this. There
seems to be lots of conflicting or incomplete information: several ways to
deploy Flink, several ways to get Beam working with it, bizarre
StackOverflow questions, and no documentation explaining a complete working
example.

This *is* possible, and I went through all the same frustrations of sparse
and confusing documentation. I’m glossing over a lot of details, but the
key thing was setting up the Flink taskmanager(s) to run docker. This
requires running docker-in-docker, since the taskmanager itself is a docker
container in k8s.

First, create a custom flink image with docker installed:

# docker-flink Dockerfile

FROM flink:1.10
# install docker (docker.io from the debian repos is one option; any install
# method that puts the docker CLI on the image works)
RUN apt-get update && apt-get install -y docker.io && rm -rf /var/lib/apt/lists/*

Then set up the taskmanager deployment to use a sidecar docker-in-docker
service. This dind service is where the Python SDK harness container
actually runs.

kind: Deployment
...
  containers:
  - name: docker
image: docker:19.03.5-dind
...
  - name: taskmanager
image: myregistry:5000/docker-flink:1.10
env:
- name: DOCKER_HOST
  value: tcp://localhost:2375
...
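
Once that's up and you have a beam jobserver to submit to, a Python pipeline
can be pointed at the cluster through the PortableRunner. A rough sketch --
the endpoints, image name, and ports below are assumptions, so swap in
whatever your deployment actually exposes (here I'm assuming the jobserver's
job and artifact services are port-forwarded locally):

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    "--runner=PortableRunner",
    # assumed: job/artifact services port-forwarded from the jobserver Service
    "--job_endpoint=localhost:8099",
    "--artifact_endpoint=localhost:8098",
    # run the SDK harness as a docker container via the dind sidecar
    "--environment_type=DOCKER",
    # hypothetical custom SDK image; omit to use the default beam python sdk image
    "--environment_config=myregistry:5000/my-beam-python-sdk:latest",
])

with beam.Pipeline(options=options) as p:
    (
        p
        | "Create" >> beam.Create(["hello", "world"])
        | "Print" >> beam.Map(print)
    )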

I quickly threw all these pieces together in a repo here:
https://github.com/sambvfx/beam-flink-k8s

I added a working step-by-step guide (via minikube) to the README to prove
to myself that I didn’t miss anything, but feel free to submit any PRs if
you want to add anything useful.

The documents you linked are very informative. It would be great to
aggregate all this into digestible documentation. Let me know if you have
any further questions!

Cheers,
Sam

On Thu, Aug 27, 2020 at 10:25 AM Eugene Kirpichov 
wrote:

> Hi Kyle,
>
> Thanks for the response!
>
> On Wed, Aug 26, 2020 at 5:28 PM Kyle Weaver  wrote:
>
>> > - With the Flink operator, I was able to submit a Beam job, but hit the
>> issue that I need Docker installed on my Flink nodes. I haven't yet tried
>> changing the operator's yaml files to add Docker inside them.
>>
>> Running Beam workers via Docker on the Flink nodes is not recommended
>> (and probably not even possible), since the Flink nodes are themselves
>> already running inside Docker containers. Running workers as sidecars
>> avoids that problem. For example:
>> https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/blob/master/examples/beam/with_job_server/beam_flink_cluster.yaml#L17-L20
>>
>> The main problem with the sidecar approach is that I can't use the Flink
> cluster as a "service" for anybody to submit their jobs with custom
> containers - the container version is fixed.
> Do I understand it correctly?
> Seems like the Docker-in-Docker approach is viable, and is mentioned in
> the Beam Flink K8s design doc
> 
> .
>
>
>> > I also haven't tried this
>> 
>>  yet
>> because it implies submitting jobs using "kubectl apply"  which is weird -
>> why not just submit it through the Flink job server?
>>
>> I'm guessing it goes through k8s for monitoring purposes. I see no reason
>> it shouldn't be possible to submit to the job server directly through
>> Python, network permitting, though I haven't tried this.
>>
>>
>>
>> On Wed, Aug 26, 2020 at 4:10 PM Eugene Kirpichov 
>> wrote:
>>
>>> Hi folks,
>>>
>>> I'm still working with Pachama  right now; we
>>> have a Kubernetes Engine cluster on GCP and want to run Beam Python batch
>>> pipelines with custom containers against it.
>>> Flink and Cloud Dataflow are the two options; Cloud Dataflow doesn't
>>> support custom containers for batch pipelines yes so we're going with Flink.
>>>
>>> I'm struggling to find complete documentation on how to do this. There
>>> seems to be lots of conflicting or incomplete information: several ways to
>>> deploy Flink, several ways to get Beam working with it, bizarre
>>> StackOverflow questions, and no documentation explaining a complete working
>>> example.
>>>
>>> == My requests ==
>>> * Could people briefly share their working setup? Would be good to know
>>> which directions are promising.
>>> * It would be particularly helpful if someone could volunteer an hour of
>>> their time to talk to me about their working Beam/Flink/k8s setup. It's for
>>> a good cause (fixing the planet :) ) and on my side I volunteer to write up
>>> the findings to share with the community so others suffer less.
>>>
>>> == Appendix: My findings so far ==
>>> There are multiple ways to deploy Flink on k8s:
>>> - The GCP marketplace Flink operator
>>> 
>>>  (couldn't
>>> get it to work) and the respective CLI version
>>>  (buggy,
>>> but I got it working)
>>> - https://github.com/lyft/flinkk8soperator (haven't tried)
>>> - Flink's native