Re: Individual Parallelism support for Flink Runner

2020-06-26 Thread Reuven Lax
It's an interesting question - this parameter is clearly very runner
specific (e.g. it would be meaningless for the Dataflow runner, where
parallelism is not a static constant). How should we go about passing
runner-specific options per transform?

On Fri, Jun 26, 2020 at 1:14 PM Akshay Iyangar  wrote:

> Hi beam community,
>
>
>
> So I had brought this issue up in our Slack channel, but I guess it warrants
> a deeper discussion and, if we do go ahead with it, a plan of action (POA).
>
>
>
> So basically, the Flink Runner currently doesn't support the operator-level
> parallelism which native Flink provides out of the box (OOTB). So I was wondering
> what the community feels about having some way to pass parallelism for individual
> operators, especially for some of the existing IOs.
>
>
>
> Wanted to know what people think of this.
>
>
>
> Thanks
>
> Akshay I
>


Error in FlinkRunnerTest.test_external_transforms

2020-06-26 Thread Alex Amato
Hi,

I was wondering if this is something wrong with my PR or an issue in master.
Thanks for your help.

Seeing this in my PR's presubmit
https://ci-beam.apache.org/job/beam_PreCommit_Python2_PVR_Flink_Commit/5382/

Logs


==
ERROR: test_external_transforms (__main__.FlinkRunnerTest)
--
Traceback (most recent call last):
 Timed out after 60 seconds. 
  File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python2_PVR_Flink_Commit/src/sdks/python/apache_beam/runners/portability/flink_runner_test.py",
line 204, in test_external_transforms

assert_that(res, equal_to([i for i in range(1, 10)]))
# Thread: 
  File "apache_beam/pipeline.py", line 547, in __exit__
self.run().wait_until_finish()

# Thread: 
  File "apache_beam/runners/portability/portable_runner.py", line 543,
in wait_until_finish
self._observe_state(message_thread)
  File "apache_beam/runners/portability/portable_runner.py", line 552,
in _observe_state

for state_response in self._state_stream:
# Thread: <_Worker(Thread-110, started daemon 140197924693760)>
  File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python2_PVR_Flink_Commit/src/build/gradleenv/1866363813/local/lib/python2.7/site-packages/grpc/_channel.py",
line 413, in next
return self._next()

  File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python2_PVR_Flink_Commit/src/build/gradleenv/1866363813/local/lib/python2.7/site-packages/grpc/_channel.py",
line 697, in _next
# Thread: <_MainThread(MainThread, started 140200366741248)>
_common.wait(self._state.condition.wait, _response_ready)
  File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python2_PVR_Flink_Commit/src/build/gradleenv/1866363813/local/lib/python2.7/site-packages/grpc/_common.py",
line 138, in wait
_wait_once(wait_fn, MAXIMUM_WAIT_TIMEOUT, spin_cb)

  File 
"/home/jenkins/jenkins-slave/workspace/beam_PreCommit_Python2_PVR_Flink_Commit/src/build/gradleenv/1866363813/local/lib/python2.7/site-packages/grpc/_common.py",
line 103, in _wait_once
wait_fn(timeout=timeout)
# Thread: 
  File "/usr/lib/python2.7/threading.py", line 359, in wait
_sleep(delay)
  File "apache_beam/runners/portability/portable_runner_test.py", line
82, in handler
raise BaseException(msg)
BaseException: Timed out after 60 seconds.


# Thread: <_Worker(Thread-18, started daemon 140198537066240)>

# Thread: 

--
# Thread: <_Worker(Thread-19, started daemon 140198528673536)>

Ran 82 tests in 461.409s

FAILED (errors=1, skipped=15)


Re: Commands to detect style issues quickly before sending PR

2020-06-26 Thread Udi Meiri
Another tip for Python:
"pip install pre-commit" will also run yapf and pylint on changed files
during "git commit".


On Fri, Jun 26, 2020 at 4:03 PM Valentyn Tymofieiev 
wrote:

> See also:
> https://cwiki.apache.org/confluence/display/BEAM/Python+Tips#PythonTips-Formatting
>
> > Task :sdks:python:test-suites:tox:py38:setupVirtualenv FAILED
> https://github.com/apache/beam/pull/12109 should fix that.
>
> On Fri, Jun 26, 2020 at 2:51 PM Alex Amato  wrote:
>
>> I sent out some PRs a few days ago, and quickly discovered a bunch of
>> errors and have been spending most of my time playing whack-a-mole without
>> knowing how to repro them all locally.
>>
>> I asked this a few years ago, and wanted to make sure I have something up
>> to date to work with. Ideally, I'd like a single command line for
>> simplicity. Here is what I've been using. I'm not sure if we have a script
>> or gradle target which already covers this or not
>>
>> *Java*
>> time ./gradlew spotlessApply && ./gradlew checkstyleMain checkstyleTest
>> javadoc spotbugsMain compileJava compileTestJava
>>
>> *Python *
>> ./gradlew  :sdks:python:test-suites:tox:py2:lintPy27_3 && ./gradlew  
>> :sdks:python:test-suites:tox:py37:lintPy37
>> && ./gradlew :sdks:python:test-suites:tox:py38:formatter
>>
>> (I think this might be correct, maybe there is a faster way to run it
>> directly with tox as well)
>>
>> 
>> Though the python command is failing for me, perhaps I need to install
>> another python version. I think we have setup steps for those in the wiki...
>>
>>
>> creating
>> build/temp.linux-x86_64-3.8/third_party/protobuf/src/google/protobuf/util/internal
>>
>> creating
>> build/temp.linux-x86_64-3.8/third_party/protobuf/src/google/protobuf/stubs
>>
>> creating
>> build/temp.linux-x86_64-3.8/third_party/protobuf/src/google/protobuf/io
>>
>> x86_64-linux-gnu-gcc -pthread -Wno-unused-result -Wsign-compare
>> -DNDEBUG -g -fwrapv -O2 -Wall -g -fstack-protector-strong -Wformat
>> -Werror=format-security -g -fwrapv -O2 -g -fstack-protector-strong -Wformat
>> -Werror=format-security -Wdate-time -D_FORTIFY_SOURCE=2 -fPIC
>> -DHAVE_PTHREAD=1 -I. -Igrpc_root -Igrpc_root/include
>> -Ithird_party/protobuf/src -I/usr/include/python3.8
>> -I/usr/local/google/home/ajamato/beam/build/gradleenv/-1227304282/include/python3.8
>> -c grpc_tools/_protoc_compiler.cpp -o
>> build/temp.linux-x86_64-3.8/grpc_tools/_protoc_compiler.o -std=c++11
>> -fno-wrapv -frtti
>>
>> grpc_tools/_protoc_compiler.cpp:216:10: fatal error: Python.h: No
>> such file or directory
>>
>>   216 | #include "Python.h"
>>
>>   |  ^~
>>
>> compilation terminated.
>>
>> error: command 'x86_64-linux-gnu-gcc' failed with exit status 1
>>
>> 
>>
>> ERROR: Command errored out with exit status 1:
>> /usr/local/google/home/ajamato/beam/build/gradleenv/-1227304282/bin/python3.8
>> -u -c 'import sys, setuptools, tokenize; sys.argv[0] =
>> '"'"'/tmp/pip-install-xmf_k_sy/grpcio-tools/setup.py'"'"';
>> __file__='"'"'/tmp/pip-install-xmf_k_sy/grpcio-tools/setup.py'"'"';f=getattr(tokenize,
>> '"'"'open'"'"', open)(__file__);code=f.read().replace('"'"'\r\n'"'"',
>> '"'"'\n'"'"');f.close();exec(compile(code, __file__, '"'"'exec'"'"'))'
>> install --record /tmp/pip-record-9bhhuq55/install-record.txt
>> --single-version-externally-managed --compile --install-headers
>> /usr/local/google/home/ajamato/beam/build/gradleenv/-1227304282/include/site/python3.8/grpcio-tools
>> Check the logs for full command output.
>>
>>
>> *> Task :sdks:python:test-suites:tox:py38:setupVirtualenv* FAILED
>>
>




Re: Commands to detect style issues quickly before sending PR

2020-06-26 Thread Valentyn Tymofieiev
See also:
https://cwiki.apache.org/confluence/display/BEAM/Python+Tips#PythonTips-Formatting

> Task :sdks:python:test-suites:tox:py38:setupVirtualenv FAILED
https://github.com/apache/beam/pull/12109 should fix that.

On Fri, Jun 26, 2020 at 2:51 PM Alex Amato  wrote:

> I sent out some PRs a few days ago, and quickly discovered a bunch of
> errors and have been spending most of my time playing whack-a-mole without
> knowing how to repro them all locally.
>
> I asked this a few years ago, and wanted to make sure I have something up
> to date to work with. Ideally, I'd like a single command line for
> simplicity. Here is what I've been using. I'm not sure if we have a script
> or gradle target which already covers this or not
>
> *Java*
> time ./gradlew spotlessApply && ./gradlew checkstyleMain checkstyleTest
> javadoc spotbugsMain compileJava compileTestJava
>
> *Python *
> ./gradlew  :sdks:python:test-suites:tox:py2:lintPy27_3 && ./gradlew  
> :sdks:python:test-suites:tox:py37:lintPy37
> && ./gradlew :sdks:python:test-suites:tox:py38:formatter
>
> (I think this might be correct, maybe there is a faster way to run it
> directly with tox as well)
>
> 
> Though the python command is failing for me, perhaps I need to install
> another python version. I think we have setup steps for those in the wiki...
>
>
> creating
> build/temp.linux-x86_64-3.8/third_party/protobuf/src/google/protobuf/util/internal
>
> creating
> build/temp.linux-x86_64-3.8/third_party/protobuf/src/google/protobuf/stubs
>
> creating
> build/temp.linux-x86_64-3.8/third_party/protobuf/src/google/protobuf/io
>
> x86_64-linux-gnu-gcc -pthread -Wno-unused-result -Wsign-compare
> -DNDEBUG -g -fwrapv -O2 -Wall -g -fstack-protector-strong -Wformat
> -Werror=format-security -g -fwrapv -O2 -g -fstack-protector-strong -Wformat
> -Werror=format-security -Wdate-time -D_FORTIFY_SOURCE=2 -fPIC
> -DHAVE_PTHREAD=1 -I. -Igrpc_root -Igrpc_root/include
> -Ithird_party/protobuf/src -I/usr/include/python3.8
> -I/usr/local/google/home/ajamato/beam/build/gradleenv/-1227304282/include/python3.8
> -c grpc_tools/_protoc_compiler.cpp -o
> build/temp.linux-x86_64-3.8/grpc_tools/_protoc_compiler.o -std=c++11
> -fno-wrapv -frtti
>
> grpc_tools/_protoc_compiler.cpp:216:10: fatal error: Python.h: No
> such file or directory
>
>   216 | #include "Python.h"
>
>   |  ^~
>
> compilation terminated.
>
> error: command 'x86_64-linux-gnu-gcc' failed with exit status 1
>
> 
>
> ERROR: Command errored out with exit status 1:
> /usr/local/google/home/ajamato/beam/build/gradleenv/-1227304282/bin/python3.8
> -u -c 'import sys, setuptools, tokenize; sys.argv[0] =
> '"'"'/tmp/pip-install-xmf_k_sy/grpcio-tools/setup.py'"'"';
> __file__='"'"'/tmp/pip-install-xmf_k_sy/grpcio-tools/setup.py'"'"';f=getattr(tokenize,
> '"'"'open'"'"', open)(__file__);code=f.read().replace('"'"'\r\n'"'"',
> '"'"'\n'"'"');f.close();exec(compile(code, __file__, '"'"'exec'"'"'))'
> install --record /tmp/pip-record-9bhhuq55/install-record.txt
> --single-version-externally-managed --compile --install-headers
> /usr/local/google/home/ajamato/beam/build/gradleenv/-1227304282/include/site/python3.8/grpcio-tools
> Check the logs for full command output.
>
>
> *> Task :sdks:python:test-suites:tox:py38:setupVirtualenv* FAILED
>


Re: Composable DoFn IOs Connection Reuse

2020-06-26 Thread Tyson Hamilton
Nice doc, by the way - it's concise. Thanks for sharing, and I'm excited to
see this feature, particularly the PCollection variant, which would have
been useful for the recently introduced Cloud AI transforms.

On Fri, Jun 26, 2020 at 3:25 PM Tyson Hamilton  wrote:

>
>
> On Fri, Jun 26, 2020 at 12:24 PM Siyuan Chen  wrote:
>
>> Thanks Luke!
>>
>> Hi, I'm Siyuan and I'm working on the Google Dataflow team. We are faced
>> with similar issues with some sinks commonly used by Dataflow such as the
>> streaming BigQuery sink. Basically the elements are grouped and batched so
>> some following operations (potentially expensive) can be performed at once
>> on a batch of elements. One problem with the grouping is that it could
>> impose a limit on the parallelism of the DoFn performing those operations.
>> To mitigate the limited parallelism problem, recently I have been looking
>> into the idea of improving the `GroupIntoBatches` transform to allow the
>> grouping to be dynamically sharded and therefore distributed - essentially
>> a "shardable" stateful DoFn. The transform already does grouping and
>> batching on the input KV - grouping on K and batching on V - and it could
>> be extended to be able to shard the key and do batching within each shard
>> (meaning that we would have a sharded form of keys somewhere). The idea is
>> detailed in https://s.apache.org/sharded-group-into-batches
>>
>> Along with the proposal, there are two points I would like to ask for
>> advice:
>> - Would there be cases where the sharded keys need to be visible to
>> users? One case where that might be needed would be to apply another
>> stateful DoFn to the sharded output of the GroupIntoBatches, so the
>> semantics of key-to-user state mapping is respected.
>>
>
> Does exposing an API for the sharded keys change the implementation of the
> feature? If it is only an API change, I think it would be best not to
> expose the keys to start with, to avoid any unnecessary dependency on the
> implementation. It seems like it could unnecessarily make it harder to modify
> the sharding implementation in the future.
>
>
>> - Would there be a need for a per-element shard id, or would a per-bundle
>> shard id be sufficient? The former is more general, and we could
>> still have the same shard id for all elements in a bundle. But the
>> conclusion could affect the implementation (for example, how the
>> sharding information should be passed across the FnAPI).
>>
>>
> Are you referring to an API for a pipeline author to get the shard id? I
> thought that a bundle isn't a pipeline-author abstraction but an
> implementation detail; I may be wrong about this, since I'm not too familiar
> with this area of code. In the proposal it looks like the shard id isn't
> exposed, which I prefer, as I'm not sure there is any value for the user in
> having a specific 'shard id'. Is there?
>
>
>
>> I'm very new to Beam so looking forward to hearing the thoughts from the
>> community. Any comments will be appreciated :)
>> --
>> Best regards,
>> Siyuan
>>
>>
>> On Tue, Jun 16, 2020 at 3:04 PM Luke Cwik  wrote:
>>
>>> My first idea was to use a connection pool that is shared across the
>>> entire worker across multiple bundles. The connection pool would TTL
>>> connections that have been unused. This would help a bunch up until you hit
>>> the problem where you don't want every worker connected to every resource
>>> because of sharding of the work. In this case we should really be making
>>> sure that workers that have processed the same "key" process the same "key"
>>> again without limiting the number of workers that can process a specific
>>> key. This is very similar to what we do with a stateful DoFn but one where
>>> the runner knows that it can "shard" the key. +Siyuan Chen
>>>  has been investigating something like this for
>>> Dataflow to solve scalability issues with the BigQuery sink and has been
>>> looking into how a better GroupIntoBatches and/or sharded stateful DoFn
>>> could really help in these situations. This applies in general to lots of
>>> things where we want to co-locate things with the same key but not limit
>>> the parallel processing to only a single worker like stateful DoFn does
>>> today.
>>>
>>> On Tue, Jun 16, 2020 at 2:44 PM Ismaël Mejía  wrote:
>>>
 We have been promoting the use of DoFn to write IO connectors for many
 reasons
 including better composability. A common pattern that arrives in such
 IOs is
 that a preceding transform prepares the specification element on split
 that a
 subsequent DoFn uses to read the data. You can see an example of this
 on FileIO
 [1] or in RedisIO [2]

 The issue is that if we process that spec in the `@ProcessElement`
 method we
 lose the DoFn lifecycle because we cannot establish a connection on
 `@Setup` and
 close it in `@Teardown` because the spec is per element, so we end up re
 creating

Re: Composable DoFn IOs Connection Reuse

2020-06-26 Thread Tyson Hamilton
On Fri, Jun 26, 2020 at 12:24 PM Siyuan Chen  wrote:

> Thanks Luke!
>
> Hi, I'm Siyuan and I'm working on the Google Dataflow team. We are faced
> with similar issues with some sinks commonly used by Dataflow such as the
> streaming BigQuery sink. Basically the elements are grouped and batched so
> some following operations (potentially expensive) can be performed at once
> on a batch of elements. One problem with the grouping is that it could
> impose a limit on the parallelism of the DoFn performing those operations.
> To mitigate the limited parallelism problem, recently I have been looking
> into the idea of improving the `GroupIntoBatches` transform to allow the
> grouping to be dynamically sharded and therefore distributed - essentially
> a "shardable" stateful DoFn. The transform already does grouping and
> batching on the input KV - grouping on K and batching on V - and it could
> be extended to be able to shard the key and do batching within each shard
> (meaning that we would have a sharded form of keys somewhere). The idea is
> detailed in https://s.apache.org/sharded-group-into-batches
>
> Along with the proposal, there are two points I would like to ask for
> advice:
> - Would there be cases where the sharded keys need to be visible to users?
> One case where that might be needed would be to apply another stateful DoFn
> to the sharded output of the GroupIntoBatches, so the semantics of
> key-to-user state mapping is respected.
>

Does exposing an API for the sharded keys change the implementation of the
feature? If it is only an API change, I think it would be best not to
expose the keys to start with, to avoid any unnecessary dependency on the
implementation. It seems like it could unnecessarily make it harder to modify
the sharding implementation in the future.


> - Would there be a need for a per-element shard id, or would a per-bundle
> shard id be sufficient? The former is more general, and we could still
> have the same shard id for all elements in a bundle. But the conclusion
> could affect the implementation (for example, how the sharding
> information should be passed across the FnAPI).
>
>
Are you referring to an API for a pipeline author to get the shard id? I
thought that a bundle isn't a pipeline-author abstraction but an
implementation detail; I may be wrong about this, since I'm not too familiar
with this area of code. In the proposal it looks like the shard id isn't
exposed, which I prefer, as I'm not sure there is any value for the user in
having a specific 'shard id'. Is there?



> I'm very new to Beam so looking forward to hearing the thoughts from the
> community. Any comments will be appreciated :)
> --
> Best regards,
> Siyuan
>
>
> On Tue, Jun 16, 2020 at 3:04 PM Luke Cwik  wrote:
>
>> My first idea was to use a connection pool that is shared across the
>> entire worker across multiple bundles. The connection pool would TTL
>> connections that have been unused. This would help a bunch up until you hit
>> the problem where you don't want every worker connected to every resource
>> because of sharding of the work. In this case we should really be making
>> sure that workers that have processed the same "key" process the same "key"
>> again without limiting the number of workers that can process a specific
>> key. This is very similar to what we do with a stateful DoFn but one where
>> the runner knows that it can "shard" the key. +Siyuan Chen
>>  has been investigating something like this for
>> Dataflow to solve scalability issues with the BigQuery sink and has been
>> looking into how a better GroupIntoBatches and/or sharded stateful DoFn
>> could really help in these situations. This applies in general to lots of
>> things where we want to co-locate things with the same key but not limit
>> the parallel processing to only a single worker like stateful DoFn does
>> today.
>>
>> On Tue, Jun 16, 2020 at 2:44 PM Ismaël Mejía  wrote:
>>
>>> We have been promoting the use of DoFn to write IO connectors for many
>>> reasons
>>> including better composability. A common pattern that arrives in such
>>> IOs is
>>> that a preceding transform prepares the specification element on split
>>> that a
>>> subsequent DoFn uses to read the data. You can see an example of this on
>>> FileIO
>>> [1] or in RedisIO [2]
>>>
>>> The issue is that if we process that spec in the `@ProcessElement`
>>> method we
>>> lose the DoFn lifecycle because we cannot establish a connection on
>>> `@Setup` and
>>> close it in `@Teardown` because the spec is per element, so we end up re
>>> creating connections which is a quite costly operation in some systems
>>> like
>>> Cassandra/HBase/etc and that it could end up saturating the data store
>>> because
>>> of the massive creation of connections (something that already happened
>>> in the
>>> past with JdbcIO in the streaming case).
>>>
>>> In the ongoing PR that transforms Cassandra to be DoFn based [3] this
>>>

Commands to detect style issues quickly before sending PR

2020-06-26 Thread Alex Amato
I sent out some PRs a few days ago, and quickly discovered a bunch of
errors and have been spending most of my time playing whack-a-mole without
knowing how to repro them all locally.

I asked this a few years ago, and wanted to make sure I have something up
to date to work with. Ideally, I'd like a single command line for
simplicity. Here is what I've been using. I'm not sure if we have a script
or gradle target which already covers this or not

*Java*
time ./gradlew spotlessApply && ./gradlew checkstyleMain checkstyleTest
javadoc spotbugsMain compileJava compileTestJava

*Python *
./gradlew  :sdks:python:test-suites:tox:py2:lintPy27_3 && ./gradlew
:sdks:python:test-suites:tox:py37:lintPy37
&& ./gradlew :sdks:python:test-suites:tox:py38:formatter

(I think this might be correct, maybe there is a faster way to run it
directly with tox as well)


Though the python command is failing for me, perhaps I need to install
another python version. I think we have setup steps for those in the wiki...


creating
build/temp.linux-x86_64-3.8/third_party/protobuf/src/google/protobuf/util/internal

creating
build/temp.linux-x86_64-3.8/third_party/protobuf/src/google/protobuf/stubs

creating
build/temp.linux-x86_64-3.8/third_party/protobuf/src/google/protobuf/io

x86_64-linux-gnu-gcc -pthread -Wno-unused-result -Wsign-compare
-DNDEBUG -g -fwrapv -O2 -Wall -g -fstack-protector-strong -Wformat
-Werror=format-security -g -fwrapv -O2 -g -fstack-protector-strong -Wformat
-Werror=format-security -Wdate-time -D_FORTIFY_SOURCE=2 -fPIC
-DHAVE_PTHREAD=1 -I. -Igrpc_root -Igrpc_root/include
-Ithird_party/protobuf/src -I/usr/include/python3.8
-I/usr/local/google/home/ajamato/beam/build/gradleenv/-1227304282/include/python3.8
-c grpc_tools/_protoc_compiler.cpp -o
build/temp.linux-x86_64-3.8/grpc_tools/_protoc_compiler.o -std=c++11
-fno-wrapv -frtti

grpc_tools/_protoc_compiler.cpp:216:10: fatal error: Python.h: No such
file or directory

  216 | #include "Python.h"

  |  ^~

compilation terminated.

error: command 'x86_64-linux-gnu-gcc' failed with exit status 1



ERROR: Command errored out with exit status 1:
/usr/local/google/home/ajamato/beam/build/gradleenv/-1227304282/bin/python3.8
-u -c 'import sys, setuptools, tokenize; sys.argv[0] =
'"'"'/tmp/pip-install-xmf_k_sy/grpcio-tools/setup.py'"'"';
__file__='"'"'/tmp/pip-install-xmf_k_sy/grpcio-tools/setup.py'"'"';f=getattr(tokenize,
'"'"'open'"'"', open)(__file__);code=f.read().replace('"'"'\r\n'"'"',
'"'"'\n'"'"');f.close();exec(compile(code, __file__, '"'"'exec'"'"'))'
install --record /tmp/pip-record-9bhhuq55/install-record.txt
--single-version-externally-managed --compile --install-headers
/usr/local/google/home/ajamato/beam/build/gradleenv/-1227304282/include/site/python3.8/grpcio-tools
Check the logs for full command output.


*> Task :sdks:python:test-suites:tox:py38:setupVirtualenv* FAILED


Re: Problems with underlying client library types in beam?

2020-06-26 Thread Luke Cwik
There have been many Guava releases in which methods were removed or modified,
and many methods were marked @Beta. The Guava maintainers said that from
release 23 (I believe) they would no longer make backwards-incompatible changes
to non-beta APIs, and they released a tool to help users ensure they aren't
using beta APIs [1]. This tool is great, but there are already thousands of
released artifacts that were produced before the API lockdown and before that
beta checker was added.

There are a lot of libraries that are still on Guava 19 or older, and users
still run into linkage errors [2]. Users regularly run a linkage checker to
help them diagnose diamond dependency issues and arrive at a set of libraries
that work with Beam and their dependencies. For example, the Beam community saw
this issue [3] when Guava decided to overload the popular Preconditions [4]
methods, which led to NoSuchMethodErrors. A Google search for Guava and
NoSuchMethodError/ClassNotFoundException shows many users who have hit
issues with Guava.
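
To make the failure mode concrete, here is an illustrative (not Beam-specific)
example of how such an overload change bites at runtime:

import com.google.common.base.Preconditions;

// Compiled against a newer Guava, this call resolves to the newer
// checkArgument(boolean, String, int) overload. If an older Guava without that
// overload is on the runtime classpath, the call fails with NoSuchMethodError
// even though the source never changed.
int numShards = 3;
Preconditions.checkArgument(numShards > 0, "numShards must be positive, got %s", numShards);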

Google has worked on helping users via a linkage checking tool [5], and
Beam [6] would like to be in a better place with dependencies by using BOMs
for dependency management, but both of these efforts are still relatively
new.

On Thu, Jun 25, 2020 at 9:28 AM Daniel Collins  wrote:

> Hello all,
>
> In https://github.com/apache/beam/pull/11919, there is a concern about
> the fact that the underlying client library's Message class uses guava's
> ListMultimap to represent a String -> Collection mapping, and
> this type is exposed by the interface. A couple of questions:
>
> 1) I'd like to use the same primitive value types as the underlying client
> library for consistency. Given that both the client library and beam io
> have the same maintainers, is this likely to cause long term issues? I'd
> really like to avoid wrapping the client library types in an
> almost-identical-but-not-quite wrapper just to remove guava, and I don't
> think removing guava from the underlying type is reasonable.
>

When designing methods within Beam, it is preferable that you don't make
Guava or the underlying client library part of the API surface. If there
are simple substitutes for types, that is great, but for configuration
objects such as protos it is a judgement call. Most IOs don't use the
underlying configuration object but instead choose to expose certain
properties on a case-by-case basis, since some properties may not make sense
to expose.

For return types from the third-party lib, casting to the closest type
within core Java and only using methods that are part of core Java
will reduce the risk that you will invoke a method that has been
changed/removed. For example:
ImmutableList<Foo> getFoos();  // method on the object you want to interact with
ImmutableList<Foo> foo = getFoos();  // bad, you can invoke a method that has been removed/modified
List<Foo> foo = getFoos();  // good, doesn't depend on Guava-exposed APIs

For method arguments in the third-party library you don't really have much
of a choice, since you will have to call an API to construct the object, and
you'll need to validate that you are not using @Beta APIs. You can only try
to minimize the API surface you interact with.


> 2) What is the blast radius of guava incompatibility issues? There is a
> presubmit that blocks beam from using unshaded guava, presumably because it
> has caused issues in the past. If I expose this type, and someone tries to
> use it with an incompatible guava version, will they fail if they try to
> use beam or try to use this IO specifically?
>

There are two issues:
1) Users need to solve the diamond dependency problem and provide a version
of Guava that works with Beam and their dependencies. Guava usage within
Beam and its transitive dependency tree makes that more difficult for users
since some users rely on the linkage checker to help figure out a
compatible set of dependencies. This means that the linkage checker will
find faults that may have not existed otherwise.
2) If the user selects a non-compatible version of guava, then it depends:
* any static links between classes will poison all referenced classes if
one of them does something that causes a linkage error (e.g. static block
that calls a method that can't be found or a class that doesn't exist)
* if the code is loaded dynamically via service loaders/reflection, then
the entire application can fail as soon as the object is on the classpath


>
> Thanks! Looking forward to some feedback.
>

In the end, it is a judgement call weighing maintenance, ease of use for
users, and user support.

1: https://github.com/google/guava-beta-checker
2: https://jlbp.dev/glossary.html#linkage-error 
3: https://issues.apache.org/jira/browse/BEAM-1411
4:
https://github.com/google/guava/commit/892e323fca32945cdfb25395ca6e346dd0fffa5b#diff-fe7358934fa6eba23c2791eb40cec030
5:
https://github.com/GoogleCloudPlatform/cloud-opensource-java/wiki/Linkage-Checker-Enforcer-Rule
6: https://issu

Individual Parallelism support for Flink Runner

2020-06-26 Thread Akshay Iyangar
Hi beam community,

So I had brought this issue up in our Slack channel, but I guess it warrants a
deeper discussion and, if we do go ahead with it, a plan of action (POA).

So basically, the Flink Runner currently doesn't support the operator-level
parallelism which native Flink provides out of the box (OOTB). So I was wondering
what the community feels about having some way to pass parallelism for individual
operators, especially for some of the existing IOs.
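
For context, this is roughly what native Flink exposes out of the box in its
DataStream API - parallelism set per operator rather than only once per job
(MyKafkaSource, ExpensiveParseFn and MySink below are just placeholders):

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.addSource(new MyKafkaSource()).setParallelism(2)   // light source
   .map(new ExpensiveParseFn()).setParallelism(16)     // heavy operator gets more slots
   .addSink(new MySink()).setParallelism(2);           // sink kept low to protect the store
env.execute("per-operator-parallelism");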

Wanted to know what people think of this.

Thanks
Akshay I


FYI, Python precommits and formatter checks fail on Py3.8 at the moment.

2020-06-26 Thread Valentyn Tymofieiev
https://issues.apache.org/jira/browse/BEAM-10333


Re: Composable DoFn IOs Connection Reuse

2020-06-26 Thread Siyuan Chen
Thanks Luke!

Hi, I'm Siyuan and I'm working on the Google Dataflow team. We are faced
with similar issues with some sinks commonly used by Dataflow such as the
streaming BigQuery sink. Basically the elements are grouped and batched so
some following operations (potentially expensive) can be performed at once
on a batch of elements. One problem with the grouping is that it could
impose a limit on the parallelism of the DoFn performing those operations.
To mitigate the limited parallelism problem, recently I have been looking
into the idea of improving the `GroupIntoBatches` transform to allow the
grouping to be dynamically sharded and therefore distributed - essentially
a "shardable" stateful DoFn. The transform already does grouping and
batching on the input KV - grouping on K and batching on V - and it could
be extended to be able to shard the key and do batching within each shard
(meaning that we would have a sharded form of keys somewhere). The idea is
detailed in https://s.apache.org/sharded-group-into-batches
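
To make the parallelism limit concrete, here is a minimal sketch of how
GroupIntoBatches is used today in the Java SDK. Because batches are formed
independently per key, whatever consumes them can run at most one instance per
key in parallel - which is exactly the cap the sharded variant would relax:

import org.apache.beam.sdk.transforms.GroupIntoBatches;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.PCollection;

static PCollection<KV<String, Iterable<String>>> batchPerKey(
    PCollection<KV<String, String>> keyed) {
  // Groups on K and batches on V; each batch holds up to 500 values for one key.
  return keyed.apply(GroupIntoBatches.<String, String>ofSize(500));
}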

Along with the proposal, there are two points I would like to ask for
advice:
- Would there be cases where the sharded keys need to be visible to users?
One case where that might be needed would be to apply another stateful DoFn
to the sharded output of the GroupIntoBatches, so the semantics of
key-to-user state mapping is respected.
- Would there be a need for a per-element shard id, or would a per-bundle
shard id be sufficient? The former is more general, and we could still
have the same shard id for all elements in a bundle. But the conclusion
could affect the implementation (for example, how the sharding
information should be passed across the FnAPI).

I'm very new to Beam so looking forward to hearing the thoughts from the
community. Any comments will be appreciated :)
--
Best regards,
Siyuan


On Tue, Jun 16, 2020 at 3:04 PM Luke Cwik  wrote:

> My first idea was to use a connection pool that is shared across the
> entire worker across multiple bundles. The connection pool would TTL
> connections that have been unused. This would help a bunch up until you hit
> the problem where you don't want every worker connected to every resource
> because of sharding of the work. In this case we should really be making
> sure that workers that have processed the same "key" process the same "key"
> again without limiting the number of workers that can process a specific
> key. This is very similar to what we do with a stateful DoFn but one where
> the runner knows that it can "shard" the key. +Siyuan Chen
>  has been investigating something like this for
> Dataflow to solve scalability issues with the BigQuery sink and has been
> looking into how a better GroupIntoBatches and/or sharded stateful DoFn
> could really help in these situations. This applies in general to lots of
> things where we want to co-locate things with the same key but not limit
> the parallel processing to only a single worker like stateful DoFn does
> today.
>
> On Tue, Jun 16, 2020 at 2:44 PM Ismaël Mejía  wrote:
>
>> We have been promoting the use of DoFn to write IO connectors for many
>> reasons
>> including better composability. A common pattern that arrives in such IOs
>> is
>> that a preceding transform prepares the specification element on split
>> that a
>> subsequent DoFn uses to read the data. You can see an example of this on
>> FileIO
>> [1] or in RedisIO [2]
>>
>> The issue is that if we process that spec in the `@ProcessElement` method
>> we
>> lose the DoFn lifecycle because we cannot establish a connection on
>> `@Setup` and
>> close it in `@Teardown` because the spec is per element, so we end up re
>> creating connections which is a quite costly operation in some systems
>> like
>> Cassandra/HBase/etc and that it could end up saturating the data store
>> because
>> of the massive creation of connections (something that already happened
>> in the
>> past with JdbcIO in the streaming case).
>>
>> In the ongoing PR that transforms Cassandra to be DoFn based [3] this
>> subject
>> appeared again, and we were discussing how to eventually reuse
>> connections,
>> maybe by a pretty naive approach of saving a previous connection (or set
>> of
>> identified connections) statically so it can be reused by multiple DoFns
>> instances. We already had some issues in the past because of creating many
>> connections on other IOs (JdbcIO) with streaming pipelines where
>> databases were
>> swamped by massive amounts of connections, so reusing connections seems
>> to be
>> something that matters, but at the moment we do not have a clear way to
>> do this
>> better.
>>
>> Anyone have better ideas or recommendations for this scenario?
>> Thanks in advance.
>>
>> Ismaël
>>
>> [1]
>> https://github.com/apache/beam/blob/14085a5a3c0e146fcc13ca77515bd24abc255eda/sdks/java/io/parquet/src/main/java/org/apache/beam/sdk/io/parquet/ParquetIO.java#L260
>> [2]
>> https://github.com/apache/beam/blob/14085a5a3c0e146fcc13c
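
For reference, a rough Java sketch of the naive static-sharing approach Ismaël
describes above: connections are cached per worker JVM and keyed by the
per-element spec instead of being opened in @Setup. ConnectionSpec, Connection
and Record are hypothetical placeholder types, and a real version would still
need TTL/eviction so idle connections get closed:

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.apache.beam.sdk.transforms.DoFn;

class ReadFromSpecFn extends DoFn<ConnectionSpec, Record> {
  // Shared across DoFn instances in the same JVM so connections are reused instead
  // of re-created per element/bundle (assumes ConnectionSpec has equals/hashCode).
  private static final ConcurrentMap<ConnectionSpec, Connection> CONNECTIONS =
      new ConcurrentHashMap<>();

  @ProcessElement
  public void processElement(@Element ConnectionSpec spec, OutputReceiver<Record> out) {
    // The expensive open (e.g. Cassandra/HBase) happens at most once per spec per JVM.
    Connection conn = CONNECTIONS.computeIfAbsent(spec, ReadFromSpecFn::openConnection);
    for (Record record : conn.read(spec)) {
      out.output(record);
    }
  }

  private static Connection openConnection(ConnectionSpec spec) {
    return Connection.open(spec);
  }
}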

Re: [DISCUSS] ReadAll pattern and consistent use in IO connectors

2020-06-26 Thread Chamikara Jayalath
On Fri, Jun 26, 2020 at 11:49 AM Luke Cwik  wrote:

> I would also like to suggest that transforms that implement ReadAll via
> Read should also provide methods like:
>
> // Uses the specified values if unspecified in the input element from the
> PCollection.
> withDefaults(Read read);
> // Uses the specified values regardless of what the input element from the
> PCollection specifies.
> withOverrides(Read read);
>
> and only adds methods that are required at construction time (e.g.
> coders). This way the majority of documentation sits on the Read transform.
>

+0 from me. Sounds like the benefits outweigh the drawbacks here, and some of
the drawbacks related to cross-language can be overcome through future
advancements.
Thanks for bringing this up, Ismaël.

- Cham


>
> On Fri, Jun 26, 2020 at 9:58 AM Luke Cwik  wrote:
>
>> Ismael, it is good to hear that using Read as the input didn't have a
>> bunch of parameters that were being skipped/ignored. Also, for the
>> polymorphism issue you have to rely on the user correctly telling you the
>> type in such a way where it is a common ancestor of all the runtime types
>> that will ever be used. This usually boils down to something like
>> Serializable or DynamicMessage such that the coder that is chosen works for
>> all the runtime types. Using multiple types is a valid use case and would
>> allow for a simpler graph with fewer flattens merging the output from
>> multiple sources.
>>
>> Boyuan, as you have mentioned we can have a coder for KafkaIO.Read which
>> uses schemas even if some of the parameters can't be represented in a
>> meaningful way beyond "bytes". This would be helpful for cross language as
>> well since every parameter would become available if a language could
>> support it (e.g. it could serialize a java function up front and keep it
>> saved as raw bytes within said language). Even if we figure out a better
>> way to do this in the future, we'll have to change the schema for the new
>> way anyway. This would mean that the external version of the transform
>> adopts Row to Read and we drop KafkaSourceDescriptor. The conversion from
>> Row to Read could validate that the parameters make sense (e.g. the bytes
>> are valid serialized functions). The addition of an
>> endReadTime/endReadOffset would make sense for KafkaIO.Read as well and
>> this would enable having a bounded version that could be used for backfills
>> (this doesn't have to be done as part of any current ongoing PR).
>> Essentially any parameter that could be added for a single instance of a
>> Kafka element+restriction would also make sense to the KafkaIO.Read
>> transform since it too is a single instance. There are parameters that
>> would apply to the ReadAll that wouldn't apply to a read and these would be
>> global parameters across all element+restriction pairs such as config
>> overrides or default values.
>>
>> I am convinced that we should do as Ismael is suggesting and use
>> KafkaIO.Read as the type.
>>
>>
>> On Thu, Jun 25, 2020 at 6:00 PM Chamikara Jayalath 
>> wrote:
>>
>>> Discussion regarding cross-language transforms is a slight tangent here.
>>> But I think, in general, it's great if we can use existing transforms (for
>>> example, IO connectors) as cross-language transforms without having to
>>> build more composites (irrespective of whether in ExternalTransformBuilders
>>> or user pipelines) just to make them cross-language compatible. A future
>>> cross-language compatible SchemaCoder might help (assuming that works for
>>> Read transform) but I'm not sure we have a good idea when we'll get to that
>>> state.
>>>
>>> Thanks,
>>> Cham
>>>
>>> On Thu, Jun 25, 2020 at 3:13 PM Boyuan Zhang  wrote:
>>>
 For unbounded SDF in Kafka, we also consider the upgrading/downgrading
 compatibility in the pipeline update scenario(For detailed discussion,
 please refer to
 https://lists.apache.org/thread.html/raf073b8741317244339eb5b2bce844c0f9e0d700c3e4de392fc648d6%40%3Cdev.beam.apache.org%3E).
 In order to obtain the compatibility, it requires the input of the read SDF
 is schema-aware.

 Thus the major constraint of mapping KafkaSourceDescriptor to
 PCollection is, the KafkaIO.Read also needs to be schema-aware,
 otherwise pipeline updates might fail unnecessarily. If looking into
 KafkaIO.Read, not all necessary fields are compatible with schema, for
 example, SerializedFunction.

 I'm kind of confused by why ReadAll is a common pattern
 for SDF based IO. The Read can be a common pattern because the input is
 always a PBegin. But for an SDF based IO, the input can be anything. By
 using Read as input, we will still have the maintenance cost when SDF IO
 supports a new field but Read doesn't consume it. For example, we are
 discussing adding endOffset and endReadTime to KafkaSourceDescriptor,
 which is not used in KafkaIO.Read.

 On Thu, Jun 25, 2020 at 2:19 PM Ismaël Mejía  wrote:

>

Re: [DISCUSS] ReadAll pattern and consistent use in IO connectors

2020-06-26 Thread Luke Cwik
I would also like to suggest that transforms that implement ReadAll via
Read should also provide methods like:

// Uses the specified values if unspecified in the input element from the
PCollection.
withDefaults(Read read);
// Uses the specified values regardless of what the input element from the
PCollection specifies.
withOverrides(Read read);

and only adds methods that are required at construction time (e.g. coders).
This way the majority of documentation sits on the Read transform.
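
For example, a hypothetical call site (SomeIO and its methods are placeholders,
not existing Beam API): fields set on the per-element Read win over
withDefaults, while withOverrides wins over the element:

PCollection<SomeIO.Read> readRequests = buildPerElementReads();  // hypothetical helper
readRequests.apply(
    SomeIO.readAll()
        // used only when the element's Read leaves the field unset
        .withDefaults(SomeIO.read().withBatchSize(500))
        // applied regardless of what the element's Read specifies
        .withOverrides(SomeIO.read().withEndpoint("db.internal:9042")));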

On Fri, Jun 26, 2020 at 9:58 AM Luke Cwik  wrote:

> Ismael, it is good to hear that using Read as the input didn't have a
> bunch of parameters that were being skipped/ignored. Also, for the
> polymorphism issue you have to rely on the user correctly telling you the
> type in such a way where it is a common ancestor of all the runtime types
> that will ever be used. This usually boils down to something like
> Serializable or DynamicMessage such that the coder that is chosen works for
> all the runtime types. Using multiple types is a valid use case and would
> allow for a simpler graph with fewer flattens merging the output from
> multiple sources.
>
> Boyuan, as you have mentioned we can have a coder for KafkaIO.Read which
> uses schemas even if some of the parameters can't be represented in a
> meaningful way beyond "bytes". This would be helpful for cross language as
> well since every parameter would become available if a language could
> support it (e.g. it could serialize a java function up front and keep it
> saved as raw bytes within said language). Even if we figure out a better
> way to do this in the future, we'll have to change the schema for the new
> way anyway. This would mean that the external version of the transform
> adopts Row to Read and we drop KafkaSourceDescriptor. The conversion from
> Row to Read could validate that the parameters make sense (e.g. the bytes
> are valid serialized functions). The addition of an
> endReadTime/endReadOffset would make sense for KafkaIO.Read as well and
> this would enable having a bounded version that could be used for backfills
> (this doesn't have to be done as part of any current ongoing PR).
> Essentially any parameter that could be added for a single instance of a
> Kafka element+restriction would also make sense to the KafkaIO.Read
> transform since it too is a single instance. There are parameters that
> would apply to the ReadAll that wouldn't apply to a read and these would be
> global parameters across all element+restriction pairs such as config
> overrides or default values.
>
> I am convinced that we should do as Ismael is suggesting and use
> KafkaIO.Read as the type.
>
>
> On Thu, Jun 25, 2020 at 6:00 PM Chamikara Jayalath 
> wrote:
>
>> Discussion regarding cross-language transforms is a slight tangent here.
>> But I think, in general, it's great if we can use existing transforms (for
>> example, IO connectors) as cross-language transforms without having to
>> build more composites (irrespective of whether in ExternalTransformBuilders
>> or user pipelines) just to make them cross-language compatible. A future
>> cross-language compatible SchemaCoder might help (assuming that works for
>> Read transform) but I'm not sure we have a good idea when we'll get to that
>> state.
>>
>> Thanks,
>> Cham
>>
>> On Thu, Jun 25, 2020 at 3:13 PM Boyuan Zhang  wrote:
>>
>>> For unbounded SDF in Kafka, we also consider the upgrading/downgrading
>>> compatibility in the pipeline update scenario(For detailed discussion,
>>> please refer to
>>> https://lists.apache.org/thread.html/raf073b8741317244339eb5b2bce844c0f9e0d700c3e4de392fc648d6%40%3Cdev.beam.apache.org%3E).
>>> In order to obtain the compatibility, it requires the input of the read SDF
>>> is schema-aware.
>>>
>>> Thus the major constraint of mapping KafkaSourceDescriptor to
>>> PCollection is, the KafkaIO.Read also needs to be schema-aware,
>>> otherwise pipeline updates might fail unnecessarily. If looking into
>>> KafkaIO.Read, not all necessary fields are compatible with schema, for
>>> example, SerializedFunction.
>>>
>>> I'm kind of confused by why ReadAll is a common pattern
>>> for SDF based IO. The Read can be a common pattern because the input is
>>> always a PBegin. But for an SDF based IO, the input can be anything. By
>>> using Read as input, we will still have the maintenance cost when SDF IO
>>> supports a new field but Read doesn't consume it. For example, we are
>>> discussing adding endOffset and endReadTime to KafkaSourceDescriptor,
>>> which is not used in KafkaIO.Read.
>>>
>>> On Thu, Jun 25, 2020 at 2:19 PM Ismaël Mejía  wrote:
>>>
 We forgot to mention (5) External.Config used in cross-lang, see KafkaIO
 ExternalTransformBuilder. This approach is the predecessor of (4) and
 probably a
 really good candidate to be replaced by the Row based Configuration
 Boyuan is
 envisioning (so good to be aware of this).

 Thanks for the clear explanation Luke you ment

Re: [DISCUSS] ReadAll pattern and consistent use in IO connectors

2020-06-26 Thread Luke Cwik
Ismael, it is good to hear that using Read as the input didn't have a bunch
of parameters that were being skipped/ignored. Also, for the polymorphism
issue you have to rely on the user correctly telling you the type in such a
way where it is a common ancestor of all the runtime types that will
ever be used. This usually boils down to something like Serializable or
DynamicMessage such that the coder that is chosen works for all the runtime
types. Using multiple types is a valid use case and would allow for a
simpler graph with fewer flattens merging the output from multiple sources.

Boyuan, as you have mentioned we can have a coder for KafkaIO.Read which
uses schemas even if some of the parameters can't be represented in a
meaningful way beyond "bytes". This would be helpful for cross language as
well since every parameter would become available if a language could
support it (e.g. it could serialize a java function up front and keep it
saved as raw bytes within said language). Even if we figure out a better
way to do this in the future, we'll have to change the schema for the new
way anyway. This would mean that the external version of the transform
adopts Row to Read and we drop KafkaSourceDescriptor. The conversion from
Row to Read could validate that the parameters make sense (e.g. the bytes
are valid serialized functions). The addition of an
endReadTime/endReadOffset would make sense for KafkaIO.Read as well and
this would enable having a bounded version that could be used for backfills
(this doesn't have to be done as part of any current ongoing PR).
Essentially any parameter that could be added for a single instance of a
Kafka element+restriction would also make sense to the KafkaIO.Read
transform since it too is a single instance. There are parameters that
would apply to the ReadAll that wouldn't apply to a read and these would be
global parameters across all element+restriction pairs such as config
overrides or default values.

I am convinced that we should do as Ismael is suggesting and use
KafkaIO.Read as the type.
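
As a rough illustration of the schema idea (field names here are illustrative,
not the actual KafkaIO schema): parameters that only make sense to Java, such
as serialized functions, can still travel through the schema as raw bytes.

import org.apache.beam.sdk.schemas.Schema;
import org.apache.beam.sdk.values.Row;

Schema readSchema =
    Schema.builder()
        .addStringField("bootstrapServers")
        .addStringField("topic")
        // opaque to non-Java SDKs, but still round-trips through the schema coder
        .addByteArrayField("keyDeserializerProvider")
        .build();

byte[] serializedFn = new byte[0];  // stands in for a Java-serialized function
Row readConfig =
    Row.withSchema(readSchema)
        .addValues("broker-1:9092", "my_topic", serializedFn)
        .build();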


On Thu, Jun 25, 2020 at 6:00 PM Chamikara Jayalath 
wrote:

> Discussion regarding cross-language transforms is a slight tangent here.
> But I think, in general, it's great if we can use existing transforms (for
> example, IO connectors) as cross-language transforms without having to
> build more composites (irrespective of whether in ExternalTransformBuilders
> or user pipelines) just to make them cross-language compatible. A future
> cross-language compatible SchemaCoder might help (assuming that works for
> Read transform) but I'm not sure we have a good idea when we'll get to that
> state.
>
> Thanks,
> Cham
>
> On Thu, Jun 25, 2020 at 3:13 PM Boyuan Zhang  wrote:
>
>> For unbounded SDF in Kafka, we also consider the upgrading/downgrading
>> compatibility in the pipeline update scenario(For detailed discussion,
>> please refer to
>> https://lists.apache.org/thread.html/raf073b8741317244339eb5b2bce844c0f9e0d700c3e4de392fc648d6%40%3Cdev.beam.apache.org%3E).
>> In order to obtain the compatibility, it requires the input of the read SDF
>> is schema-aware.
>>
>> Thus the major constraint of mapping KafkaSourceDescriptor to
>> PCollection is, the KafkaIO.Read also needs to be schema-aware,
>> otherwise pipeline updates might fail unnecessarily. If looking into
>> KafkaIO.Read, not all necessary fields are compatible with schema, for
>> example, SerializedFunction.
>>
>> I'm kind of confused by why ReadAll is a common pattern
>> for SDF based IO. The Read can be a common pattern because the input is
>> always a PBegin. But for an SDF based IO, the input can be anything. By
>> using Read as input, we will still have the maintenance cost when SDF IO
>> supports a new field but Read doesn't consume it. For example, we are
>> discussing adding endOffset and endReadTime to KafkaSourceDescriptor,
>> which is not used in KafkaIO.Read.
>>
>> On Thu, Jun 25, 2020 at 2:19 PM Ismaël Mejía  wrote:
>>
>>> We forgot to mention (5) External.Config used in cross-lang, see KafkaIO
>>> ExternalTransformBuilder. This approach is the predecessor of (4) and
>>> probably a
>>> really good candidate to be replaced by the Row based Configuration
>>> Boyuan is
>>> envisioning (so good to be aware of this).
>>>
>>> Thanks for the clear explanation Luke, you mention the real issue(s). All
>>> the
>>> approaches discussed so far in the end could be easily transformed to
>>> produce a
>>> PCollection and those Read Elements could be read by the generic
>>> ReadAll
>>> transform. Notice that this can be internal in some IOs e.g. KafkaIO if
>>> they
>>> decide not to expose it. I am not saying that we should force every IO to
>>> support ReadAll in its public API but if we do it is probably a good
>>> idea to be
>>> consistent with naming the transform that expects an input
>>> PCollection in
>>> the same way. Also notice that using it will save us from the maintenance
>>> issues
>>> discussed in my previous email.

Python precommits fail due to compilation of pandas with cython

2020-06-26 Thread Mikhail Gryzykhin
Hi all,

Multiple Python precommit jobs are currently failing because Cython fails to
compile pandas ([BEAM-10333]).
I tried to debug this but had no success. Can someone help take a
look?

Thank you,
Mikhail.