[Question] Apache Beam for real time timeseries

2021-10-28 Thread Chiara Troiani

I am pretty new to Beam and I have a basic question.

Suppose I am ingesting streaming data at a frequency of 1 Hz (event time).
I need to process it in real time, by key, and output results at 1 Hz.
Time order is extremely important: for each key, I need to compute a result
that depends on the current PCollection element and on the previous one
(in event time).

I saw that I can use state and timers, but I have the impression that I
cannot compute results at the same frequency at which the input data is
produced.

Is there a way to ensure time order in this case?
Did I misunderstand something?

Many thanks in advance,
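
For illustration only, here is a minimal sketch of the buffering idea behind
"state and timers", written in plain Python rather than against the Beam API.
Elements are buffered per key and released in event-time order once a
watermark passes them, so each result can use the previous element of the
same key. The class and method names are hypothetical; in a real pipeline the
buffer would live in per-key state and the flush would be driven by an
event-time timer.

```python
import heapq
from collections import defaultdict


class OrderedPairwiseProcessor:
    """Hypothetical helper: emits (key, timestamp, value - previous value)
    per key, in event-time order, as the watermark advances."""

    def __init__(self):
        self._buffers = defaultdict(list)  # key -> min-heap of (timestamp, value)
        self._previous = {}                # key -> last value released for that key

    def add(self, key, timestamp, value):
        # Elements may arrive out of order; the heap keeps them sorted by timestamp.
        heapq.heappush(self._buffers[key], (timestamp, value))

    def advance_watermark(self, watermark):
        """Release every buffered element with timestamp <= watermark, oldest first."""
        results = []
        for key, heap in self._buffers.items():
            while heap and heap[0][0] <= watermark:
                timestamp, value = heapq.heappop(heap)
                previous = self._previous.get(key)
                if previous is not None:
                    results.append((key, timestamp, value - previous))
                self._previous[key] = value
        return results


if __name__ == "__main__":
    proc = OrderedPairwiseProcessor()
    proc.add("sensor-a", 2, 12.0)  # arrives before the element with timestamp 1
    proc.add("sensor-a", 1, 10.0)
    proc.add("sensor-a", 3, 15.0)
    print(proc.advance_watermark(2))  # [('sensor-a', 2, 2.0)]
    print(proc.advance_watermark(3))  # [('sensor-a', 3, 3.0)]
```

The trade-off in this sketch is latency: a result for timestamp t is only
released once the watermark reaches t, so output still arrives at 1 Hz but is
delayed by however far the watermark lags behind the newest element.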

Re: [Question]

2021-10-28 Thread Alexey Romanenko

You just need to create a shaded jar for SparkRunner and submit it with
"spark-submit" and the CLI options "--deploy-mode cluster --master yarn".
Also, you need to specify "--runner=SparkRunner --sparkMaster=yarn" as pipeline
options.
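
As a rough sketch of how those pieces fit together, the snippet below
assembles the spark-submit command line in Python. The jar name and main
class are hypothetical placeholders; the flags are the ones named above, plus
spark-submit's standard "--class" option. Arguments placed after the jar are
handed to the pipeline's main() as pipeline options.

```python
import shlex


def build_spark_submit(jar_path, main_class, extra_pipeline_args=()):
    """Return the argv for submitting a shaded Beam jar to YARN in cluster mode."""
    return [
        "spark-submit",
        "--deploy-mode", "cluster",
        "--master", "yarn",
        "--class", main_class,
        jar_path,
        # Everything after the jar is passed to the pipeline as options.
        "--runner=SparkRunner",
        "--sparkMaster=yarn",
        *extra_pipeline_args,
    ]


if __name__ == "__main__":
    # Hypothetical jar and class names, for illustration only.
    cmd = build_spark_submit("my-pipeline-bundled.jar", "org.example.MyPipeline")
    print(shlex.join(cmd))
```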


> On 26 Oct 2021, at 19:07, Holt Spalding  wrote:
> Hello,
> I've been having an incredibly difficult time running an Apache Beam pipeline
> on a Spark cluster in AWS EMR. Spark on EMR is configured for YARN, which
> appears to be the primary source of my issues. The documentation here:
> https://beam.apache.org/documentation/runners/spark/
> only seems to describe how to run Beam on a cluster in standalone mode. I've
> tried different Beam pipeline arguments, but it seems to run in local mode
> every time after it can't find a Spark master URL. Has anyone run into this
> issue, or does anyone have suggestions or examples of how to get this to
> work? Thank you for your help.

Re: [Question] Beam+Python+Flink

2021-10-28 Thread Chiara Troiani
Hi Jan,

Thank you very much for your answer, and sorry for the late reply.

I can see the job in the Flink UI, but it still fails, with this
exception among others:

Caused by:
org.apache.beam.vendor.grpc.v1p36p0.io.netty.channel.AbstractChannel$AnnotatedConnectException:
finishConnect(..) failed: Connection refused: localhost/

Caused by: java.net.ConnectException: finishConnect(..) failed: Connection
refused

I guess it is a Docker problem now.
I still cannot figure out how to make sure that the worker_pool is
accessible via  'localhost' hostname.

Many thanks,
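
One way to narrow down a "Connection refused" like the one above is to test,
from inside the taskmanager container, whether anything is listening on the
worker pool address. The helper below is a small standard-library sketch, not
part of Beam; the function names are hypothetical, and the port 50000 used in
the example is the default given in the Beam documentation, so treat it as an
assumption for your own setup.

```python
import socket


def is_reachable(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port can be opened."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers "connection refused", timeouts, and name-resolution failures.
        return False


def external_environment_args(host, port):
    """Pipeline options that point Beam at an externally started worker pool."""
    return [
        "--environment_type=EXTERNAL",
        f"--environment_config={host}:{port}",
    ]


if __name__ == "__main__":
    WORKER_POOL_PORT = 50000  # assumed default worker pool port
    print(is_reachable("localhost", WORKER_POOL_PORT))
    print(external_environment_args("localhost", WORKER_POOL_PORT))
```

If this prints False when run inside the taskmanager container, the worker
pool is not visible there under 'localhost', which would match the exception.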

On Mon, Oct 18, 2021 at 11:56 AM Jan Lukavský  wrote:

> Hi Chiara,
> environment_type LOOPBACK is meant for local execution only. The default
> is docker, which is not ideal when you use docker-compose (docker in
> docker), so the other option is to use the EXTERNAL environment. With this
> environment, you must manually start the Python SDK harness as a separate
> container using the apache/beam_python3.8_sdk docker image with args set to
> '--worker_pool'. That should run a container that will take care of
> running the Python harness processes. By default it listens on port
> 50000; it must be accessible from the taskmanager container via
> localhost:50000, and you then pass it via environment_config, e.g.:
>  --environment_type=EXTERNAL --environment_config=localhost:50000
> That should do the trick. Because of limitations of the 'worker_pool', you
> must make sure that it is accessible via the 'localhost' hostname. For more
> information see [1].
>  Jan
> [1] https://beam.apache.org/documentation/runtime/sdk-harness-config/
> On 10/18/21 11:43, Chiara Troiani wrote:
> Hi,
> I am trying to follow these tutorials
> http://beam.apache.org/documentation/runners/flink/
> For the Portable (Python)
> I am not able to execute a Beam pipeline on a Flink cluster.
> I am running a Flink Session Cluster with docker-compose,
> This is my docker-compose file:
> ——
> version: "2.2"
> services:
>   jobmanager:
>     image: flink:1.13.2-scala_2.11
>     ports:
>       - "8081:8081"
>     command: jobmanager
>     environment:
>       - |
>         jobmanager.rpc.address: jobmanager
>   taskmanager:
>     image: flink:1.13.2-scala_2.11
>     depends_on:
>       - jobmanager
>     command: taskmanager
>     scale: 1
>     environment:
>       - |
>         jobmanager.rpc.address: jobmanager
>         taskmanager.numberOfTaskSlots: 2
> —
> I run the examples from a virtual environment, python3.8,
> apache-beam==2.32.0
> macOS Catalina 10.15.7
> Docker desktop 4.1.1
> When I run:
> python -m apache_beam.examples.wordcount --input=text.txt --output=out.txt
> --runner=FlinkRunner --flink_master=localhost:8081
> --environment_type=LOOPBACK
> I get this error:
> org.apache.flink.runtime.JobException: Recovery is suppressed by
> NoRestartBackoffTimeStrategy
>   at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
>   at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
>   at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:216)
>   at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:206)
>   at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:197)
>   at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:682)
>   at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
>   at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
>   at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
>   at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
>   at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
>   at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>   at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>   at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>   at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.s

Potential bug: AutoValue + Memoized fields

2021-10-28 Thread Cristian Constantinescu
Hi everyone,

Looks like Beam has a bit of an issue when using AutoValue classes with
@Memoized (cached) fields. It's not a big issue, and the workaround is
simply not to use @Memoized fields, at the cost of a little performance (see
the comment in the code snippet).

The code further below produces this exception:
Exception in thread "main" java.lang.NullPointerException
at org.apache.beam.sdk.schemas.CachingFactory.create(CachingFactory.java:56)
at org.apache.beam.sdk.values.RowWithGetters.<init>(RowWithGetters.java:66)
at org.apache.beam.sdk.schemas.SchemaCoder.encode(SchemaCoder.java:124)
at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:69)
at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:54)
at org.apache.beam.sdk.transforms.Create$Values.expand(Create.java:370)
at org.apache.beam.sdk.transforms.Create$Values.expand(Create.java:277)
at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:548)
at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:482)
at org.apache.beam.sdk.values.PBegin.apply(PBegin.java:44)
at org.apache.beam.sdk.Pipeline.apply(Pipeline.java:177)
at org.whatever.testing.App.main(App.java:24)

package org.whatever.testing;

import com.google.auto.value.AutoValue;
import com.google.auto.value.extension.memoized.Memoized;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaCreate;
import org.apache.beam.sdk.schemas.transforms.Convert;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

import java.util.Arrays;

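public class App {

    public static void main(String[] args) {
        // The right-hand side was lost in the archive; assumed from the
        // PipelineOptionsFactory import.
        var options = PipelineOptionsFactory.fromArgs(args).create();

        var p = Pipeline.create(options);

        // The pipeline body was lost in the archive except for "-> { return i;".
        // The steps below are assumed from the imports and from the stack trace
        // (Create.Values.expand is reached from App.main).
        p.apply(Create.of(Arrays.asList(FooAutoValue.create("dummy"))))
                .apply(Convert.to(FooAutoValue.class))
                .apply(MapElements.into(TypeDescriptor.of(FooAutoValue.class))
                        .via(i -> {
                            return i;
                        }));

        p.run().waitUntilFinish();
    }

    // The annotations were lost in the archive; assumed from the imports.
    @AutoValue
    @DefaultSchema(AutoValueSchema.class)
    public static abstract class FooAutoValue {
        public abstract String getDummyProp();

        @Memoized // <-- commenting this line makes everything work
        public String getSomething() {
            return "sldj";
        }

        @SchemaCreate
        public static FooAutoValue create(String dummyProp) {
            // The builder chain was truncated in the archive; assumed.
            return builder().setDummyProp(dummyProp).build();
        }

        public static Builder builder() {
            return new AutoValue_App_FooAutoValue.Builder();
        }

        @AutoValue.Builder
        public abstract static class Builder {
            public abstract Builder setDummyProp(String newDummyProp);

            public abstract FooAutoValue build();
        }
    }
}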

From what I can see, instead of getting the fields from the abstract class,
it's tryi

Re: [Question] Beam+Python+Flink

2021-10-28 Thread Sam Bourne
Hey Chiara,

I went through a lot of the same struggles a while back and made this repo
to showcase how I accomplished something similar.

It shouldn't be hard to convert to a docker-compose setup (I actually had
it like this originally while testing before porting to kubernetes).
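
As an untested sketch of what that conversion might look like, the snippet
below builds a docker-compose service entry for the worker pool that shares
the taskmanager's network namespace via compose's "network_mode:
service:<name>" setting, which is roughly the compose counterpart of the
sidecar arrangement described in the quoted message below. The service name
"beam_worker_pool" and the helper function are hypothetical; the image and
the '--worker_pool' argument come from earlier in the thread. The fragment is
printed as JSON, which can be merged by hand into the existing compose file
(JSON is also valid YAML).

```python
import json


def worker_pool_service(taskmanager_service="taskmanager",
                        image="apache/beam_python3.8_sdk"):
    """Compose service entry for a Beam Python worker pool 'sidecar'."""
    return {
        "image": image,
        # Appended to the image's entrypoint, like `docker run <image> --worker_pool`.
        "command": ["--worker_pool"],
        # Share the taskmanager's network namespace so that 'localhost'
        # inside the taskmanager also reaches the worker pool.
        "network_mode": f"service:{taskmanager_service}",
        "depends_on": [taskmanager_service],
    }


# Hypothetical service name; merge under the existing "services:" key.
compose_fragment = {"services": {"beam_worker_pool": worker_pool_service()}}

if __name__ == "__main__":
    print(json.dumps(compose_fragment, indent=2))
```

Sharing a network namespace ties the worker pool to a single taskmanager
container, which fits the "scale: 1" setup in the compose file quoted below
but would need rethinking with several taskmanagers.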

On Thu, Oct 28, 2021 at 10:41 AM Kyle Weaver  wrote:

> > I still cannot figure out how to make sure that the worker_pool is
> accessible via  'localhost' hostname.
> In Kubernetes, we make the worker pool a sidecar of the Flink task manager
> container. Perhaps there is a similar feature available in docker compose?
> https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/blob/0310df76d6e2128cd5d2bc51fae4e842d370c463/examples/beam/without_job_server/beam_flink_cluster.yaml#L17-L20
> On Thu, Oct 28, 2021 at 10:30 AM Chiara Troiani <
> t.chiara.for@gmail.com> wrote:
>> Hi Jan,
>> Thank you very much for your answer, and sorry for the late reply.
>> I can see the job in flink UI, but it still fails, with this
>> exception among others:
>> Caused by:
>> org.apache.beam.vendor.grpc.v1p36p0.io.netty.channel.AbstractChannel$AnnotatedConnectException:
>> finishConnect(..) failed: Connection refused: localhost/
>> Caused by: java.net.ConnectException: finishConnect(..) failed:
>> Connection refused
>> I guess it is a Docker problem now.
>> I still cannot figure out how to make sure that the worker_pool is
>> accessible via  'localhost' hostname.
>> Many thanks,
>> Chiara

Re: JsonToRow parsing Proto[String to Int64]

2021-10-28 Thread gaurav mishra
The validator function will probably have to do a try/catch around
Long.parseLong(jsonNode.toString()) to test for valid inputs in case the
current check, jsonNode.isIntegralNumber() && jsonNode.canConvertToLong(),
is false.
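
To make the fallback concrete, here is a small sketch of the same idea in
Python rather than in the Java of RowJsonValueExtractors: accept a JSON
number as before, and otherwise try to parse a string strictly as a 64-bit
integer, rejecting anything else instead of silently returning 0. The
function names are hypothetical. The second function shows a possible
deserialization-side workaround that rewrites the JSON before it is parsed
against the schema; it only handles top-level fields.

```python
import json
import re

INT64_MIN, INT64_MAX = -(2 ** 63), 2 ** 63 - 1
_INTEGER = re.compile(r"-?[0-9]+")


def parse_int64(value):
    """Return value as an int if it is an integer or a string of digits."""
    if isinstance(value, bool):
        # bool is a subclass of int in Python; JSON true/false is not an int64.
        raise ValueError(f"not an int64: {value!r}")
    if isinstance(value, int):
        number = value
    elif isinstance(value, str) and _INTEGER.fullmatch(value):
        number = int(value)
    else:
        raise ValueError(f"not an int64: {value!r}")
    if not INT64_MIN <= number <= INT64_MAX:
        raise ValueError(f"out of int64 range: {value!r}")
    return number


def normalize_int64_fields(json_text, int64_fields):
    """Rewrite string-encoded int64 fields (top level only) as JSON numbers."""
    record = json.loads(json_text)
    for name in int64_fields:
        if name in record:
            record[name] = parse_int64(record[name])
    return json.dumps(record)


if __name__ == "__main__":
    # 'site_id' is the field from the error message quoted below.
    print(normalize_int64_fields('{"site_id": "123", "name": "x"}', ["site_id"]))
```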

On Fri, Oct 22, 2021 at 8:07 PM Reuven Lax  wrote:

> If we want to support this behavior, we would have to change this code:
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJsonValueExtractors.java#L93
> I believe that instead of longValue, JsonNode::asLong would have to be
> used (that function parses strings). I'm not sure how to test for invalid
> input, since that function simply returns 0 in that case, which is also a
> valid value.
> Reuven
> On Fri, Oct 22, 2021 at 12:41 PM gaurav mishra <
> gauravmishra.it...@gmail.com> wrote:
>> I have two pipelines
>> PipelineA -> output to pubsub(schema bound json) -> PipelineB
>> PipelineA is emitting proto models serialized as JsonStrings.
>> Serialization is done using JsonFormat.printer.
>> ```
>> JsonFormat.printer().preservingProtoFieldNames()
>> .print(model)
>> ```
>> In PipelineB I am trying to read these JSON strings as Rows tied to
>> the schema derived from the same proto that was used in PipelineA.
>> ```
>> Schema schema = new
>> ProtoMessageSchema().schemaFor(TypeDescriptor.of(protoClass));
>> input.apply(JsonToRow.withSchema(schema));
>> ...
>> ```
>> The problem I am facing is with int64 type fields. When these fields are
>> serialized using `JsonFormat.printer` these are serialized as strings in
>> final json, which is by design(
>> https://developers.google.com/protocol-buffers/docs/proto3#json).
>> In pipelineB when the framework tries to deserialize these fields as
>> int64 it fails.
>> ```
>> Unable to get value from field 'site_id'. Schema type 'INT64'. JSON node
>> type STRING
>> org.apache.beam.sdk.util.RowJson$RowJsonDeserializer.extractJsonPrimitiveValue
>> ```
>> Is there a way to work around this problem? Can I do something on either
>> the serialization side or the deserialization side to fix this?