Infrastructure-as-Code to provision a private GKE Autopilot Kubernetes cluster and Strimzi Kafka

2023-03-01 Thread Damon Douglas via dev
Hello Everyone,

I created a PR to provide the Beam community with Terraform code to provision
a private Google Kubernetes Engine (GKE) Autopilot cluster, along with
Kubernetes manifests to provision an internally TCP load-balanced strimzi.io
Kafka cluster. This setup helped me a lot when I needed a repeatable way to
spin up resources for reading from and writing to Kafka without having to
scratch my head and remember the steps.

https://github.com/apache/beam/pull/25686

This is *not* meant to replace our current test-infra Kubernetes and Kafka
setup, which is designed for our automated testing using Jenkins.
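
For anyone who wants to point a pipeline at the provisioned cluster, below is
a minimal sketch using Beam's KafkaIO (the bootstrap address and topic name
are hypothetical placeholders, not values from the PR):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.io.kafka.KafkaIO;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.kafka.common.serialization.StringDeserializer;

    public class KafkaReadSketch {
      public static void main(String[] args) {
        Pipeline p = Pipeline.create(PipelineOptionsFactory.fromArgs(args).create());

        // Address of the internal TCP load balancer in front of the Strimzi
        // listener (placeholder; use whatever your provisioned cluster exposes).
        p.apply("ReadFromKafka", KafkaIO.<String, String>read()
            .withBootstrapServers("10.128.0.42:9094")
            .withTopic("demo-topic")
            .withKeyDeserializer(StringDeserializer.class)
            .withValueDeserializer(StringDeserializer.class));

        p.run().waitUntilFinish();
      }
    }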

Best,

Damon


Re: Consuming one PCollection before consuming another with Beam

2023-03-01 Thread Reuven Lax via dev
I'm not sure I understand this use case well. What are you planning on
doing with the BQ dataset if it were processed first? Were you planning on
caching information in memory? Storing data in Beam state? Something else?

On Wed, Mar 1, 2023 at 10:43 AM Kenneth Knowles  wrote:

>
>
> On Tue, Feb 28, 2023 at 5:14 PM Sahil Modak 
> wrote:
>
>> The number of keys/data in BQ would not be constant and would grow with time.
>>
>> A rough estimate would be around 300k keys with an average size of 5 KB
>> per key. Both the count of the keys and the size of each key would be
>> feature-dependent (based on the upstream pipelines), and we won't have
>> control over this in the future.
>>
>> Using the BigQuery client would mean we would have to run individual queries
>> for each of these 300k keys from the BusinessLogic() DoFn, which operates in
>> a global window KV
>>
>> Also, the order of the data from BQ would not matter to us since the only
>> thing we are trying to solve here is regaining the state spec information
>> before starting to consume pub/sub.
>>
>
> I was referring to order in general, across your whole data set as an
> abstract concept. If order _really_ doesn't matter, then you wouldn't need
> to read the BQ data first. You could just flatten them together and run the
> pipeline like that. So I think there is some order-dependence that you want
> to represent at the data level.
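
A minimal sketch of that flatten, assuming both sources have already been
mapped to a common element type (all names below are hypothetical):

    import org.apache.beam.sdk.Pipeline;
    import org.apache.beam.sdk.options.PipelineOptionsFactory;
    import org.apache.beam.sdk.transforms.Create;
    import org.apache.beam.sdk.transforms.Flatten;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionList;

    Pipeline p = Pipeline.create(PipelineOptionsFactory.create());

    // Stand-ins for the BQ-derived and pub/sub-derived collections, both
    // already mapped to a common (hypothetical) element type.
    PCollection<KV<String, String>> fromBq =
        p.apply("BqStandIn", Create.of(KV.of("k1", "state1")));
    PCollection<KV<String, String>> fromPubsub =
        p.apply("PubsubStandIn", Create.of(KV.of("k1", "event1")));

    // Flatten merges the two; there is no ordering guarantee between sources.
    PCollection<KV<String, String>> all =
        PCollectionList.of(fromBq).and(fromPubsub)
            .apply("FlattenSources", Flatten.pCollections());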
>
> Kenn
>
>
>> I will explore using Wait.on(bigquery) before the pub/sub read, since I am
>> not sure a side input would be the best option here.
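
For reference, Wait.on lives in org.apache.beam.sdk.transforms; a sketch of
gating the pub/sub read on the BQ read, with hypothetical variable names:

    import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
    import org.apache.beam.sdk.transforms.Wait;
    import org.apache.beam.sdk.values.PCollection;

    // bqStream and pubsubStream stand for the two reads discussed in this
    // thread. Wait.on holds back each window of the main input until the
    // corresponding window of the signal is done; for a bounded BQ read in
    // the global window, that means after the BQ read has completed.
    PCollection<PubsubMessage> gated =
        pubsubStream.apply("WaitOnBq", Wait.on(bqStream));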
>>
>>
>> On Tue, Feb 28, 2023 at 8:44 AM Kenneth Knowles  wrote:
>>
>>> I'm also curious how much you depend on order to get the state contents
>>> right. The ordering of the side input will be arbitrary, and even the
>>> streaming input can have plenty of out of order messages. So I want to
>>> think about what are the data dependencies that result in the requirement
>>> of order. Or if there are none and you just want to know that all the past
>>> data has been processed, Niel's idea is one solution. It isn't parallel,
>>> though.
>>>
>>> Kenn
>>>
>>> On Mon, Feb 27, 2023 at 11:59 AM Reuven Lax  wrote:
>>>
 How large is this state spec stored in BQ? If the size isn't too large,
 you can read it from BQ and make it a side input into the DoFn.
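
A sketch of that shape with hypothetical names, using a map-valued view and
assuming the BQ rows are first reduced to string key/value pairs:

    import java.util.Map;
    import org.apache.beam.sdk.io.gcp.pubsub.PubsubMessage;
    import org.apache.beam.sdk.transforms.DoFn;
    import org.apache.beam.sdk.transforms.ParDo;
    import org.apache.beam.sdk.transforms.View;
    import org.apache.beam.sdk.values.PCollection;
    import org.apache.beam.sdk.values.PCollectionView;

    // stateKvs: the BQ read already mapped to KV<String, String> (hypothetical).
    PCollectionView<Map<String, String>> stateView =
        stateKvs.apply("StateAsMap", View.asMap());

    pubsubStream.apply("BusinessLogic", ParDo.of(
        new DoFn<PubsubMessage, String>() {
          @ProcessElement
          public void process(ProcessContext c) {
            // The whole map is available as a side input to every element.
            Map<String, String> state = c.sideInput(stateView);
            String prior = state.get(c.element().getAttribute("key"));
            // ... business logic using `prior` goes here ...
          }
        }).withSideInputs(stateView));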

 On Mon, Feb 27, 2023 at 11:06 AM Sahil Modak <
 smo...@paloaltonetworks.com> wrote:

> We are trying to re-initialize our state specs in the BusinessLogic()
> DoFn from BQ.
> BQ has data about the state spec, and we would like to make sure that
> the state specs in our BusinessLogic() DoFn are initialized before it
> starts consuming from pub/sub.
>
> This is for handling the case of redeployment of the Dataflow jobs so
> that the states are preserved and the BusinessLogic() can work seamlessly
> as before. All our DoFns operate in a global window and do not perform
> any aggregation.
>
> We are currently using Redis to preserve the state spec information
> but would like to explore using BQ as an alternative to Redis.
>
> On Fri, Feb 24, 2023 at 12:51 PM Kenneth Knowles 
> wrote:
>
>> My suggestion is to try to solve the problem in terms of what you
>> want to compute. Instead of trying to control the operational aspects like
>> "read all the BQ before reading Pubsub" there is presumably some reason
>> that the BQ data naturally "comes first", for example if its timestamps are
>> earlier or if there is a join or an aggregation that must include it.
>> Whenever you think you want to set up an operational dependency between two
>> things that "happen" in a pipeline, it is often best to pivot your thinking
>> to the data and what you are trying to compute, and the built-in
>> dependencies will solve the ordering problems.
>>
>> So - is there a way to describe your problem in terms of the data and
>> what you are trying to compute?
>>
>> Kenn
>>
>> On Fri, Feb 24, 2023 at 10:46 AM Reuven Lax via dev <
>> dev@beam.apache.org> wrote:
>>
>>> First, PCollections are completely unordered, so there is no
>>> guarantee on what order you'll see events in the flattened PCollection.
>>>
>>> There may be ways to process the BigQuery data in a
>>> separate transform first, but it depends on the structure of the data.
>>> How large is the BigQuery table? Are you doing any windowed aggregations
>>> here?
>>>
>>> Reuven
>>>
>>> On Fri, Feb 24, 2023 at 10:40 AM Sahil Modak <
>>> smo...@paloaltonetworks.com> wrote:
>>>
 Yes, this is a streaming pipeline.

 Some more details about the existing implementation vs. what we want to
 achieve.

 Current implementation:
 Reading from pub-sub:

 Pipeline input = Pipeline.create(options);

Re: Consuming one PCollection before consuming another with Beam

2023-03-01 Thread Kenneth Knowles
On Tue, Feb 28, 2023 at 5:14 PM Sahil Modak 
wrote:

> The number of keys/data in BQ would not be constant and would grow with time.
>
> A rough estimate would be around 300k keys with an average size of 5 KB per
> key. Both the count of the keys and the size of each key would be
> feature-dependent (based on the upstream pipelines), and we won't have
> control over this in the future.
>
> Using the BigQuery client would mean we would have to run individual queries
> for each of these 300k keys from the BusinessLogic() DoFn, which operates in
> a global window KV
>
> Also, the order of the data from BQ would not matter to us since the only
> thing we are trying to solve here is regaining the state spec information
> before starting to consume pub/sub.
>

I was referring to order in general, across your whole data set as an
abstract concept. If order _really_ doesn't matter, then you wouldn't need
to read the BQ data first. You could just flatten them together and run the
pipeline like that. So I think there is some order-dependence that you want
to represent at the data level.

Kenn


> I will explore using Wait.on(bigquery) before the pub/sub read, since I am
> not sure a side input would be the best option here.
>
>
> On Tue, Feb 28, 2023 at 8:44 AM Kenneth Knowles  wrote:
>
>> I'm also curious how much you depend on order to get the state contents
>> right. The ordering of the side input will be arbitrary, and even the
>> streaming input can have plenty of out of order messages. So I want to
>> think about what are the data dependencies that result in the requirement
>> of order. Or if there are none and you just want to know that all the past
>> data has been processed, Niel's idea is one solution. It isn't parallel,
>> though.
>>
>> Kenn
>>
>> On Mon, Feb 27, 2023 at 11:59 AM Reuven Lax  wrote:
>>
>>> How large is this state spec stored in BQ? If the size isn't too large,
>>> you can read it from BQ and make it a side input into the DoFn.
>>>
>>> On Mon, Feb 27, 2023 at 11:06 AM Sahil Modak <
>>> smo...@paloaltonetworks.com> wrote:
>>>
 We are trying to re-initialize our state specs in the BusinessLogic()
 DoFn from BQ.
 BQ has data about the state spec, and we would like to make sure that
 the state specs in our BusinessLogic() DoFn are initialized before it
 starts consuming from pub/sub.

 This is for handling the case of redeployment of the Dataflow jobs so
 that the states are preserved and the BusinessLogic() can work seamlessly
 as before. All our DoFns operate in a global window and do not perform
 any aggregation.

 We are currently using Redis to preserve the state spec information but
 would like to explore using BQ as an alternative to Redis.

 On Fri, Feb 24, 2023 at 12:51 PM Kenneth Knowles 
 wrote:

> My suggestion is to try to solve the problem in terms of what you want
> to compute. Instead of trying to control the operational aspects like "read
> all the BQ before reading Pubsub" there is presumably some reason that the
> BQ data naturally "comes first", for example if its timestamps are earlier
> or if there is a join or an aggregation that must include it. Whenever you
> think you want to set up an operational dependency between two things that
> "happen" in a pipeline, it is often best to pivot your thinking to the data
> and what you are trying to compute, and the built-in dependencies will
> solve the ordering problems.
>
> So - is there a way to describe your problem in terms of the data and
> what you are trying to compute?
>
> Kenn
>
> On Fri, Feb 24, 2023 at 10:46 AM Reuven Lax via dev <
> dev@beam.apache.org> wrote:
>
>> First, PCollections are completely unordered, so there is no guarantee
>> on what order you'll see events in the flattened PCollection.
>>
>> There may be ways to process the BigQuery data in a
>> separate transform first, but it depends on the structure of the data.
>> How large is the BigQuery table? Are you doing any windowed aggregations
>> here?
>>
>> Reuven
>>
>> On Fri, Feb 24, 2023 at 10:40 AM Sahil Modak <
>> smo...@paloaltonetworks.com> wrote:
>>
>>> Yes, this is a streaming pipeline.
>>>
>>> Some more details about the existing implementation vs. what we want to
>>> achieve.
>>>
>>> Current implementation:
>>> Reading from pub-sub:
>>>
>>> Pipeline input = Pipeline.create(options);
>>>
>>> PCollection<PubsubMessage> pubsubStream = input.apply("Read From Pubsub",
>>> PubsubIO.readMessagesWithAttributesAndMessageId()
>>>
>>> .fromSubscription(inputSubscriptionId))
>>>
>>>
>>> Reading from bigquery:
>>>
>>> PCollection bqStream = input.apply("Read from BQ", BigQueryIO
>>> 

Beam High Priority Issue Report (37)

2023-03-01 Thread beamactions
This is your daily summary of Beam's current high priority issues that may need 
attention.

See https://beam.apache.org/contribute/issue-priorities for the meaning and 
expectations around issue priorities.

Unassigned P1 Issues:

https://github.com/apache/beam/issues/25669 [Bug]: Different orderings of 
SchemaAwareExternalTransform() kwargs may result in misplaced arguments 
https://github.com/apache/beam/issues/24776 [Bug]: Race condition in Python SDK 
Harness ProcessBundleProgress
https://github.com/apache/beam/issues/24389 [Failing Test]: 
HadoopFormatIOElasticTest.classMethod ExceptionInInitializerError 
ContainerFetchException
https://github.com/apache/beam/issues/24367 [Bug]: workflow.tar.gz cannot be 
passed to flink runner
https://github.com/apache/beam/issues/24313 [Flaky]: 
apache_beam/runners/portability/portable_runner_test.py::PortableRunnerTestWithSubprocesses::test_pardo_state_with_custom_key_coder
https://github.com/apache/beam/issues/24267 [Failing Test]: Timeout waiting to 
lock gradle
https://github.com/apache/beam/issues/23944  beam_PreCommit_Python_Cron 
regularly failing - test_pardo_large_input flaky
https://github.com/apache/beam/issues/23709 [Flake]: Spark batch flakes in 
ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElement and 
ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundle
https://github.com/apache/beam/issues/22969 Discrepancy in behavior of 
`DoFn.process()` when `yield` is combined with `return` statement, or vice versa
https://github.com/apache/beam/issues/22961 [Bug]: WriteToBigQuery silently 
skips most of records without job fail
https://github.com/apache/beam/issues/22913 [Bug]: 
beam_PostCommit_Java_ValidatesRunner_Flink flakes in 
org.apache.beam.sdk.transforms.GroupByKeyTest$BasicTests.testAfterProcessingTimeContinuationTriggerUsingState
https://github.com/apache/beam/issues/22115 [Bug]: 
apache_beam.runners.portability.portable_runner_test.PortableRunnerTestWithSubprocesses
 is flaky
https://github.com/apache/beam/issues/21713 404s in BigQueryIO don't get output 
to Failed Inserts PCollection
https://github.com/apache/beam/issues/21706 Flaky timeout in github Python unit 
test action 
StatefulDoFnOnDirectRunnerTest.test_dynamic_timer_clear_then_set_timer
https://github.com/apache/beam/issues/21695 DataflowPipelineResult does not 
raise exception for unsuccessful states.
https://github.com/apache/beam/issues/21643 FnRunnerTest with non-trivial 
(order 1000 elements) numpy input flakes in non-cython environment
https://github.com/apache/beam/issues/21469 beam_PostCommit_XVR_Flink flaky: 
Connection refused
https://github.com/apache/beam/issues/21424 Java VR (Dataflow, V2, Streaming) 
failing: ParDoTest$TimestampTests/OnWindowExpirationTests
https://github.com/apache/beam/issues/21262 Python AfterAny, AfterAll do not 
follow spec
https://github.com/apache/beam/issues/21260 Python DirectRunner does not emit 
data at GC time
https://github.com/apache/beam/issues/21121 
apache_beam.examples.streaming_wordcount_it_test.StreamingWordCountIT.test_streaming_wordcount_it
 flakey
https://github.com/apache/beam/issues/21104 Flaky: 
apache_beam.runners.portability.fn_api_runner.fn_runner_test.FnApiRunnerTestWithGrpcAndMultiWorkers
https://github.com/apache/beam/issues/20976 
apache_beam.runners.portability.flink_runner_test.FlinkRunnerTestOptimized.test_flink_metrics
 is flaky
https://github.com/apache/beam/issues/20974 Python GHA PreCommits flake with 
grpc.FutureTimeoutError on SDK harness startup
https://github.com/apache/beam/issues/20689 Kafka commitOffsetsInFinalize OOM 
on Flink
https://github.com/apache/beam/issues/20108 Python direct runner doesn't emit 
empty pane when it should
https://github.com/apache/beam/issues/19814 Flink streaming flakes in 
ParDoLifecycleTest.testTeardownCalledAfterExceptionInStartBundleStateful and 
ParDoLifecycleTest.testTeardownCalledAfterExceptionInProcessElementStateful
https://github.com/apache/beam/issues/19465 Explore possibilities to lower 
in-use IP address quota footprint.
https://github.com/apache/beam/issues/19241 Python Dataflow integration tests 
should export the pipeline Job ID and console output to Jenkins Test Result 
section


P1 Issues with no update in the last week:

https://github.com/apache/beam/issues/25412 [Feature Request]: Google Cloud 
Bigtable Change Stream Connector
https://github.com/apache/beam/issues/23875 [Bug]: beam.Row.__eq__ returns true 
for unequal rows
https://github.com/apache/beam/issues/23525 [Bug]: Default PubsubMessage coder 
will drop message id and orderingKey
https://github.com/apache/beam/issues/22605 [Bug]: Beam Python failure for 
dataflow_exercise_metrics_pipeline_test.ExerciseMetricsPipelineTest.test_metrics_it
https://github.com/apache/beam/issues/21714 
PulsarIOTest.testReadFromSimpleTopic is very flaky
https://github.com/apache/beam/issues/21708 beam_PostCommit_Java_DataflowV2, 
testBigQueryStorageWrite30MProto failing consistently