Re: Checkpoints timing out upgrading from Beam version 2.29 with Flink 1.12 to Beam 2.38 and Flink 1.14

2022-07-26 Thread Kenneth Knowles
Bumping this and adding +John Casey, who knows about
KafkaIO and unbounded sources, though probably less about the FlinkRunner.
It seems you have isolated it to the Flink translation logic. I'm not sure
who would be the best expert to evaluate if that logic is still OK.

Kenn

On Wed, Jun 29, 2022 at 11:07 AM Kathula, Sandeep <
sandeep_kath...@intuit.com> wrote:

> Hi,
>
>We have a stateless application which:
>
>    1. Reads from Kafka
>    2. Applies some stateless transformations, reading from in-memory
>    databases and updating the records
>    3. Writes back to Kafka
>
> *With Beam 2.23 and Flink 1.9, checkpoints work fine (they take at most 1
> minute).*
>
> *With Beam 2.29 and Flink 1.12, checkpoints take longer (sometimes up to
> 6-7 minutes).*
>
> *With Beam 2.38 and Flink 1.14, checkpoints time out after 10 minutes.*
>
> I checked the Beam code and, after some logging and analysis, found that
> the problem is at
> https://github.com/apache/beam/blob/release-2.38.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L287-L307
>
>
>
>
>
> We are still using the old API to read from Kafka and are not yet using
> the SplittableDoFn-based KafkaIO.
>
>
>
> There are two threads:
>
>    1. The legacy source thread, which reads from Kafka and does all of the
>    processing.
>    2. A thread which emits watermarks on a timer:
>
> https://github.com/apache/beam/blob/release-2.38.0/runners/flink/src/main/java/org/apache/beam/runners/flink/translation/wrappers/streaming/io/UnboundedSourceWrapper.java#L454-L474
>
> Both of these code blocks are synchronized on the same checkpoint lock.
> Under heavy load, the thread reading from Kafka runs forever in its while
> loop, so the thread emitting watermarks waits forever for the lock, never
> emits watermarks, and the checkpoint times out.
>
>
>
>
>
> Is this a known issue, and is there a solution? For now we are adding a
> Thread.sleep(1) once every 10 seconds after the synchronized block so that
> the thread emitting the watermarks can be unblocked and run.
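The starvation described above, and the effect of the sleep-based workaround, can be sketched with a small toy reproduction. This is not Beam or Flink code; the lock, thread names, and timings below are illustrative assumptions, standing in for the checkpoint lock, the legacy source loop, and the watermark timer:

```python
import threading
import time

def run_simulation(duration_s: float = 0.3, yield_every_s: float = 0.05) -> int:
    """Toy model: a source loop hogs a shared 'checkpoint lock' while a
    watermark thread needs the same lock to make progress. A periodic
    yield point in the source loop (analogous to the Thread.sleep
    workaround) gives waiters a chance to acquire the lock."""
    checkpoint_lock = threading.Lock()
    stop = threading.Event()
    watermarks = [0]  # count of watermark emissions that got through

    def source_loop() -> None:
        last_yield = time.monotonic()
        while not stop.is_set():
            with checkpoint_lock:
                time.sleep(0.001)  # "processing" done while holding the lock
            if time.monotonic() - last_yield >= yield_every_s:
                time.sleep(0.001)  # yield point: lets the waiter run
                last_yield = time.monotonic()

    def watermark_loop() -> None:
        while not stop.is_set():
            with checkpoint_lock:  # watermark emission needs the same lock
                watermarks[0] += 1
            time.sleep(0.01)

    threads = [threading.Thread(target=source_loop),
               threading.Thread(target=watermark_loop)]
    for t in threads:
        t.start()
    time.sleep(duration_s)
    stop.set()
    for t in threads:
        t.join()
    return watermarks[0]
```

With an unfair lock (like Java's intrinsic `synchronized` monitor) and no yield point, the watermark count can stay at zero for the whole run, which is the behavior reported here.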
>
>
>
> One of my colleagues tried to follow up on this (attaching the previous
> email here) but we didn’t get any reply. Any help on this would be
> appreciated.
>
>
>
> Thanks,
>
> Sandeep
>


Re: [CDAP IO] SparkReceiverIO integration testing

2022-07-26 Thread Chamikara Jayalath via dev
On Tue, Jul 26, 2022 at 11:24 AM Elizaveta Lomteva <
elizaveta.lomt...@akvelon.com> wrote:

> Hi, community!
> Our team has prepared the SparkReceiverIO Read via SDF PR [1]. We have
> started working on integration tests for the SparkReceiverIO connector,
> which will allow reading data from custom Spark Receivers in an Apache
> Beam pipeline.
>
> A general Apache Beam recommendation is to implement “write then read”
> style integration tests. But in our case, only the Read interface was
> implemented, because Spark Receivers cannot be used for writing.
>
> Since SparkReceiverIO is an abstract IO working with Spark Receivers,
> there is no exact implementation for a particular source. Therefore, we
> are thinking of choosing RabbitMQ as a test source for the following
> reasons:
>
>- It’s possible to implement a Custom Spark Receiver on RabbitMQ as a
>test streaming receiver
>- RabbitMQ is lightweight and easy to deploy
>- There is a test container for RabbitMQ
>- It’s possible to generate as much test input to the RabbitMQ as we
>need
>- Apache Beam has a RabbitMQ IO [2]  that could hypothetically be used
>in the “write” step of the test
>
> Cons of this choice are:
>
>- We would need a RabbitMQ test container and additional Kubernetes
>configuration in ./test-infra
>    RabbitMQ’s peak throughput is lower than Kafka’s, for example [3]
>
>
> Based on this, two questions arise:
>
>    1. Are there any restrictions when choosing a test source? Can we use
>    RabbitMQ in our case?
>
>
I think the main requirement is that we want to test SparkReceiverIO in a
way that is similar to how it would be used by actual end users. So if a
RabbitMQ-based receiver is a good representative of a typical Spark
Receiver, this should be fine.



>
>    2. If RabbitMQ is suitable for our purposes, can we use the RabbitMQ IO
>    to write data in the integration test “write” step, or should we use
>    the RabbitMQ API directly, without adding a dependency on the Apache
>    Beam RabbitMQ IO?
>
>

I would use RabbitMQIO and implement a write-then-read type test, assuming
we can develop a non-flaky test that uses both connectors. If you run into
flakes, I think just developing a test for the source is fine.
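The write-then-read shape recommended here can be sketched independently of Beam. The following toy illustration uses an in-memory queue as a stand-in for the broker; in the real test, the "write" half would be the RabbitMQ IO write step and the "read" half the SparkReceiverIO-based read, so the helper names and the queue below are purely illustrative assumptions:

```python
import queue

def write_step(broker: "queue.Queue[str]", records: list) -> None:
    # Stand-in for the connector-based "write" half of the test.
    for record in records:
        broker.put(record)

def read_step(broker: "queue.Queue[str]", expected_count: int) -> list:
    # Stand-in for the "read" half; times out rather than hanging forever,
    # which is one way to keep this style of test from going flaky.
    return [broker.get(timeout=1) for _ in range(expected_count)]

def run_write_then_read_test() -> bool:
    broker = queue.Queue()
    expected = [f"record-{i}" for i in range(100)]
    write_step(broker, expected)
    actual = read_step(broker, len(expected))
    # Compare order-insensitively: streaming reads rarely guarantee order.
    return sorted(actual) == sorted(expected)
```

The key property being tested is round-trip integrity (every written record is read back exactly once), which is why the final assertion compares multisets rather than sequences.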

Thanks,
Cham


>
>
>
> Any ideas or comments would be greatly appreciated!
>
> Thank you in advance,
>
> Elizaveta
>
> [1] [BEAM-14378] [CdapIO] SparkReceiverIO Read via SDF #17828 –
> https://github.com/apache/beam/pull/17828
>
> [2] Apache Beam RabbitMQ IO –
> https://github.com/apache/beam/tree/master/sdks/java/io/rabbitmq
> [3] Benchmarking Apache Kafka, RabbitMQ article (2020) –
> https://www.confluent.io/blog/kafka-fastest-messaging-system/
>
>
>
>


Re: BigTable reader for Python?

2022-07-26 Thread Sachin Agarwal via dev
On Tue, Jul 26, 2022 at 6:12 PM Chamikara Jayalath via dev <
dev@beam.apache.org> wrote:

>
>
> On Mon, Jul 25, 2022 at 12:53 PM Lina Mårtensson via dev <
> dev@beam.apache.org> wrote:
>
>> Hi dev,
>>
>> We're starting to incorporate BigTable in our stack and I've delighted
>> my co-workers with how easy it was to create some BigTables with
>> Beam... but there doesn't appear to be a reader for BigTable in
>> Python.
>>
>> First off, is there a good reason why not/any reason why it would be
>> difficult?
>>
>
> There was a previous effort to implement a Python BT source, but it was
> not completed:
> https://github.com/apache/beam/pull/11295#issuecomment-646378304
>
>
>>
>> I could write one, but before I start, I'd love some input to make it
>> easier.
>>
>> It appears that there would be two options: either write one in
>> Python, or try to set one up with x-language from Java which I see is
>> done e.g. with the Spanner IO Connector.
>> Any recommendation on which one to pick or potential pitfalls in either
>> choice?
>>
>> If I write one in Python, what should I think about?
>> It is not obvious to me how to achieve parallelization, so any tips
>> here would be welcome.
>>
>
> I would strongly prefer developing a Python wrapper for the existing Java
> BT source using Beam's Multi-language Pipelines framework over developing
> a new Python source.
>
> https://beam.apache.org/documentation/programming-guide/#multi-language-pipelines
>

This is the way.

>
> 
>
> Thanks,
> Cham
>
>
>
>>
>> Thanks!
>> -Lina
>>
>


Re: BigTable reader for Python?

2022-07-26 Thread Chamikara Jayalath via dev
On Mon, Jul 25, 2022 at 12:53 PM Lina Mårtensson via dev <
dev@beam.apache.org> wrote:

> Hi dev,
>
> We're starting to incorporate BigTable in our stack and I've delighted
> my co-workers with how easy it was to create some BigTables with
> Beam... but there doesn't appear to be a reader for BigTable in
> Python.
>
> First off, is there a good reason why not/any reason why it would be
> difficult?
>

There was a previous effort to implement a Python BT source, but it was not
completed:
https://github.com/apache/beam/pull/11295#issuecomment-646378304


>
> I could write one, but before I start, I'd love some input to make it
> easier.
>
> It appears that there would be two options: either write one in
> Python, or try to set one up with x-language from Java which I see is
> done e.g. with the Spanner IO Connector.
> Any recommendation on which one to pick or potential pitfalls in either
> choice?
>
> If I write one in Python, what should I think about?
> It is not obvious to me how to achieve parallelization, so any tips
> here would be welcome.
>

I would strongly prefer developing a Python wrapper for the existing Java
BT source using Beam's Multi-language Pipelines framework over developing a
new Python source.
https://beam.apache.org/documentation/programming-guide/#multi-language-pipelines

Thanks,
Cham



>
> Thanks!
> -Lina
>


[CDAP IO] SparkReceiverIO integration testing

2022-07-26 Thread Elizaveta Lomteva
Hi, community!
Our team has prepared the SparkReceiverIO Read via SDF PR [1]. We have started
working on integration tests for the SparkReceiverIO connector, which will
allow reading data from custom Spark Receivers in an Apache Beam pipeline.

A general Apache Beam recommendation is to implement “write then read” style
integration tests. But in our case, only the Read interface was implemented,
because Spark Receivers cannot be used for writing.

Since SparkReceiverIO is an abstract IO working with Spark Receivers, there is
no exact implementation for a particular source. Therefore, we are thinking of
choosing RabbitMQ as a test source for the following reasons:

  *   It’s possible to implement a Custom Spark Receiver on RabbitMQ as a test 
streaming receiver
  *   RabbitMQ is lightweight and easy to deploy
  *   There is a test container for RabbitMQ
  *   It’s possible to generate as much test input to the RabbitMQ as we need
  *   Apache Beam has a RabbitMQ IO [2] that could hypothetically be used in
the “write” step of the test

Cons of this choice are:

  *   We would need a RabbitMQ test container and additional Kubernetes 
configuration in ./test-infra
  *   RabbitMQ’s peak throughput is lower than Kafka’s, for example [3]


Based on this, two questions arise:

  1.  Are there any restrictions when choosing a test source? Can we use
RabbitMQ in our case?

  2.  If RabbitMQ is suitable for our purposes, can we use the RabbitMQ IO to
write data in the integration test “write” step, or should we use the RabbitMQ
API directly, without adding a dependency on the Apache Beam RabbitMQ IO?


Any ideas or comments would be greatly appreciated!


Thank you in advance,

Elizaveta


[1] [BEAM-14378] [CdapIO] SparkReceiverIO Read via SDF #17828 – 
https://github.com/apache/beam/pull/17828

[2] Apache Beam RabbitMQ IO – 
https://github.com/apache/beam/tree/master/sdks/java/io/rabbitmq

[3] Benchmarking Apache Kafka, RabbitMQ article (2020) –
https://www.confluent.io/blog/kafka-fastest-messaging-system/




Beam High Priority Issue Report

2022-07-26 Thread beamactions
This is your daily summary of Beam's current high priority issues that may need 
attention.

See https://beam.apache.org/contribute/issue-priorities for the meaning and 
expectations around issue priorities.

Unassigned P1 Issues:

https://github.com/apache/beam/issues/22440 [Bug]: Python Batch Dataflow 
SideInput LoadTests failing
https://github.com/apache/beam/issues/22436 [Bug]: Python Streaming Combine 
load tests on Dataflow are timing out
https://github.com/apache/beam/issues/22401 [Bug]: BigQueryIO getFailedInserts 
fails when using Storage APIs 
https://github.com/apache/beam/issues/22321 
PortableRunnerTestWithExternalEnv.test_pardo_large_input is regularly failing 
on jenkins
https://github.com/apache/beam/issues/22303 [Task]: Add tests to Kafka SDF and 
fix known and discovered issues
https://github.com/apache/beam/issues/22299 [Bug]: JDBCIO Write freeze at 
getConnection() in WriteFn
https://github.com/apache/beam/issues/22188 BigQuery Storage API sink sometimes 
gets stuck outputting to an invalid timestamp
https://github.com/apache/beam/issues/21935 [Bug]: Reject illformed GBK Coders
https://github.com/apache/beam/issues/21794 Dataflow runner creates a new timer 
whenever the output timestamp is change
https://github.com/apache/beam/issues/21713 404s in BigQueryIO don't get output 
to Failed Inserts PCollection
https://github.com/apache/beam/issues/21704 beam_PostCommit_Java_DataflowV2 
failures parent bug
https://github.com/apache/beam/issues/21703 pubsublite.ReadWriteIT failing in 
beam_PostCommit_Java_DataflowV1 and V2
https://github.com/apache/beam/issues/21702 SpannerWriteIT failing in beam 
PostCommit Java V1
https://github.com/apache/beam/issues/21701 beam_PostCommit_Java_DataflowV1 
failing with a variety of flakes and errors
https://github.com/apache/beam/issues/21700 
--dataflowServiceOptions=use_runner_v2 is broken
https://github.com/apache/beam/issues/21696 Flink Tests failure :  
java.lang.NoClassDefFoundError: Could not initialize class 
org.apache.beam.runners.core.construction.SerializablePipelineOptions 
https://github.com/apache/beam/issues/21695 DataflowPipelineResult does not 
raise exception for unsuccessful states.
https://github.com/apache/beam/issues/21694 BigQuery Storage API insert with 
writeResult retry and write to error table
https://github.com/apache/beam/issues/21480 flake: 
FlinkRunnerTest.testEnsureStdoutStdErrIsRestored
https://github.com/apache/beam/issues/21472 Dataflow streaming tests failing 
new AfterSynchronizedProcessingTime test
https://github.com/apache/beam/issues/21471 Flakes: Failed to load cache entry
https://github.com/apache/beam/issues/21470 Test flake: test_split_half_sdf
https://github.com/apache/beam/issues/21469 beam_PostCommit_XVR_Flink flaky: 
Connection refused
https://github.com/apache/beam/issues/21468 
beam_PostCommit_Python_Examples_Dataflow failing
https://github.com/apache/beam/issues/21467 GBK and CoGBK streaming Java load 
tests failing
https://github.com/apache/beam/issues/21465 Kafka commit offset drop data on 
failure for runners that have non-checkpointing shuffle
https://github.com/apache/beam/issues/21463 NPE in Flink Portable 
ValidatesRunner streaming suite
https://github.com/apache/beam/issues/21462 Flake in 
org.apache.beam.sdk.io.mqtt.MqttIOTest.testReadObject: Address already in use
https://github.com/apache/beam/issues/21271 pubsublite.ReadWriteIT flaky in 
beam_PostCommit_Java_DataflowV2  
https://github.com/apache/beam/issues/21270 
org.apache.beam.sdk.transforms.CombineTest$WindowingTests.testWindowedCombineGloballyAsSingletonView
 flaky on Dataflow Runner V2
https://github.com/apache/beam/issues/21268 Race between member variable being 
accessed due to leaking uninitialized state via OutboundObserverFactory
https://github.com/apache/beam/issues/21267 WriteToBigQuery submits a duplicate 
BQ load job if a 503 error code is returned from googleapi
https://github.com/apache/beam/issues/21266 
org.apache.beam.sdk.transforms.ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElementStateful
 is flaky in Java ValidatesRunner Flink suite.
https://github.com/apache/beam/issues/21265 
apache_beam.runners.portability.fn_api_runner.translations_test.TranslationsTest.test_run_packable_combine_globally
 'apache_beam.coders.coder_impl._AbstractIterable' object is not reversible
https://github.com/apache/beam/issues/21263 (Broken Pipe induced) Bricked 
Dataflow Pipeline 
https://github.com/apache/beam/issues/21262 Python AfterAny, AfterAll do not 
follow spec
https://github.com/apache/beam/issues/21261 
org.apache.beam.runners.dataflow.worker.fn.logging.BeamFnLoggingServiceTest.testMultipleClientsFailingIsHandledGracefullyByServer
 is flaky
https://github.com/apache/beam/issues/21260 Python DirectRunner does not emit 
data at GC time
https://github.com/apache/beam/issues/21257 Either Create or DirectRunner fails 
to produce all elements to the following transform
https://github.com/apache/beam/issues/21123