[Question] Apache Beam for real time timeseries

2021-10-28 Thread Chiara Troiani
Hello,

I am pretty new to Beam and I have a basic question.

Suppose I am ingesting streaming data at a frequency of 1 Hz (event time).
I need to process it in real time by key, and output results at the same
1 Hz frequency.
Time order is extremely important: I need to compute a result that depends
on the current PCollection element and on the previous one (in event time),
for each key.

I saw that I can use state and timers, but I have the impression that I cannot
compute results at the same frequency at which the input data is produced.
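
To make it concrete, this is roughly the kind of stateful DoFn I have been
sketching (a minimal, untested sketch: I assume KV<String, Double> input, use a
placeholder "difference with previous" computation, drop the key from the
output for brevity, and ignore late data):

import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import org.apache.beam.sdk.coders.DoubleCoder;
import org.apache.beam.sdk.state.BagState;
import org.apache.beam.sdk.state.StateSpec;
import org.apache.beam.sdk.state.StateSpecs;
import org.apache.beam.sdk.state.TimeDomain;
import org.apache.beam.sdk.state.Timer;
import org.apache.beam.sdk.state.TimerSpec;
import org.apache.beam.sdk.state.TimerSpecs;
import org.apache.beam.sdk.state.ValueState;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;
import org.apache.beam.sdk.values.TimestampedValue;
import org.joda.time.Instant;

// Buffers elements per key, sorts them by event time when the event-time
// timer fires, and combines each element with the previous one.
public class PairWithPreviousFn extends DoFn<KV<String, Double>, Double> {

  @StateId("buffer")
  private final StateSpec<BagState<TimestampedValue<Double>>> bufferSpec =
      StateSpecs.bag(TimestampedValue.TimestampedValueCoder.of(DoubleCoder.of()));

  @StateId("previous")
  private final StateSpec<ValueState<Double>> previousSpec = StateSpecs.value(DoubleCoder.of());

  @TimerId("flush")
  private final TimerSpec flushSpec = TimerSpecs.timer(TimeDomain.EVENT_TIME);

  @ProcessElement
  public void process(
      @Element KV<String, Double> element,
      @Timestamp Instant timestamp,
      @StateId("buffer") BagState<TimestampedValue<Double>> buffer,
      @TimerId("flush") Timer flush) {
    buffer.add(TimestampedValue.of(element.getValue(), timestamp));
    // Fire once the watermark passes this element's timestamp; a real
    // pipeline would manage the timer target (and late data) more carefully.
    flush.set(timestamp);
  }

  @OnTimer("flush")
  public void onFlush(
      OnTimerContext context,
      @StateId("buffer") BagState<TimestampedValue<Double>> buffer,
      @StateId("previous") ValueState<Double> previous) {
    List<TimestampedValue<Double>> sorted = new ArrayList<>();
    buffer.read().forEach(sorted::add);
    sorted.sort(Comparator.comparing(TimestampedValue::getTimestamp));
    Double prev = previous.read();
    for (TimestampedValue<Double> current : sorted) {
      if (prev != null) {
        // Placeholder computation: difference between current and previous value.
        context.output(current.getValue() - prev);
      }
      prev = current.getValue();
    }
    previous.write(prev);
    buffer.clear();
  }
}

My doubt is whether firing a timer per element like this can keep up with the
1 Hz input rate while still guaranteeing per-key event-time order.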

Is there a way to ensure time order in this case?
Did I misunderstand something?

Many thanks in advance,
Chiara


Re: [Question]

2021-10-28 Thread Alexey Romanenko
Hi,

You just need to create a shaded jar for SparkRunner and submit it with
"spark-submit" and the CLI options "--deploy-mode cluster --master yarn".
Also, you need to specify "--runner=SparkRunner --sparkMaster=yarn" as pipeline
options.
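
For example, roughly like this (the jar name and main class below are just
placeholders, not something from your project; everything after the jar is
passed to your main() so PipelineOptionsFactory.fromArgs() can pick it up):

  spark-submit \
    --deploy-mode cluster \
    --master yarn \
    --class org.example.MyBeamPipeline \
    my-pipeline-shaded.jar \
    --runner=SparkRunner \
    --sparkMaster=yarn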

—
Alexey

> On 26 Oct 2021, at 19:07, Holt Spalding  wrote:
> 
> Hello,
> 
> I've been having an incredibly difficult time running an Apache Beam pipeline 
> on a Spark cluster in AWS EMR. Spark on EMR is configured for YARN, which 
> appears to be the primary source of my issues; the documentation here: 
> https://beam.apache.org/documentation/runners/spark/ 
> 
> only seems to describe how to run Beam on a cluster in standalone mode. I've 
> tried different Beam pipeline arguments, but it seems to run in local mode 
> every time after it can't find a Spark master URL. Has anyone run into this 
> issue, and/or does anyone have suggestions or examples of how to get this to 
> work? Thank you for your help. 
> 
> 



Re: [Question] Beam+Python+Flink

2021-10-28 Thread Chiara Troiani
Hi Jan,

Thank you very much for your answer, and sorry for the late reply.

I can see the job in the Flink UI, but it still fails, with this
exception among others:

Caused by:
org.apache.beam.vendor.grpc.v1p36p0.io.netty.channel.AbstractChannel$AnnotatedConnectException:
finishConnect(..) failed: Connection refused: localhost/127.0.0.1:50000

Caused by: java.net.ConnectException: finishConnect(..) failed: Connection
refused


I guess it is a Docker problem now.
I still cannot figure out how to make sure that the worker_pool is
accessible via the 'localhost' hostname.

Many thanks,
Chiara

On Mon, Oct 18, 2021 at 11:56 AM Jan Lukavský  wrote:

> Hi Chiara,
>
> environment_type LOOPBACK is meant for local execution only. The default
> is Docker, which is not ideal when you use docker-compose (Docker in
> Docker), so the other option is to use the EXTERNAL environment. With this
> environment, you must manually start the Python SDK harness as a separate
> container using the apache/beam_python3.8_sdk Docker image with args set to
> '--worker_pool'. That will run a container that takes care of running the
> Python harness processes. It will by default listen on port 50000; it must
> be accessible from the taskmanager container via localhost:50000, and you
> then pass it via environment_config, e.g.:
>
>  --environment_type=EXTERNAL --environment_config=localhost:50000
>
> That should do the trick. Because of limitations of the 'worker_pool' you
> must make sure that it is accessible via the 'localhost' hostname. For more
> information see [1].
>
>  Jan
>
> [1] https://beam.apache.org/documentation/runtime/sdk-harness-config/
> On 10/18/21 11:43, Chiara Troiani wrote:
>
> Hi,
>
>
> I am trying to follow this tutorial
>
> http://beam.apache.org/documentation/runners/flink/
>
> for the portable (Python) runner.
>
>
> I am not able to execute a Beam pipeline on a Flink cluster.
>
> I am running a Flink Session Cluster with docker-compose.
>
>
> This is my docker-compose file:
>
>
> ——
>
> version: "2.2"
>
> services:
>   jobmanager:
>     image: flink:1.13.2-scala_2.11
>     ports:
>       - "8081:8081"
>     command: jobmanager
>     environment:
>       - |
>         FLINK_PROPERTIES=
>         jobmanager.rpc.address: jobmanager
>
>   taskmanager:
>     image: flink:1.13.2-scala_2.11
>     depends_on:
>       - jobmanager
>     command: taskmanager
>     scale: 1
>     environment:
>       - |
>         FLINK_PROPERTIES=
>         jobmanager.rpc.address: jobmanager
>         taskmanager.numberOfTaskSlots: 2
>
> —
>
>
> I run the examples from a virtual environment, python3.8,
> apache-beam==2.32.0
>
> macOS Catalina 10.15.7
>
> Docker desktop 4.1.1
>
>
> When I run:
>
> python -m apache_beam.examples.wordcount --input=text.txt --output=out.txt
> --runner=FlinkRunner --flink_master=localhost:8081
> --environment_type=LOOPBACK
>
>
> I get this error:
>
> org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
>   at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
>   at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
>   at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:216)
>   at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:206)
>   at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:197)
>   at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(SchedulerBase.java:682)
>   at org.apache.flink.runtime.scheduler.SchedulerNG.updateTaskExecutionState(SchedulerNG.java:79)
>   at org.apache.flink.runtime.jobmaster.JobMaster.updateTaskExecutionState(JobMaster.java:435)
>   at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>   at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>   at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>   at java.lang.reflect.Method.invoke(Method.java:498)
>   at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcInvocation(AkkaRpcActor.java:305)
>   at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleRpcMessage(AkkaRpcActor.java:212)
>   at org.apache.flink.runtime.rpc.akka.FencedAkkaRpcActor.handleRpcMessage(FencedAkkaRpcActor.java:77)
>   at org.apache.flink.runtime.rpc.akka.AkkaRpcActor.handleMessage(AkkaRpcActor.java:158)
>   at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:26)
>   at akka.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:21)
>   at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:123)
>   at akka.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.s

Potential bug: AutoValue + Memoized fields

2021-10-28 Thread Cristian Constantinescu
Hi everyone,

It looks like Beam has a bit of an issue when using AutoValue classes with
@Memoized (cached) fields. It's not a big issue, and the workaround is simply
not to use @Memoized fields, at the cost of a little performance. (See the
comment in the code snippet.)

The code further below produces this exception:
Exception in thread "main" java.lang.NullPointerException
  at org.apache.beam.sdk.schemas.utils.JavaBeanUtils.createGetter(JavaBeanUtils.java:155)
  at org.apache.beam.sdk.schemas.utils.JavaBeanUtils.lambda$getGetters$1(JavaBeanUtils.java:145)
  at java.base/java.util.stream.ReferencePipeline$3$1.accept(ReferencePipeline.java:195)
  at java.base/java.util.ArrayList$ArrayListSpliterator.forEachRemaining(ArrayList.java:1655)
  at java.base/java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:484)
  at java.base/java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:474)
  at java.base/java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:913)
  at java.base/java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234)
  at java.base/java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:578)
  at org.apache.beam.sdk.schemas.utils.JavaBeanUtils.lambda$getGetters$2(JavaBeanUtils.java:146)
  at java.base/java.util.concurrent.ConcurrentHashMap.computeIfAbsent(ConcurrentHashMap.java:1705)
  at org.apache.beam.sdk.schemas.utils.JavaBeanUtils.getGetters(JavaBeanUtils.java:140)
  at org.apache.beam.sdk.schemas.AutoValueSchema.fieldValueGetters(AutoValueSchema.java:72)
  at org.apache.beam.sdk.schemas.CachingFactory.create(CachingFactory.java:56)
  at org.apache.beam.sdk.values.RowWithGetters.<init>(RowWithGetters.java:66)
  at org.apache.beam.sdk.values.Row$Builder.withFieldValueGetters(Row.java:835)
  at org.apache.beam.sdk.schemas.GetterBasedSchemaProvider$ToRowWithValueGetters.apply(GetterBasedSchemaProvider.java:64)
  at org.apache.beam.sdk.schemas.GetterBasedSchemaProvider$ToRowWithValueGetters.apply(GetterBasedSchemaProvider.java:49)
  at org.apache.beam.sdk.schemas.SchemaCoder.encode(SchemaCoder.java:124)
  at org.apache.beam.sdk.coders.Coder.encode(Coder.java:136)
  at org.apache.beam.sdk.util.CoderUtils.encodeToSafeStream(CoderUtils.java:85)
  at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:69)
  at org.apache.beam.sdk.util.CoderUtils.encodeToByteArray(CoderUtils.java:54)
  at org.apache.beam.sdk.transforms.Create$Values$CreateSource.fromIterable(Create.java:413)
  at org.apache.beam.sdk.transforms.Create$Values.expand(Create.java:370)
  at org.apache.beam.sdk.transforms.Create$Values.expand(Create.java:277)
  at org.apache.beam.sdk.Pipeline.applyInternal(Pipeline.java:548)
  at org.apache.beam.sdk.Pipeline.applyTransform(Pipeline.java:482)
  at org.apache.beam.sdk.values.PBegin.apply(PBegin.java:44)
  at org.apache.beam.sdk.Pipeline.apply(Pipeline.java:177)
  at org.whatever.testing.App.main(App.java:24)


package org.whatever.testing;

import com.google.auto.value.AutoValue;
import com.google.auto.value.extension.memoized.Memoized;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.schemas.AutoValueSchema;
import org.apache.beam.sdk.schemas.annotations.DefaultSchema;
import org.apache.beam.sdk.schemas.annotations.SchemaCreate;
import org.apache.beam.sdk.schemas.transforms.Convert;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.TypeDescriptor;

import java.util.Arrays;

public class App {

    public static void main(String[] args) {
        var options = PipelineOptionsFactory.fromArgs(args).withValidation().create();

        var p = Pipeline.create(options);
        p
            .apply(Create.of(Arrays.asList(FooAutoValue.builder().setDummyProp("dummy").build())))
            .apply(Convert.to(FooAutoValue.class))
            .apply(MapElements.into(TypeDescriptor.of(FooAutoValue.class)).via(i -> {
                System.out.println(i.toString());
                return i;
            }));
        p.run().waitUntilFinish();
    }

    @AutoValue
    @DefaultSchema(AutoValueSchema.class)
    public static abstract class FooAutoValue {
        public abstract String getDummyProp();

        @Memoized // <-- commenting this line makes everything work
        public String getSomething() {
            return "sldj";
        }

        @SchemaCreate
        public static FooAutoValue create(String dummyProp) {
            return builder()
                .setDummyProp(dummyProp)
                .build();
        }

        public static Builder builder() {
            return new AutoValue_App_FooAutoValue.Builder();
        }

        @AutoValue.Builder
        public abstract static class Builder {
            public abstract Builder setDummyProp(String newDummyProp);

            public abstract FooAutoValue build();
        }
    }
}


From what I can see, instead of getting the fields from the abstract class,
it's tryi

Re: [Question] Beam+Python+Flink

2021-10-28 Thread Sam Bourne
Hey Chiara,

I went through a lot of the same struggles a while back and made this repo
to showcase how I accomplished something similar.
https://github.com/sambvfx/beam-flink-k8s

It shouldn't be hard to convert to a docker-compose setup (I actually had it
set up like this originally while testing, before porting it to Kubernetes).
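
Roughly, the docker-compose version of that sidecar idea would be an extra
service next to the taskmanager, something like the fragment below (untested
sketch; the image tag, service name and shared network namespace are
assumptions, the point being only that the worker pool must be reachable from
the taskmanager as localhost:50000):

  beam-worker-pool:
    image: apache/beam_python3.8_sdk:2.32.0
    # Run the SDK container as a worker pool, as Jan described.
    command: ["--worker_pool"]
    # Share the taskmanager's network namespace so the taskmanager can reach
    # the worker pool on localhost:50000.
    network_mode: "service:taskmanager"
    depends_on:
      - taskmanager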


On Thu, Oct 28, 2021 at 10:41 AM Kyle Weaver  wrote:

> > I still cannot figure out how to make sure that the worker_pool is
> accessible via  'localhost' hostname.
>
> In Kubernetes, we make the worker pool a sidecar of the Flink task manager
> container. Perhaps there is a similar feature available in docker compose?
>
>
> https://github.com/GoogleCloudPlatform/flink-on-k8s-operator/blob/0310df76d6e2128cd5d2bc51fae4e842d370c463/examples/beam/without_job_server/beam_flink_cluster.yaml#L17-L20
>
> On Thu, Oct 28, 2021 at 10:30 AM Chiara Troiani <
> t.chiara.for@gmail.com> wrote:
>
>> Hi Jan,
>>
>> Thank you very much for your answer, and sorry for the late reply.
>>
>> I can see the job in the Flink UI, but it still fails, with this
>> exception among others:
>>
>> Caused by:
>> org.apache.beam.vendor.grpc.v1p36p0.io.netty.channel.AbstractChannel$AnnotatedConnectException:
>> finishConnect(..) failed: Connection refused: localhost/127.0.0.1:50000
>>
>> Caused by: java.net.ConnectException: finishConnect(..) failed:
>> Connection refused
>>
>>
>> I guess it is a Docker problem now.
>> I still cannot figure out how to make sure that the worker_pool is
>> accessible via the 'localhost' hostname.
>>
>> Many thanks,
>> Chiara
>>
>> On Mon, Oct 18, 2021 at 11:56 AM Jan Lukavský  wrote:
>>
>>> Hi Chiara,
>>>
>>> environment_type LOOPBACK is meant for local execution only. The default
>>> is Docker, which is not ideal when you use docker-compose (Docker in
>>> Docker), so the other option is to use the EXTERNAL environment. With this
>>> environment, you must manually start the Python SDK harness as a separate
>>> container using the apache/beam_python3.8_sdk Docker image with args set to
>>> '--worker_pool'. That will run a container that takes care of running the
>>> Python harness processes. It will by default listen on port 50000; it must
>>> be accessible from the taskmanager container via localhost:50000, and you
>>> then pass it via environment_config, e.g.:
>>>
>>>  --environment_type=EXTERNAL --environment_config=localhost:50000
>>>
>>> That should do the trick. Because of limitations of the 'worker_pool'
>>> you must make sure that it is accessible via the 'localhost' hostname. For
>>> more information see [1].
>>>
>>>  Jan
>>>
>>> [1] https://beam.apache.org/documentation/runtime/sdk-harness-config/
>>> On 10/18/21 11:43, Chiara Troiani wrote:
>>>
>>> Hi,
>>>
>>>
>>> I am trying to follow this tutorial
>>>
>>> http://beam.apache.org/documentation/runners/flink/
>>>
>>> for the portable (Python) runner.
>>>
>>>
>>> I am not able to execute a Beam pipeline on a Flink cluster.
>>>
>>> I am running a Flink Session Cluster with docker-compose.
>>>
>>>
>>> This is my docker-compose file:
>>>
>>>
>>> ——
>>>
>>> version: "2.2"
>>>
>>> services:
>>>   jobmanager:
>>>     image: flink:1.13.2-scala_2.11
>>>     ports:
>>>       - "8081:8081"
>>>     command: jobmanager
>>>     environment:
>>>       - |
>>>         FLINK_PROPERTIES=
>>>         jobmanager.rpc.address: jobmanager
>>>
>>>   taskmanager:
>>>     image: flink:1.13.2-scala_2.11
>>>     depends_on:
>>>       - jobmanager
>>>     command: taskmanager
>>>     scale: 1
>>>     environment:
>>>       - |
>>>         FLINK_PROPERTIES=
>>>         jobmanager.rpc.address: jobmanager
>>>         taskmanager.numberOfTaskSlots: 2
>>>
>>> —
>>>
>>>
>>> I run the examples from a virtual environment, python3.8,
>>> apache-beam==2.32.0
>>>
>>> macOS Catalina 10.15.7
>>>
>>> Docker desktop 4.1.1
>>>
>>>
>>> When I run:
>>>
>>> python -m apache_beam.examples.wordcount --input=text.txt
>>> --output=out.txt --runner=FlinkRunner --flink_master=localhost:8081
>>> --environment_type=LOOPBACK
>>>
>>>
>>> I get this error:
>>>
>>> org.apache.flink.runtime.JobException: Recovery is suppressed by NoRestartBackoffTimeStrategy
>>>   at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.handleFailure(ExecutionFailureHandler.java:138)
>>>   at org.apache.flink.runtime.executiongraph.failover.flip1.ExecutionFailureHandler.getFailureHandlingResult(ExecutionFailureHandler.java:82)
>>>   at org.apache.flink.runtime.scheduler.DefaultScheduler.handleTaskFailure(DefaultScheduler.java:216)
>>>   at org.apache.flink.runtime.scheduler.DefaultScheduler.maybeHandleTaskFailure(DefaultScheduler.java:206)
>>>   at org.apache.flink.runtime.scheduler.DefaultScheduler.updateTaskExecutionStateInternal(DefaultScheduler.java:197)
>>>   at org.apache.flink.runtime.scheduler.SchedulerBase.updateTaskExecutionState(Sc

Re: JsonToRow parsing Proto[String to Int64]

2021-10-28 Thread gaurav mishra
The validator function will probably have to do a try/catch with
Long.parseLong(jsonNode.toString()) to test for valid inputs in case the
current check (jsonNode.isIntegralNumber() && jsonNode.canConvertToLong())
is false.
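
Something along these lines, maybe (an untested sketch against Jackson's
JsonNode API; the method name is made up):

  // Assumes com.fasterxml.jackson.databind.JsonNode.
  // Returns true if the node is either an integral number that fits in a long
  // or a string that parses as a long (the form JsonFormat.printer() emits).
  private static boolean canBeParsedAsLong(JsonNode jsonNode) {
    if (jsonNode.isIntegralNumber() && jsonNode.canConvertToLong()) {
      return true;
    }
    if (jsonNode.isTextual()) {
      try {
        Long.parseLong(jsonNode.asText());
        return true;
      } catch (NumberFormatException e) {
        return false;
      }
    }
    return false;
  }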

On Fri, Oct 22, 2021 at 8:07 PM Reuven Lax  wrote:

> If we want to support this behavior, we would have to change this code:
>
>
> https://github.com/apache/beam/blob/master/sdks/java/core/src/main/java/org/apache/beam/sdk/util/RowJsonValueExtractors.java#L93
>
> I believe that instead of longValue, JsonNode::asLong would have to be
> used (that function parses strings). I'm not sure how to test for invalid
> input, since that function simply returns 0 in that case, which is also a
> valid value.
>
> Reuven
>
> On Fri, Oct 22, 2021 at 12:41 PM gaurav mishra <
> gauravmishra.it...@gmail.com> wrote:
>
>> I have two pipelines
>> PipelineA -> output to pubsub(schema bound json) -> PipelineB
>>
>> PipelineA is emitting proto models serialized as JsonStrings.
>> Serialization is done using JsonFormat.printer.
>> ```
>> JsonFormat.printer().preservingProtoFieldNames()
>> .print(model)
>> ```
>>
>> In PipelineB I am trying to read these as jsons as Row which is tied to
>> the schema derived to the same proto which was used in PipelineA.
>>
>> ```
>> Schema schema = new ProtoMessageSchema().schemaFor(TypeDescriptor.of(protoClass));
>> 
>> input.apply(JsonToRow.withSchema(schema));
>> ...
>> ```
>>
>> The problem I am facing is with int64 type fields. When these fields are
>> serialized using `JsonFormat.printer` these are serialized as strings in
>> final json, which is by design(
>> https://developers.google.com/protocol-buffers/docs/proto3#json).
>> In pipelineB when the framework tries to deserialize these fields as
>> int64 it fails.
>> ```
>> Unable to get value from field 'site_id'. Schema type 'INT64'. JSON node
>> type STRING
>> org.apache.beam.sdk.util.RowJson$RowJsonDeserializer.extractJsonPrimitiveValue
>> ```
>>
>> Is there a way to work around this problem? Can I do something on either
>> the serialization side or the deserialization side to fix this?
>>
>>
>>