Re: Where to specify trust.jks

2023-05-20 Thread Yu Watanabe
Hello.

Another solution might be putting it inside a custom container.
I use this approach for testing ElasticsearchIO with TLS sessions.

https://beam.apache.org/documentation/runtime/environments/
https://cloud.google.com/dataflow/docs/guides/using-custom-containers

Below is a Dockerfile example:
https://github.com/yuwtennis/apache-beam-pipeline-apps/blob/main/java/Dockerfile
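
For illustration, here is a minimal Python sketch of referencing a truststore
that has been baked into the custom container. The path /opt/keystore/trust.jks,
the broker address, and the topic are only assumptions (the path depends on
where your Dockerfile copies the file), and per Pablo's reply below a gs:// path
may work as well.

import apache_beam as beam
from apache_beam.io.kafka import ReadFromKafka
from apache_beam.options.pipeline_options import PipelineOptions

# Assumed location of the truststore inside the custom worker container.
TRUSTSTORE_PATH = '/opt/keystore/trust.jks'

with beam.Pipeline(options=PipelineOptions()) as p:
    records = p | ReadFromKafka(
        consumer_config={
            'bootstrap.servers': 'broker:9093',   # hypothetical broker address
            'security.protocol': 'SSL',
            'ssl.truststore.location': TRUSTSTORE_PATH,
            'ssl.truststore.password': 'changeit',
        },
        topics=['my-topic'])                      # hypothetical topic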

Thanks,
Yu Watanabe

On Fri, May 19, 2023 at 12:28 AM Pablo Estrada via user
 wrote:
>
> Hi Utkarsh,
> you can pass a path in GCS (or a filesystem), and the workers should be able 
> to download it onto themselves. You'd pass 
> `gs://my-bucket-name/path/to/trust.jks`. Can you try that?
> Best
> -P.
>
> On Wed, May 10, 2023 at 1:58 PM Utkarsh Parekh  
> wrote:
>>
>> Hi,
>>
>> I'm testing a streaming app using Kafka, Dataflow, and Apache Beam [Python].
>>
>>  "Error message from worker: org.apache.beam.sdk.util.UserCodeException: 
>> java.lang.RuntimeException: org.apache.kafka.common.KafkaException: Failed 
>> to construct kafka consumer 
>> org.apache.beam.sdk.util.UserCodeException.wrap(UserCodeException.java:39) 
>> org.apache.beam.sdk.io.Read$UnboundedSourceAsSDFWrapperFn$DoFnInvoker.invokeSplitRestriction(Unknown
>>  Source) 
>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForSplitRestriction(FnApiDoFnRunner.java:888)
>>  
>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
>>  
>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
>>  
>> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1789)
>>  
>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForPairWithRestriction(FnApiDoFnRunner.java:825)
>>  
>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
>>  
>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
>>  
>> org.apache.beam.fn.harness.FnApiDoFnRunner.outputTo(FnApiDoFnRunner.java:1789)
>>  
>> org.apache.beam.fn.harness.FnApiDoFnRunner.access$3000(FnApiDoFnRunner.java:143)
>>  
>> org.apache.beam.fn.harness.FnApiDoFnRunner$NonWindowObservingProcessBundleContext.outputWithTimestamp(FnApiDoFnRunner.java:2357)
>>  
>> org.apache.beam.fn.harness.FnApiDoFnRunner$ProcessBundleContextBase.output(FnApiDoFnRunner.java:2527)
>>  
>> org.apache.beam.sdk.transforms.DoFnOutputReceivers$WindowedContextOutputReceiver.output(DoFnOutputReceivers.java:78)
>>  
>> org.apache.beam.sdk.io.Read$OutputSingleSource.processElement(Read.java:1018)
>>  
>> org.apache.beam.sdk.io.Read$OutputSingleSource$DoFnInvoker.invokeProcessElement(Unknown
>>  Source) 
>> org.apache.beam.fn.harness.FnApiDoFnRunner.processElementForParDo(FnApiDoFnRunner.java:800)
>>  
>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:313)
>>  
>> org.apache.beam.fn.harness.data.PCollectionConsumerRegistry$MetricTrackingFnDataReceiver.accept(PCollectionConsumerRegistry.java:245)
>>  
>> org.apache.beam.fn.harness.BeamFnDataReadRunner.forwardElementToConsumer(BeamFnDataReadRunner.java:213)
>>  
>> org.apache.beam.sdk.fn.data.BeamFnDataInboundObserver2.multiplexElements(BeamFnDataInboundObserver2.java:158)
>>  
>> org.apache.beam.fn.harness.control.ProcessBundleHandler.processBundle(ProcessBundleHandler.java:533)
>>  
>> org.apache.beam.fn.harness.control.BeamFnControlClient.delegateOnInstructionRequestType(BeamFnControlClient.java:151)
>>  
>> org.apache.beam.fn.harness.control.BeamFnControlClient$InboundObserver.lambda$onNext$0(BeamFnControlClient.java:116)
>>  
>> java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
>>  java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264) 
>> org.apache.beam.sdk.util.UnboundedScheduledExecutorService$ScheduledFutureTask.run(UnboundedScheduledExecutorService.java:162)
>>  
>> java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
>>  
>> java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
>>  java.base/java.lang.Thread.run(Thread.java:829) Caused by: 
>> java.lang.RuntimeException: org.apache.kafka.common.KafkaException: Failed 
>> to construct kafka consumer 
>> org.apache.beam.sdk.io.kafka.KafkaUnbo

Re: How to run expansion service using go sdk in local development environment ?

2022-08-29 Thread Yu Watanabe
Hello Danny.

Ah, I see. Thank you for your advice.

Thanks,
Yu Watanabe

On Mon, Aug 29, 2022 at 9:26 AM Danny McCormick via user
 wrote:
>
> Hey Yu, as the error you posted suggests, the Go direct runner which you're 
> using in your local development environment doesn't support external 
> transforms using an expansion service. If you're going to do a x-lang 
> transform using an expansion service you should use a different runner like 
> Dataflow, Flink, Spark, or one of the other runners listed here - 
> https://beam.apache.org/documentation/runners/capability-matrix/
>
> Thanks,
> Danny
>
> On Sun, Aug 28, 2022 at 7:50 AM Yu Watanabe  wrote:
>>
>> Hello.
>>
>> I would like to ask a question about expansion service. I'm currently
>> testing my expansion service in my local development environment.
>> I have read notes about kafka in advance,
>>
>> https://github.com/apache/beam/blob/master/sdks/go/examples/kafka/taxi.go#L93
>>
>> I have prepared sdk containers .
>>
>> [ywatanabe@laptop-archlinux Development]$ docker image ls | grep apache
>> apache/beam_java8_sdk   2.42.0.dev
>> f7e9d38b01fe   11 days ago 643MB
>> apache/beam_go_sdk  latest
>> 8a87ea45255b   11 days ago 149MB
>>
>> However, when I run the code in my local environment, I get an error.
>>
>> [ywatanabe@laptop-archlinux go]$ go run ./examples/elasticsearch/sample.go \
>>   --runner direct \
>>   --sdk_harness_container_image_override
>> ".*java.*,apache/beam_java8_sdk:2.42.0.dev"
>> Hello world.
>> 2022/08/28 20:39:01 Executing pipeline with the direct runner.
>> 2022/08/28 20:39:01 Pipeline:
>> 2022/08/28 20:39:01 Nodes: {1: []uint8/bytes GLO}
>> {2: string/string GLO}
>> {3: []uint8/bytes GLO}
>> {4: []uint8/bytes GLO}
>> Edges: 1: Impulse [] -> [Out: []uint8 -> {1: []uint8/bytes GLO}]
>> 2: ParDo [In(Main): []uint8 <- {1: []uint8/bytes GLO}] -> [Out: T ->
>> {2: string/string GLO}]
>> 3: External [In(Main): string <- {2: string/string GLO}] -> [Out:
>> []uint8 -> {3: []uint8/bytes GLO} Out: []uint8 -> {4: []uint8/bytes
>> GLO}]
>> Pipeline failed: translation failed
>> caused by:
>> external transforms like 3: External [In(Main): string <- {2:
>> string/string GLO}] -> [Out: []uint8 -> {3: []uint8/bytes GLO} Out:
>> []uint8 -> {4: []uint8/bytes GLO}] are not supported in the Go direct
>> runner, please execute your pipel[ywatanabe@laptop-archlinux go]$
>>
>> Am I missing something ?
>>
>> My main and io code can be found below.
>>
>> https://gist.github.com/yuwtennis/dec3bf3bfc0c4fa54d9d3565c98d008e
>>
>> Thanks,
>> Yu
>>
>>
>> --
>> Yu Watanabe
>>
>> linkedin: www.linkedin.com/in/yuwatanabe1/
>> twitter:   twitter.com/yuwtennis



-- 
Yu Watanabe

linkedin: www.linkedin.com/in/yuwatanabe1/
twitter:   twitter.com/yuwtennis


How to run expansion service using go sdk in local development environment ?

2022-08-28 Thread Yu Watanabe
Hello.

I would like to ask a question about the expansion service. I'm currently
testing my expansion service in my local development environment.
I have read the notes about Kafka in advance:

https://github.com/apache/beam/blob/master/sdks/go/examples/kafka/taxi.go#L93

I have prepared the SDK containers.

[ywatanabe@laptop-archlinux Development]$ docker image ls | grep apache
apache/beam_java8_sdk   2.42.0.dev
f7e9d38b01fe   11 days ago 643MB
apache/beam_go_sdk  latest
8a87ea45255b   11 days ago 149MB

However, when I run the code in my local environment, I get an error.

[ywatanabe@laptop-archlinux go]$ go run ./examples/elasticsearch/sample.go \
  --runner direct \
  --sdk_harness_container_image_override
".*java.*,apache/beam_java8_sdk:2.42.0.dev"
Hello world.
2022/08/28 20:39:01 Executing pipeline with the direct runner.
2022/08/28 20:39:01 Pipeline:
2022/08/28 20:39:01 Nodes: {1: []uint8/bytes GLO}
{2: string/string GLO}
{3: []uint8/bytes GLO}
{4: []uint8/bytes GLO}
Edges: 1: Impulse [] -> [Out: []uint8 -> {1: []uint8/bytes GLO}]
2: ParDo [In(Main): []uint8 <- {1: []uint8/bytes GLO}] -> [Out: T ->
{2: string/string GLO}]
3: External [In(Main): string <- {2: string/string GLO}] -> [Out:
[]uint8 -> {3: []uint8/bytes GLO} Out: []uint8 -> {4: []uint8/bytes
GLO}]
Pipeline failed: translation failed
caused by:
external transforms like 3: External [In(Main): string <- {2:
string/string GLO}] -> [Out: []uint8 -> {3: []uint8/bytes GLO} Out:
[]uint8 -> {4: []uint8/bytes GLO}] are not supported in the Go direct
runner, please execute your pipel[ywatanabe@laptop-archlinux go]$

Am I missing something ?

My main and io code can be found below.

https://gist.github.com/yuwtennis/dec3bf3bfc0c4fa54d9d3565c98d008e

Thanks,
Yu


-- 
Yu Watanabe

linkedin: www.linkedin.com/in/yuwatanabe1/
twitter:   twitter.com/yuwtennis


Re: How to register as external cross language transform ?

2022-08-17 Thread Yu Watanabe
Cham.

Thank you for the advice.
It worked ! I see it now !

beam:transform:org.apache.beam:elasticsearch_write:v1:
org.apache.beam.sdk.expansion.service.ExpansionService$ExternalTransformRegistrarLoader$1@5f049ea1
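
For anyone who wants to call the registered URN from Python, a minimal sketch
is below. The configuration fields and the expansion service address are
hypothetical (the real schema is whatever the Java ExternalTransformBuilder
expects), so treat this only as the general shape.

import apache_beam as beam
from apache_beam.transforms.external import (
    ExternalTransform, ImplicitSchemaPayloadBuilder)

ES_WRITE_URN = 'beam:transform:org.apache.beam:elasticsearch_write:v1'

with beam.Pipeline() as p:
    _ = (
        p
        | beam.Create(['{"msg": "hello"}'])
        | ExternalTransform(
            ES_WRITE_URN,
            # Hypothetical configuration fields.
            ImplicitSchemaPayloadBuilder(
                {'connection_url': 'http://localhost:9200', 'index': 'beam'}),
            expansion_service='localhost:8097'))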

Thanks,
Yu

On Thu, Aug 18, 2022 at 6:28 AM Chamikara Jayalath via user
 wrote:
>
>
>
> On Wed, Aug 17, 2022 at 3:05 AM Yu Watanabe  wrote:
>>
>> Hello.
>>
>> I am trying to write code for cross language transform for
>> ElasticsearchIO but having trouble with it.
>> I would appreciate it if I could get help.
>>
>> As describe in doc and also referencing KafkaIO ,
>>
>> https://beam.apache.org/documentation/programming-guide/#1311-creating-cross-language-java-transforms
>>
>> I have annotated the code with @AutoService but the
>> 'elasticsearch_write' does not appear when starting Expansion Service.
>>
>> https://github.com/yuwtennis/beam/blob/feat/elasticsearch-io-cross-lang/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java#L1991
>>
>> Below is the snippet of command line operation.
>>
>> https://gist.github.com/yuwtennis/3c55ca45b31f9a302daf7b827a842ef6
>>
>> I have built jar files with the below command in advance .
>>
>> ./gradlew :sdks:java [jar]
>>
>> Would there be any other instructions I should be referencing ?
>
>
> It's because io-expansion-service currently does not include ElasticSearch 
> module as a dependency: 
> https://github.com/apache/beam/blob/master/sdks/java/io/expansion-service/build.gradle
> Can you try adding it there and re-building the jar ?
>
> Thanks,
> Cham
>
>
>>
>>
>> Best Regards,
>> Yu Watanabe
>>
>> --
>> Yu Watanabe
>>
>> linkedin: www.linkedin.com/in/yuwatanabe1/
>> twitter:   twitter.com/yuwtennis



-- 
Yu Watanabe

linkedin: www.linkedin.com/in/yuwatanabe1/
twitter:   twitter.com/yuwtennis


How to register as external cross language transform ?

2022-08-17 Thread Yu Watanabe
Hello.

I am trying to write code for a cross-language transform for
ElasticsearchIO but am having trouble with it.
I would appreciate it if I could get help.

As described in the doc, and also referencing KafkaIO,

https://beam.apache.org/documentation/programming-guide/#1311-creating-cross-language-java-transforms

I have annotated the code with @AutoService, but
'elasticsearch_write' does not appear when starting the Expansion Service.

https://github.com/yuwtennis/beam/blob/feat/elasticsearch-io-cross-lang/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java#L1991

Below is a snippet of the command line operations.

https://gist.github.com/yuwtennis/3c55ca45b31f9a302daf7b827a842ef6

I have built the jar files with the below command in advance.

./gradlew :sdks:java [jar]

Would there be any other instructions I should be referencing ?

Best Regards,
Yu Watanabe

-- 
Yu Watanabe

linkedin: www.linkedin.com/in/yuwatanabe1/
twitter:   twitter.com/yuwtennis


Re: Any guideline for building golang connector ?

2022-07-08 Thread Yu Watanabe
Hello Danny.

Thank you for the details. I appreciate your message.

I am a newbie at building IOs, so I will look into the links and first
build my knowledge.

Thanks,
Yu Watanabe

On Fri, Jul 8, 2022 at 8:28 PM Danny McCormick via user <
user@beam.apache.org> wrote:

> Hey Yu,
>
> The guidance on that page should generally apply for Go as well, though we
> are missing an example transform; I filed
> https://github.com/apache/beam/issues/22194 to fix this, but a couple
> examples are our textio implementation
> <https://github.com/apache/beam/blob/master/sdks/go/pkg/beam/io/textio/textio.go>
> and this native streaming example
> <https://github.com/apache/beam/blob/master/sdks/go/examples/native_wordcap/nativepubsubio/native.go>
> (non-productionized, but it shows a lot of good concepts for handling
> streaming sources).
>
> Another option would be to use Java's elasticsearch implementation
> <https://github.com/apache/beam/blob/master/sdks/java/io/elasticsearch/src/main/java/org/apache/beam/sdk/io/elasticsearch/ElasticsearchIO.java>
> with a cross language transform. If you're thinking of contributing this
> back to the main beam code base, I'd probably recommend that approach. In
> general, we're currently investing most heavily in cross language
> transforms because it's a much lower burden to build/support since it
> reuses the main components of the original transform. There are several
> examples of wrapped cross-language transforms in
> https://github.com/apache/beam/tree/master/sdks/go/pkg/beam/io/xlang. If
> you have specific questions about authoring IOs that aren't answered in the
> docs/by example, feel free to drop them in this thread as well!
>
> Thanks,
> Danny
>
> On Fri, Jul 8, 2022 at 3:51 AM Yu Watanabe  wrote:
>
>> Hello .
>>
>> Is there any guideline for building a go sdk connector ?
>> I was reviewing the document but I could not find one for golang.
>>
>> https://beam.apache.org/documentation/io/developing-io-overview/
>>
>> I was thinking of building one for elasticsearch.
>>
>> Thanks,
>> Yu Watanabe
>>
>> --
>> Yu Watanabe
>>
>> linkedin: www.linkedin.com/in/yuwatanabe1/
>> twitter:   twitter.com/yuwtennis
>>
>>
>

-- 
Yu Watanabe

linkedin: www.linkedin.com/in/yuwatanabe1/
twitter:   twitter.com/yuwtennis


Any guideline for building golang connector ?

2022-07-08 Thread Yu Watanabe
Hello .

Is there any guideline for building a Go SDK connector?
I was reviewing the documentation but could not find one for Golang.

https://beam.apache.org/documentation/io/developing-io-overview/

I was thinking of building one for Elasticsearch.

Thanks,
Yu Watanabe

-- 
Yu Watanabe

linkedin: www.linkedin.com/in/yuwatanabe1/
twitter:   twitter.com/yuwtennis


Re: Is FileIO for Azure Blob Storage GA?

2021-12-21 Thread Yu Watanabe
Hello Brian.

Thank you for the follow up.

Ikegami-san.

As a reference, I will also include the PR for the Python implementation, which was
released in 2.25.0.

https://issues.apache.org/jira/browse/BEAM-6807
https://github.com/apache/beam/pull/12492

However, it is still not on the built-in FileSystem IO list, so I am not sure about the
release state.

https://beam.apache.org/documentation/io/built-in/
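
For a quick smoke test of that Python implementation, a minimal sketch is
below. The azfs:// scheme comes from the Python BlobStorageFileSystem; the
option name azure_connection_string and the account/container names are
assumptions, so please double-check them against the release you use.

import apache_beam as beam
from apache_beam.io import WriteToText
from apache_beam.options.pipeline_options import PipelineOptions

# Assumed option name and placeholder connection string.
options = PipelineOptions(['--azure_connection_string=<connection-string>'])

with beam.Pipeline(options=options) as p:
    _ = (p
         | beam.Create(['Hello World.', 'Apache Beam'])
         # azfs://<storage-account>/<container>/<path> (names are hypothetical)
         | WriteToText('azfs://myaccount/mycontainer/sample.txt'))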

Thanks,
Yu


On Wed, Dec 22, 2021 at 9:38 AM Brian Hulette  wrote:

> The package is marked experimental [1], which is how we indicate a feature
> isn't GA and may change. That being said, a lot of important Beam features
> are still marked experimental but are actually quite solid and stable, so
> there's some nuance here.
>
> @Pablo Estrada  may be able to comment more since he
> was involved with the initial contribution, but he may be out for the
> holidays for a little while.
>
> [1]
> https://beam.apache.org/releases/javadoc/2.34.0/org/apache/beam/sdk/io/azure/blobstore/package-summary.html
>
> On Tue, Dec 21, 2021 at 4:47 AM Yu Watanabe  wrote:
>
>> Hello .
>>
>> Looks like unit tests are passed in these PRs
>> .
>> https://issues.apache.org/jira/browse/BEAM-10378
>>
>> But the integration test with the WordCount example has not passed yet.
>>
>> https://github.com/apache/beam/pull/12649
>>
>> Since the AzureBlobStoreFileSystemRegistrar class is available in 2.34.0,
>> you could test it in your prototype environment.
>>
>>
>> https://beam.apache.org/releases/javadoc/2.34.0/org/apache/beam/sdk/io/azure/blobstore/AzureBlobStoreFileSystemRegistrar.html
>>
>> Thanks,
>> Yu
>>
>>
>>
>> Official doc FileSystem class for azure blob storage is not listed in the
>> official doc.
>> https://beam.apache.org/documentation/io/built-in/
>>
>>
>>
>>
>>
>> On Tue, Dec 21, 2021 at 10:50 AM 池上有希乃 
>> wrote:
>>
>>> Hi
>>>
>>> I think I will use Apache Beam for my business.
>>> So there is a question.
>>> Whether FileIO for Azure Blob Storage is GA phase or not?
>>>
>>> Thanks,
>>> Yukino Ikegami
>>>
>>
>>
>> --
>> Yu Watanabe
>>
>> linkedin: www.linkedin.com/in/yuwatanabe1/
>> twitter:   twitter.com/yuwtennis
>>
>>
>

-- 
Yu Watanabe

linkedin: www.linkedin.com/in/yuwatanabe1/
twitter:   twitter.com/yuwtennis


Re: Is FileIO for Azure Blob Storage GA?

2021-12-21 Thread Yu Watanabe
Hello .

It looks like the unit tests passed in these PRs:
https://issues.apache.org/jira/browse/BEAM-10378

But the integration test with the WordCount example has not passed yet.

https://github.com/apache/beam/pull/12649

Since the AzureBlobStoreFileSystemRegistrar class is available in 2.34.0, you
could test it in your prototype environment.

https://beam.apache.org/releases/javadoc/2.34.0/org/apache/beam/sdk/io/azure/blobstore/AzureBlobStoreFileSystemRegistrar.html

Thanks,
Yu



The FileSystem class for Azure Blob Storage is not yet listed in the
official built-in IO doc.
https://beam.apache.org/documentation/io/built-in/





On Tue, Dec 21, 2021 at 10:50 AM 池上有希乃  wrote:

> Hi
>
> I think I will use Apache Beam for my business.
> So there is a question.
> Whether FileIO for Azure Blob Storage is GA phase or not?
>
> Thanks,
> Yukino Ikegami
>


-- 
Yu Watanabe

linkedin: www.linkedin.com/in/yuwatanabe1/
twitter:   twitter.com/yuwtennis


Re: java.io.InvalidClassException with Spark 3.1.2

2021-08-22 Thread Yu Watanabe
Sure. I starred your repository.

On Sat, Aug 21, 2021 at 11:27 AM cw  wrote:

> Hello Yu,
> I have done a lot of testing; it only works for Spark 2, not 3. If you need a
> working example on Kubernetes,
> https://github.com/cometta/python-apache-beam-spark , feel free to
> improve the code if you would like to contribute. Help me star it if it
> is useful for you. Thank you
>
> On Monday, August 16, 2021, 12:37:46 AM GMT+8, Yu Watanabe <
> yu.w.ten...@gmail.com> wrote:
>
>
> Hello .
>
> I would like to ask question for spark runner.
>
> Using spark downloaded from below link,
>
>
> https://www.apache.org/dyn/closer.lua/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
>
> I get below error when submitting a pipeline.
> Full error is on
> https://gist.github.com/yuwtennis/7b0c1dc0dcf98297af1e3179852ca693.
>
>
> --
> 21/08/16 01:10:26 WARN TransportChannelHandler: Exception in connection
> from /192.168.11.2:35601
> java.io.InvalidClassException:
> scala.collection.mutable.WrappedArray$ofRef; local class incompatible:
> stream classdesc serialVersionUID = 3456489343829468865, local class
> serialVersionUID = 1028182004549731694
> at
> java.base/java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:689)
> ...
>
> --
>
> SDK Harness and Job service are deployed as below.
>
> 1. Job service
>
> sudo docker run --net=host apache/beam_spark3_job_server:2.31.0
> --spark-master-url=spark://localhost:7077 --clean-artifacts-per-job true
>
> 2. SDK Harness
>
> sudo docker run --net=host apache/beam_python3.8_sdk:2.31.0 --worker_pool
>
> * apache/beam_spark_job_server:2.31.0 for spark 2.4.8
>
> 3. SDK client code
>
> https://gist.github.com/yuwtennis/2e4c13c79f71e8f713e947955115b3e2
>
> Spark 2.4.8 succeeded without any errors using above components.
>
>
> https://archive.apache.org/dist/spark/spark-2.4.8/spark-2.4.8-bin-hadoop2.7.tgz
>
> Would there be any setting which you need to be aware of for spark 3.1.2 ?
>
> Thanks,
> Yu Watanabe
>
> --
> Yu Watanabe
>
> linkedin: www.linkedin.com/in/yuwatanabe1/
> twitter:   twitter.com/yuwtennis
>
>


-- 
Yu Watanabe

linkedin: www.linkedin.com/in/yuwatanabe1/
twitter:   twitter.com/yuwtennis


Re: java.io.InvalidClassException with Spark 3.1.2

2021-08-19 Thread Yu Watanabe
Kyle.

Thank you.

On Tue, Aug 17, 2021 at 5:55 AM Kyle Weaver  wrote:

> I was able to reproduce the error. I'm not sure why this would happen,
> since as far as I can tell the Beam 2.31.0 Spark runner should be using
> Spark 3.1.2 and Scala 2.12 [1]. I filed a JIRA issue for it. [2]
>
> [1]
> https://github.com/apache/beam/pull/14897/commits/b6fca2bb79d9e7a69044b477460445456720ec58
> [2] https://issues.apache.org/jira/browse/BEAM-12762
>
>
> On Sun, Aug 15, 2021 at 9:37 AM Yu Watanabe  wrote:
>
>> Hello .
>>
>> I would like to ask question for spark runner.
>>
>> Using spark downloaded from below link,
>>
>>
>> https://www.apache.org/dyn/closer.lua/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz
>>
>> I get below error when submitting a pipeline.
>> Full error is on
>> https://gist.github.com/yuwtennis/7b0c1dc0dcf98297af1e3179852ca693.
>>
>>
>> --
>> 21/08/16 01:10:26 WARN TransportChannelHandler: Exception in connection
>> from /192.168.11.2:35601
>> java.io.InvalidClassException:
>> scala.collection.mutable.WrappedArray$ofRef; local class incompatible:
>> stream classdesc serialVersionUID = 3456489343829468865, local class
>> serialVersionUID = 1028182004549731694
>> at
>> java.base/java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:689)
>> ...
>>
>> --
>>
>> SDK Harness and Job service are deployed as below.
>>
>> 1. Job service
>>
>> sudo docker run --net=host apache/beam_spark3_job_server:2.31.0
>> --spark-master-url=spark://localhost:7077 --clean-artifacts-per-job true
>>
>> 2. SDK Harness
>>
>> sudo docker run --net=host apache/beam_python3.8_sdk:2.31.0 --worker_pool
>>
>> * apache/beam_spark_job_server:2.31.0 for spark 2.4.8
>>
>> 3. SDK client code
>>
>> https://gist.github.com/yuwtennis/2e4c13c79f71e8f713e947955115b3e2
>>
>> Spark 2.4.8 succeeded without any errors using above components.
>>
>>
>> https://archive.apache.org/dist/spark/spark-2.4.8/spark-2.4.8-bin-hadoop2.7.tgz
>>
>> Would there be any setting which you need to be aware of for spark 3.1.2 ?
>>
>> Thanks,
>> Yu Watanabe
>>
>> --
>> Yu Watanabe
>>
>> linkedin: www.linkedin.com/in/yuwatanabe1/
>> twitter:   twitter.com/yuwtennis
>>
>>
>

-- 
Yu Watanabe

linkedin: www.linkedin.com/in/yuwatanabe1/
twitter:   twitter.com/yuwtennis


java.io.InvalidClassException with Spark 3.1.2

2021-08-15 Thread Yu Watanabe
Hello .

I would like to ask a question about the Spark runner.

Using Spark downloaded from the below link,

https://www.apache.org/dyn/closer.lua/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz

I get the below error when submitting a pipeline.
The full error is at
https://gist.github.com/yuwtennis/7b0c1dc0dcf98297af1e3179852ca693.

--
21/08/16 01:10:26 WARN TransportChannelHandler: Exception in connection
from /192.168.11.2:35601
java.io.InvalidClassException: scala.collection.mutable.WrappedArray$ofRef;
local class incompatible: stream classdesc serialVersionUID =
3456489343829468865, local class serialVersionUID = 1028182004549731694
at
java.base/java.io.ObjectStreamClass.initNonProxy(ObjectStreamClass.java:689)
...
--

SDK Harness and Job service are deployed as below.

1. Job service

sudo docker run --net=host apache/beam_spark3_job_server:2.31.0
--spark-master-url=spark://localhost:7077 --clean-artifacts-per-job true

2. SDK Harness

sudo docker run --net=host apache/beam_python3.8_sdk:2.31.0 --worker_pool

* apache/beam_spark_job_server:2.31.0 for spark 2.4.8

3. SDK client code

https://gist.github.com/yuwtennis/2e4c13c79f71e8f713e947955115b3e2

Spark 2.4.8 succeeded without any errors using above components.

https://archive.apache.org/dist/spark/spark-2.4.8/spark-2.4.8-bin-hadoop2.7.tgz

Would there be any setting one needs to be aware of for Spark 3.1.2?

Thanks,
Yu Watanabe

-- 
Yu Watanabe

linkedin: www.linkedin.com/in/yuwatanabe1/
twitter:   twitter.com/yuwtennis


Re: Submit Python Beam on Spark Dataproc

2021-08-15 Thread Yu Watanabe
Hello Mahan.

Sorry for the late reply.

> Still waiting for startup of environment from localhost:5 for worker
id 1-1

From the message, it seems that something is wrong with the connection between
the worker node in the Spark cluster and the SDK harness.

According to this slide, the runner worker (in your context, the Spark worker)
should also have connectivity with the SDK harness container.

https://docs.google.com/presentation/d/1Cso0XP9dmj77OD9Bd53C1M3W1sPJF0ZnA20gzb2BPhE/edit#slide=id.g42e4c9aad6_1_0

Could you please also try setting up SSH tunneling to the Spark worker node as
well?

Thanks,
Yu

On Thu, Aug 12, 2021 at 9:07 PM Mahan Hosseinzadeh 
wrote:

> Thanks Yu for the help and the tips.
>
> I ran the following steps but my job is stuck and can't get submitted to
> Dataproc and I keep getting this message in job-server:
> Still waiting for startup of environment from localhost:5 for worker
> id 1-1
>
>
> -
> *Beam code:*
> pipeline_options = PipelineOptions([
> "--runner=PortableRunner",
> "--job_endpoint=localhost:8099",
> "--environment_type=EXTERNAL",
> "--environment_config=localhost:5"
> ])
>
> -
> *Job Server:*
> I couldn't use Docker because host networking doesn't work on Mac OS and I
> used Gradle instead
>
> ./gradlew :runners:spark:3:job-server:runShadow
>
> -
> *Beam Worker Pool:*
> docker run -p=5:5 apache/beam_python3.7_sdk --worker_pool
>
> -
> *SSH tunnel to the master node:*
> gcloud compute ssh  \
> --project  \
> --zone   \
> -- -NL 7077:localhost:7077
>
> -----
>
> Thanks,
> Mahan
>
> On Tue, Aug 10, 2021 at 3:53 PM Yu Watanabe  wrote:
>
>> Hello .
>>
>> Would this page help ? I hope it helps.
>>
>> https://beam.apache.org/documentation/runners/spark/
>>
>> > Running on a pre-deployed Spark cluster
>>
>> 1- What's spark-master-url in case of a remote cluster on Dataproc? Is
>> 7077 the master url port?
>> * Yes.
>>
>> 2- Should we ssh tunnel to sparkMasterUrl port using gcloud compute ssh?
>> * Job server should be able to communicate with Spark master node port
>> 7077. So I believe it is Yes.
>>
>> 3- What's the environment_type? Can we use DOCKER? Then what's the SDK
>> Harness Configuration?
>> * This is the configuration of how you want  your harness container to
>> spin up.
>>
>> https://beam.apache.org/documentation/runtime/sdk-harness-config/
>>
>> For DOCKER , you will need docker deployed on all spark worker nodes.
>> > User code is executed within a container started on each worker node
>>
>> I used EXTERNAL when I did it with flink cluster before.
>>
>> e.g
>>
>> https://github.com/yuwtennis/apache-beam/blob/master/flink-session-cluster/docker/samples/src/sample.py#L14
>>
>> 4- Should we run the job-server outside of the Dataproc cluster or should
>> we run it in the master node?
>> * Depends. It could be inside or outside the master node. But if you are
>> connecting to full managed service, then outside might be better.
>>
>> https://beam.apache.org/documentation/runners/spark/
>>
>> > Start JobService that will connect with the Spark master
>>
>> Thanks,
>> Yu
>>
>> On Tue, Aug 10, 2021 at 7:53 PM Mahan Hosseinzadeh 
>> wrote:
>>
>>> Hi,
>>>
>>> I have a Python Beam job that works on Dataflow but we would like to
>>> submit it on a Spark Dataproc cluster with no Flink involvement.
>>> I already spent days but failed to figure out how to use PortableRunner
>>> with the beam_spark_job_server to submit my Python Beam job to Spark
>>> Dataproc. All the Beam docs are about Flink and there is no guideline about
>>> Spark with Dataproc.
>>> Some relevant questions might be:
>>> 1- What's spark-master-url in case of a remote cluster on Dataproc? Is
>>> 7077 the master url port?
>>> 2- Should we ssh tunnel to sparkMasterUrl port using gcloud compute ssh?
>>> 3- What's the environment_type? Can we use DOCKER? Then what's the SDK
>>> Harness Configuration?
>>> 4- Should we run the job-server outside of the Dataproc cluster or
>>> should we run it in the master node?
>>>
>>> Thanks,
>>> Mahan
>>>
>>
>>
>> --
>> Yu Watanabe
>>
>> linkedin: www.linkedin.com/in/yuwatanabe1/
>> twitter:   twitter.com/yuwtennis
>>
>>
>

-- 
Yu Watanabe

linkedin: www.linkedin.com/in/yuwatanabe1/
twitter:   twitter.com/yuwtennis


Re: Submit Python Beam on Spark Dataproc

2021-08-10 Thread Yu Watanabe
Hello .

Would this page help ? I hope it helps.

https://beam.apache.org/documentation/runners/spark/

> Running on a pre-deployed Spark cluster

1- What's spark-master-url in case of a remote cluster on Dataproc? Is 7077
the master url port?
* Yes.

2- Should we ssh tunnel to sparkMasterUrl port using gcloud compute ssh?
* The job server should be able to communicate with the Spark master node on port
7077, so I believe the answer is yes.

3- What's the environment_type? Can we use DOCKER? Then what's the SDK
Harness Configuration?
* This is the configuration for how you want your harness container to spin
up.

https://beam.apache.org/documentation/runtime/sdk-harness-config/

For DOCKER, you will need Docker deployed on all Spark worker nodes.
> User code is executed within a container started on each worker node

I used EXTERNAL when I did this with a Flink cluster before.

e.g
https://github.com/yuwtennis/apache-beam/blob/master/flink-session-cluster/docker/samples/src/sample.py#L14

4- Should we run the job-server outside of the Dataproc cluster or should
we run it in the master node?
* It depends. It could be inside or outside the master node. But if you are
connecting to a fully managed service, then outside might be better.

https://beam.apache.org/documentation/runners/spark/

> Start JobService that will connect with the Spark master
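
Putting the answers above together, a minimal Python sketch of the submission
options is below. The ports and addresses are assumptions; they depend on where
the job server and the worker pool actually run in your setup.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Assumes the Spark job server is reachable on localhost:8099 (e.g. tunnelled
# to the Dataproc master) and a Beam worker pool is listening on port 50000.
options = PipelineOptions([
    '--runner=PortableRunner',
    '--job_endpoint=localhost:8099',
    '--environment_type=EXTERNAL',
    '--environment_config=localhost:50000',
])

with beam.Pipeline(options=options) as p:
    _ = p | beam.Create(['hello', 'beam']) | beam.Map(print)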

Thanks,
Yu

On Tue, Aug 10, 2021 at 7:53 PM Mahan Hosseinzadeh 
wrote:

> Hi,
>
> I have a Python Beam job that works on Dataflow but we would like to
> submit it on a Spark Dataproc cluster with no Flink involvement.
> I already spent days but failed to figure out how to use PortableRunner
> with the beam_spark_job_server to submit my Python Beam job to Spark
> Dataproc. All the Beam docs are about Flink and there is no guideline about
> Spark with Dataproc.
> Some relevant questions might be:
> 1- What's spark-master-url in case of a remote cluster on Dataproc? Is
> 7077 the master url port?
> 2- Should we ssh tunnel to sparkMasterUrl port using gcloud compute ssh?
> 3- What's the environment_type? Can we use DOCKER? Then what's the SDK
> Harness Configuration?
> 4- Should we run the job-server outside of the Dataproc cluster or should
> we run it in the master node?
>
> Thanks,
> Mahan
>


-- 
Yu Watanabe

linkedin: www.linkedin.com/in/yuwatanabe1/
twitter:   twitter.com/yuwtennis


Re: Running Beam on Flink

2020-02-16 Thread Yu Watanabe
Hello .

From this line in the log, may I confirm whether you set up the harness container?

I have executed your code in the below environment and it worked. The Python
version is newer, though.

OS: Fedora 31
BEAM: 2.19.0
PYTHON: 3.7.6

Comparing with my log, I realized the below line does not exist in your log.


357 [grpc-default-executor-1] INFO
org.apache.beam.runners.fnexecution.control.FnApiControlClientPoolService
- Beam Fn Control client connected with id 1-1
358 [grpc-default-executor-1] INFO
org.apache.beam.runners.fnexecution.data.GrpcDataService - Beam Fn
Data client connected.


One thing I realized was that in LOOPBACK mode for the harness container, the job
server uses the hostname when communicating with Flink.
___
Caused by: java.net.UnknownHostException: fedora-desktop: Name or
service not known
at java.net.Inet6AddressImpl.lookupAllHostAddr(Native Method)
at java.net.InetAddress$2.lookupAllHostAddr(InetAddress.java:928)
at 
java.net.InetAddress.getAddressesFromNameService(InetAddress.java:1323)
at java.net.InetAddress.getLocalHost(InetAddress.java:1500)
... 34 more
___

Perhaps try temporarily adding your hostname to the hosts file (resolving it
to 127.0.0.1, e.g. a line like "127.0.0.1 fedora-desktop") and see if it works?
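
For reference, a minimal Python sketch of the kind of LOOPBACK submission
described above is below. Your actual pipeline options are not shown in the
thread, so the flags here are only an assumption.

import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions

# Assumed options; with LOOPBACK the Flink task manager calls back into a
# worker started by the submitting process, which is why the hostname has to
# resolve on both sides.
options = PipelineOptions([
    '--runner=PortableRunner',
    '--job_endpoint=localhost:8099',
    '--environment_type=LOOPBACK',
])

with beam.Pipeline(options=options) as p:
    _ = p | beam.Create(['Hello World.', 'Apache Beam']) | beam.Map(print)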

Thanks,
Yu Watanabe


On Mon, Feb 10, 2020 at 3:29 AM Xander Song  wrote:
>
> Hi Jincheng,
>
> Thanks for your help. Yes, I am using Mac. Your suggestion allowed me to 
> submit the job on port 8099. However, I am now encountering a different error 
> message.
>
> ERROR:root:java.net.ConnectException: Connection refused
>
> Traceback (most recent call last):
>
>   File "test_beam_local_flink.py", line 18, in 
>
> | apache_beam.Map(lambda x: print(x))
>
>   File 
> "/Users/xander/Projects/flink-test/env/lib/python3.7/site-packages/apache_beam/pipeline.py",
>  line 481, in __exit__
>
> self.run().wait_until_finish()
>
>   File 
> "/Users/xander/Projects/flink-test/env/lib/python3.7/site-packages/apache_beam/pipeline.py",
>  line 461, in run
>
> self._options).run(False)
>
>   File 
> "/Users/xander/Projects/flink-test/env/lib/python3.7/site-packages/apache_beam/pipeline.py",
>  line 474, in run
>
> return self.runner.run_pipeline(self, self._options)
>
>   File 
> "/Users/xander/Projects/flink-test/env/lib/python3.7/site-packages/apache_beam/runners/portability/portable_runner.py",
>  line 334, in run_pipeline
>
> result.wait_until_finish()
>
>   File 
> "/Users/xander/Projects/flink-test/env/lib/python3.7/site-packages/apache_beam/runners/portability/portable_runner.py",
>  line 455, in wait_until_finish
>
> self._job_id, self._state, self._last_error_message()))
>
> RuntimeError: Pipeline 
> BeamApp-root-0209182021-fc6ec677_c351e1bb-d428-4475-9910-4544d8ec1de4 failed 
> in state FAILED: java.net.ConnectException: Connection refused
>
>
> I've attached a text file containing the output from the terminal running the 
> Docker container in case that is informative (it is quite lengthy). Any 
> suggestions are appreciated.
>
> Best,
> Xander
>
> On Sun, Feb 9, 2020 at 2:44 AM jincheng sun  wrote:
>>
>>
>> Hi Xander,
>>
>> Are you using Mac? The option --net=host doesn't work on Mac[1]. Could you 
>> try to see if the command
>> `docker run -p 8099:8099 -p 8098:8098 -p 8097:8097 
>> apachebeam/flink1.9_job_server:latest` works?
>>
>> [1] https://forums.docker.com/t/should-docker-run-net-host-work/14215/28
>>
>> Best,
>> Jincheng
>> -
>> Twitter: https://twitter.com/sunjincheng121
>> -
>>
>>
>> Xander Song  wrote on Sun, Feb 9, 2020 at 11:52 AM:
>>>
>>> Do you have any suggestions for addressing this issue? I am unsure of what 
>>> to try next.
>>>
>>> On Fri, Feb 7, 2020 at 5:55 PM Ankur Goenka  wrote:
>>>>
>>>> That seems to be a problem.
>>>>
>>>> When I try the command, I get
>>>>
>>>> $ telnet localhost 8099
>>>> Trying ::1...
>>>> Connected to localhost.
>>>> Escape character is '^]'.
>>>> �^CConnection closed by foreign host.
>>>>
>>>> On Fri, Feb 7, 2020 at 5:34 PM Xander Song  wrote:
>>>>>
>>>>> Thanks for the response. After entering telnet localhost 8099, I receive
>>>>>
>>>>>

Re: Assertion error when using kafka module in python

2020-01-25 Thread Yu Watanabe
Hello.

I have made some progress with the Kafka source output in Python, but am still not
able to get "WriteToKafka" working.
I would appreciate it if I could get help.

I was able to get past the assertion error by following the example for External Transform.
By defining a ParDo instead of a Map before "WriteToKafka", I was able to use
the correct URN.

https://github.com/apache/beam/blob/819a242453eac9a2b3d79df0b259c8cc844b6af1/sdks/python/apache_beam/examples/wordcount_xlang.py#L70

However, I get a coder casting error on the job server.

==
jobserver_1| java.lang.ClassCastException:
org.apache.beam.sdk.coders.ByteArrayCoder cannot be cast to
org.apache.beam.sdk.coders.KvCoder
 at
org.apache.beam.sdk.io.kafka.KafkaIO$Write.expand(KafkaIO.java:1552)
==

In KafkaIO.java, as the error says, it tries to cast the input coder to KvCoder.
==
KvCoder kvCoder = (KvCoder) input.getCoder();
==

In my code, following the example from ExternalTransform, I have
passed a bytes object to the External Transform.
==
class ToKV(beam.DoFn):

def process(self, elem):

return bytes(elem.encode())

..

res = ( lines | beam.ParDo(ToKV()).with_output_types(bytes)
  | WriteToKafka(
  { 'acks': 'all', 'bootstrap.servers': 'localhost:9092'},
  'beam',
  'org.apache.kafka.common.serialization.ByteArraySerializer',
  'org.apache.kafka.common.serialization.ByteArraySerializer',
  'localhost:8097' ))
==

So my question is: which coder should I use in order for it to cast into
KvCoder on the Java side?
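
For readers hitting the same ClassCastException: the wordcount_xlang example
linked above emits key/value tuples typed as Tuple[bytes, bytes], so the
Python side picks a KvCoder that the Java KafkaIO.Write can cast. A minimal
sketch of that shape is below, written against the current
apache_beam.io.kafka wrapper (which differs from the pre-release snippet
above); the broker address, topic, and expansion service address are taken
from the snippet.

import typing
import apache_beam as beam
from apache_beam.io.kafka import WriteToKafka

def to_kv(elem):
    # Empty key, UTF-8 encoded value -- the same shape as wordcount_xlang.
    return b'', elem.encode('utf-8')

with beam.Pipeline() as p:
    lines = p | beam.Create(['hello', 'beam'])
    _ = (lines
         | beam.Map(to_kv).with_output_types(typing.Tuple[bytes, bytes])
         | WriteToKafka(
             producer_config={'bootstrap.servers': 'localhost:9092'},
             topic='beam',
             key_serializer='org.apache.kafka.common.serialization.ByteArraySerializer',
             value_serializer='org.apache.kafka.common.serialization.ByteArraySerializer',
             expansion_service='localhost:8097'))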

Thanks,
Yu

On Thu, Jan 9, 2020 at 7:17 AM Chamikara Jayalath  wrote:
>
> Hmm, seems like a Java (external) ParDo is being forwarded to Python SDK for 
> execution somehow. +Maximilian Michels might know more.
>
> On Sun, Jan 5, 2020 at 2:57 AM Yu Watanabe  wrote:
>>
>> Hello.
>>
>> I would like to use sinking data into kafka using kafka module for
>> python , however,
>> getting below assertion  error when pipeline is executed.
>>
>> --
>> beam_1 | Traceback (most recent call last):
>> beam_1 |   File "src/sample.py", line 50, in 
>> beam_1 | main()
>> beam_1 |   File "src/sample.py", line 46, in main
>> beam_1 | 'localhost:8097' ))
>> beam_1 |   File
>> "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line
>> 427, in __exit__
>> beam_1 | self.run().wait_until_finish()
>> beam_1 |   File
>> "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line
>> 407, in run
>> beam_1 | self._options).run(False)
>> beam_1 |   File
>> "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line
>> 680, in from_runner_api
>> beam_1 | context.transforms.get_by_id(root_transform_id)]
>> beam_1 |   File
>> "/usr/local/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py",
>> line 94, in get_by_id
>> beam_1 | self._id_to_proto[id], self._pipeline_context)
>> beam_1 |   File
>> "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line
>> 913, in from_runner_api
>> beam_1 | part = context.transforms.get_by_id(transform_id)
>> beam_1 |   File
>> "/usr/local/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py",
>> line 94, in get_by_id
>> beam_1 | self._id_to_proto[id], self._pipeline_context)
>> beam_1 |   File
>> "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line
>> 913, in from_runner_api
>> beam_1 | part = context.transforms.get_by_id(transform_id)
>> beam_1 |   File
>> "/usr/local/lib/python3.7/site-packages/apache_beam/runners/pipeline_context.py",
>> line 94, in get_by_id
>> beam_1 | self._id_to_proto[id], self._pipeline_context)
>> beam_1 |   File
>> "/usr/local/lib/python3.7/site-packages/apache_beam/pipeline.py", line
>> 913, in from_runner_api
>> beam_1 | part = context.transforms.get_by_id(transform_id)
>> beam_1 |   File
>>

Assertion error when using kafka module in python

2020-01-05 Thread Yu Watanabe
  | WriteToKafka(
  { 'acks': 'all', 'bootstrap.servers': 'localhost:9092'},
  'beam',

'org.apache.kafka.common.serialization.ByteArraySerializer',

'org.apache.kafka.common.serialization.ByteArraySerializer',
  'localhost:8097' ))
--

I have my platform built upon Docker Engine and used the below combination
of modules.

1. apache-beam: 2.16.0
2. flink: 1.8
3. python-sdk(37): compiled using release-2.16.0
4. jobserver: compiled using release-2.16.0
5. kafka: 2.3.x (using confluent 5.3.x)

* docker compose
https://github.com/yuwtennis/beam-deployment/blob/master/flink-session-cluster/docker/docker-compose.yml

Would there be any setting I am missing to avoid the error?
I would appreciate assistance in solving the error.

Best Regards,
Yu Watanabe

-- 
Yu Watanabe
yu.w.ten...@gmail.com


Re: Worker pool dies with error: context deadline exceeded

2020-01-05 Thread Yu Watanabe
Hello Kyle.

Thank you for the reply, and happy new year to you too!

Yes. I got it working by sharing all container networks with the host kernel.

My docker compose is below.
https://github.com/yuwtennis/beam-deployment/blob/master/flink-session-cluster/docker/docker-compose.yml

I confirmed it by writing data to a file on GCS.



with beam.Pipeline(options=options) as p:

lines = ( p | beam.Create(['Hello World.', 'Apache beam']) )

# Write to GCS
( lines | WriteToText('gs://{}/sample.txt'.format(project_id))
)



https://github.com/yuwtennis/beam-deployment/blob/master/flink-session-cluster/docker/samples/src/sample.py

RESULT

$ gsutil cat gs://${PROJECT_ID}/sample.txt-0-of-1
Hello World.
Apache beam


Thanks,
Yu Watanabe


On Fri, Jan 3, 2020 at 5:58 AM Kyle Weaver  wrote:

> This is the root cause:
>
> > python-sdk_1   | 2019/12/31 02:59:45 Failed to obtain provisioning
> > information: failed to dial server at localhost:45759
>
> The Flink task manager and Beam SDK harness use connections over
> `localhost` to communicate.
>
> You will have to put `taskmanager` and `python-sdk` on the same host.
> Maybe you can try using `--networking=host` so they will share the
> namespace. https://docs.docker.com/network/host/
>
> Happy new year!
>
> Kyle
>
> On Mon, Dec 30, 2019 at 7:21 PM Yu Watanabe  wrote:
>
>> Hello .
>>
>> I would like to ask question about the error I am facing with worker
>> pool of sdk container.
>> I get below error when I run the pipeline.
>>
>> 
>> python-sdk_1   | 2019/12/31 02:57:26 Starting worker pool 1: python -m
>> apache_beam.runners.worker.worker_pool_main --service_port=5
>> --container_executable=/opt/apache/beam/boot
>> python-sdk_1   | INFO:root:Started worker pool servicer at port:
>> localhost:5 with executable: /opt/apache/beam/boot
>> python-sdk_1   | WARNING:root:Starting worker with command
>> ['/opt/apache/beam/boot', '--id=1-1',
>> '--logging_endpoint=localhost:35615',
>> '--artifact_endpoint=localhost:42723',
>> '--provision_endpoint=localhost:45759',
>> '--control_endpoint=localhost:43185']
>> python-sdk_1   | 2019/12/31 02:57:45 Initializing python harness:
>> /opt/apache/beam/boot --id=1-1 --logging_endpoint=localhost:35615
>> --artifact_endpoint=localhost:42723
>> --provision_endpoint=localhost:45759
>> --control_endpoint=localhost:43185
>> python-sdk_1   | 2019/12/31 02:59:45 Failed to obtain provisioning
>> information: failed to dial server at localhost:45759
>> python-sdk_1   | caused by:
>> python-sdk_1   | context deadline exceeded
>>
>> 
>>
>> In flink taskmanager's log ,  it keeps waiting for response from sdk
>> container.
>>
>> 
>> taskmanager_1  | 2019-12-31 02:57:45,445 INFO
>> org.apache.flink.configuration.GlobalConfiguration-
>> Loading configuration property: query.server.port, 6125
>> taskmanager_1  | 2019-12-31 02:58:26,678 INFO
>> org.apache.beam.runners.fnexecution.environment.ExternalEnvironmentFactory
>>  - Still waiting for startup of environment from python-sdk:5 for
>> worker id 1-1
>>
>> 
>>
>> Looking at flink  jobmanager's log, error is logged after starting map
>> transform.
>> So looks like, request from taskmanager is reached to sdk conatiner
>> but not processed correctly.
>> Sounds like I am missing some setting for sdk container..
>>
>> 
>> jobmanager_1   | 2019-12-31 02:57:44,987 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph-
>> DataSource (Impulse) (1/1) (4e6f68fd31bafa066b740943bc3ea736) switched
>> from RUNNING to FINISHED.
>> jobmanager_1   | 2019-12-31 02:57:44,989 INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph-
>> MapPartition (MapPartition at [2]Create/{FlatMap(> core.py:2468>), Map(decode)}) (1/1) (067ed452ebd15c1175ecde0ae40e8ac7)
>

Re: Beam on Flink with Python SDK and using GCS as artifacts directory

2019-12-31 Thread Yu Watanabe
Matthew

> Just to verify that the preferred python version is python3?

The harness container supports both Python 2 and 3.

https://beam.apache.org/documentation/runtime/environments/

In my opinion, considering that Python 2's EOL is Jan 1, 2020, Python 3
would be the choice.

Thanks,
Yu Watanabe

On Tue, Dec 24, 2019 at 8:24 AM Matthew Rafael Magsombol <
raffy4...@gmail.com> wrote:

> So I'm able to put this to debug mode within the flink jobmanager and
> taskmanager and at the moment, the taskmanager is complaining about these:
> ``
>
> 2019-12-23 22:43:06,892 DEBUG
> org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.internal.NativeLibraryLoader
> - Unable to load the library
> 'org_apache_beam_vendor_grpc_v1p21p0_netty_transport_native_epoll_x86_64',
> trying other loading mechanism.
>
> java.lang.UnsatisfiedLinkError: no
> org_apache_beam_vendor_grpc_v1p21p0_netty_transport_native_epoll_x86_64 in
> java.library.path
>
> at java.lang.ClassLoader.loadLibrary(ClassLoader.java:1860)
>
> at java.lang.Runtime.loadLibrary0(Runtime.java:870)
>
> at java.lang.System.loadLibrary(System.java:1122)
>
> at
> org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.internal.NativeLibraryUtil.loadLibrary(NativeLibraryUtil.java:38)
>
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>
> at
> sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
>
> at
> sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>
> at java.lang.reflect.Method.invoke(Method.java:498)
>
> at
> org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.internal.NativeLibraryLoader$1.run(NativeLibraryLoader.java:369)
>
> at java.security.AccessController.doPrivileged(Native Method)
>
> at
> org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.internal.NativeLibraryLoader.loadLibraryByHelper(NativeLibraryLoader.java:361)
>
> at
> org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.internal.NativeLibraryLoader.loadLibrary(NativeLibraryLoader.java:339)
>
> at
> org.apache.beam.vendor.grpc.v1p21p0.io.netty.util.internal.NativeLibraryLoader.load(NativeLibraryLoader.java:136)
>
> at
> org.apache.beam.vendor.grpc.v1p21p0.io.netty.channel.epoll.Native.loadNativeLibrary(Native.java:198)
>
> at
> org.apache.beam.vendor.grpc.v1p21p0.io.netty.channel.epoll.Native.(Native.java:61)
>
> at
> org.apache.beam.vendor.grpc.v1p21p0.io.netty.channel.epoll.Epoll.(Epoll.java:38)
>
> at java.lang.Class.forName0(Native Method)
>
> at java.lang.Class.forName(Class.java:264)
>
> ``
>
>
> And the other error is this:
>
> ``
>
> 2019/12/23 22:43:09 Executing: python -m
> apache_beam.runners.worker.sdk_worker_main
>
> 2019/12/23 22:43:09 Python exited: exec: "python": executable file not
> found in $PATH
>
> ``
>
> This is more towards complaining that the exec command for the sdk is
> complaining about python not existing ( my current environment that I'm
> running this on, in both the job manager server and the python code itself
> are both inside the jobmanager...I have the flink jobmanager container and
> the flink taskmanager with base image as flink version 1.8.3 )
>
> Just to verify that the preferred python version is python3?
>
> So re-pointing `python` to python3 was a success!
>
> I did run into another problem where I sink the transformed data into a
> local file but I think I'll create a separate thread on that. In
> production, I don't really want to sink into a local file but rather into
> some distributed storage, so this is a non-concern atm.
>
>
> My last question is...do I need to worry about that java package issue?
>
> If not, then that's it for now, thanks!
>
>
> On Mon, Dec 23, 2019 at 7:55 AM Kyle Weaver  wrote:
>
>> > It will be great if you can get the error from the failing process.
>>
>> Note that you will have to set the log level to DEBUG to get output from
>> the process.
>>
>> On Fri, Dec 20, 2019 at 6:23 PM Ankur Goenka  wrote:
>>
>>> Hi Matthew,
>>>
>>> It will be great if you can get the error from the failing process.
>>> My suspicions are:
>>> - Flink task manager container access to gcs location.
>>> - As you are running Flink task manager in a container
>>> "/beam_src_code/sdks/python/container/build/target/launcher/linux_amd64/boot\"
>>> path is not applicable.
>>> - Boot command is not run from a valid activated python environment.
>>> - You can use
>>> https://github.com/apache/beam/blob/605c59b383a77b117bb6b07021e8c41cb13b438f/sdks/python/test-suites/portable/py2/build.gradle#L179
&g

Worker pool dies with error: context deadline exceeded

2019-12-30 Thread Yu Watanabe
Hello .

I would like to ask a question about the error I am facing with the worker
pool of the SDK container.
I get the below error when I run the pipeline.

python-sdk_1   | 2019/12/31 02:57:26 Starting worker pool 1: python -m
apache_beam.runners.worker.worker_pool_main --service_port=5
--container_executable=/opt/apache/beam/boot
python-sdk_1   | INFO:root:Started worker pool servicer at port:
localhost:5 with executable: /opt/apache/beam/boot
python-sdk_1   | WARNING:root:Starting worker with command
['/opt/apache/beam/boot', '--id=1-1',
'--logging_endpoint=localhost:35615',
'--artifact_endpoint=localhost:42723',
'--provision_endpoint=localhost:45759',
'--control_endpoint=localhost:43185']
python-sdk_1   | 2019/12/31 02:57:45 Initializing python harness:
/opt/apache/beam/boot --id=1-1 --logging_endpoint=localhost:35615
--artifact_endpoint=localhost:42723
--provision_endpoint=localhost:45759
--control_endpoint=localhost:43185
python-sdk_1   | 2019/12/31 02:59:45 Failed to obtain provisioning
information: failed to dial server at localhost:45759
python-sdk_1   | caused by:
python-sdk_1   | context deadline exceeded


In the Flink taskmanager's log, it keeps waiting for a response from the SDK container.

taskmanager_1  | 2019-12-31 02:57:45,445 INFO
org.apache.flink.configuration.GlobalConfiguration-
Loading configuration property: query.server.port, 6125
taskmanager_1  | 2019-12-31 02:58:26,678 INFO
org.apache.beam.runners.fnexecution.environment.ExternalEnvironmentFactory
 - Still waiting for startup of environment from python-sdk:5 for
worker id 1-1


Looking at the Flink jobmanager's log, the error is logged after starting the map
transform.
So it looks like the request from the taskmanager reaches the SDK container
but is not processed correctly.
It sounds like I am missing some setting for the SDK container.

jobmanager_1   | 2019-12-31 02:57:44,987 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph-
DataSource (Impulse) (1/1) (4e6f68fd31bafa066b740943bc3ea736) switched
from RUNNING to FINISHED.
jobmanager_1   | 2019-12-31 02:57:44,989 INFO
org.apache.flink.runtime.executiongraph.ExecutionGraph-
MapPartition (MapPartition at [2]Create/{FlatMap(), Map(decode)}) (1/1) (067ed452ebd15c1175ecde0ae40e8ac7)
switched from DEPLOYING to RUNNING.


The command line for building the SDK container is:

./gradlew :sdks:python:container:py37:dockerPush
-Pdocker-repository-root=${GCR_HOSTNAME}/${PROJECT_ID}
-Pdocker-tag=release-2.16.0


My docker compose
https://github.com/yuwtennis/beam-deployment/blob/master/flink-session-cluster/docker/docker-compose.yml

My pipeline code
https://github.com/yuwtennis/beam-deployment/blob/master/flink-session-cluster/docker/samples/src/sample.py

Would there be any settings I need to use for starting up the SDK container?

Best Regards,
Yu Watanabe

-- 
Yu Watanabe
yu.w.ten...@gmail.com


Re: Protocol message had invalid UTF-8

2019-12-30 Thread Yu Watanabe
Kyle.

Thank you for the reply.

The error disappeared after building the SDK and job server from the correct Apache
Beam branch (2.16.0 in this case).

Thanks,
Yu Watanabe

On Tue, Dec 31, 2019 at 2:30 AM Kyle Weaver  wrote:

> This error can happen when the job server and sdk versions are mismatched
> (due to protobuf incompatibilities). The sdk and job server containers
> should use the same beam version.
>
> On Mon, Dec 30, 2019 at 11:47 AM Yu Watanabe 
> wrote:
>
>> Hello.
>>
>> I would like to get help with issue having in job-server.
>>
>> I have set up flink (session cluster) and job server everything in
>> docker , however, jobserver seems to reject requests from the beam
>> client as described in below error.
>>
>> ==
>> jobserver_1| [grpc-default-executor-11] WARN
>> org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService -
>> Encountered Unexpected Exception during validation
>> jobserver_1| java.lang.RuntimeException: Failed to validate
>> transform ref_AppliedPTransform_Create/FlatMap(> core.py:2468>)_4
>> jobserver_1| at
>>
>> org.apache.beam.runners.core.construction.graph.PipelineValidator.validateTransform(PipelineValidator.java:203)
>> jobserver_1| at
>>
>> org.apache.beam.runners.core.construction.graph.PipelineValidator.validateComponents(PipelineValidator.java:112)
>> 
>> jobserver_1| Caused by:
>>
>> org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.InvalidProtocolBufferException:
>> Protocol message had invalid UTF-8.
>> jobserver_1| at
>>
>> org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.InvalidProtocolBufferException.invalidUtf8(InvalidProtocolBufferException.java:141)
>> jobserver_1| at
>>
>> org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.Utf8$DecodeUtil.handleTwoBytes(Utf8.java:1909)
>> jobserver_1| at
>>
>> org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.Utf8$DecodeUtil.access$700(Utf8.java:1883)
>> 
>> ==
>>
>> Below is description about my environment.
>>
>> 1. Combination of versions of each containers are,
>>
>> apache beam: 2.16.0
>> sdk: python3.7_sdk: 2.19.0.dev
>> flink: flink1.9_job_server:latest
>>
>> 2. Set up containers using beam repository.
>>
>> https://github.com/yuwtennis/beam-deployment/wiki/Setting-up-flink-runner
>>
>> 3. Set up docker-compose as below.
>>
>>
>> https://github.com/yuwtennis/beam-deployment/blob/master/flink-session-cluster/docker-compose.yml
>>
>> 4. Kept source code very simple.
>>
>>
>> https://github.com/yuwtennis/beam-deployment/blob/master/flink-session-cluster/samples/src/sample.py
>>
>> Do I need any other options in job server to properly
>> serialize/deserialize the incoming message from beam client ?
>>
>> My current options are below.
>> ==
>> command: [ "--artifacts-dir", "${ARTIFACTS_DIR}", "--flink-master-url",
>> "${FLINK_MASTER_URL}", "--job-host", "${JOB_HOST}", "--job-port",
>> "${JOB_PORT}" ]
>> ==
>>
>> I appreciate if I could get some help.
>>
>> Thanks,
>> Yu Watanabe
>>
>> --
>> Yu Watanabe
>> yu.w.ten...@gmail.com
>>
>

-- 
Yu Watanabe
yu.w.ten...@gmail.com
[image: LinkedIn icon] <https://www.linkedin.com/in/yuwatanabe1>  [image:
Twitter icon] <https://twitter.com/yuwtennis>


Protocol message had invalid UTF-8

2019-12-30 Thread Yu Watanabe
Hello.

I would like to get help with an issue I am having with the job server.

I have set up Flink (session cluster) and the job server, everything in
Docker; however, the job server seems to reject requests from the Beam
client, as described in the below error.

==
jobserver_1| [grpc-default-executor-11] WARN
org.apache.beam.runners.fnexecution.jobsubmission.InMemoryJobService -
Encountered Unexpected Exception during validation
jobserver_1| java.lang.RuntimeException: Failed to validate
transform ref_AppliedPTransform_Create/FlatMap()_4
jobserver_1| at
org.apache.beam.runners.core.construction.graph.PipelineValidator.validateTransform(PipelineValidator.java:203)
jobserver_1| at
org.apache.beam.runners.core.construction.graph.PipelineValidator.validateComponents(PipelineValidator.java:112)

jobserver_1| Caused by:
org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.InvalidProtocolBufferException:
Protocol message had invalid UTF-8.
jobserver_1| at
org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.InvalidProtocolBufferException.invalidUtf8(InvalidProtocolBufferException.java:141)
jobserver_1| at
org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.Utf8$DecodeUtil.handleTwoBytes(Utf8.java:1909)
jobserver_1| at
org.apache.beam.vendor.grpc.v1p21p0.com.google.protobuf.Utf8$DecodeUtil.access$700(Utf8.java:1883)

==

Below is description about my environment.

1. The combination of versions of the containers is:

apache beam: 2.16.0
sdk: python3.7_sdk: 2.19.0.dev
flink: flink1.9_job_server:latest

2. Set up containers using beam repository.

https://github.com/yuwtennis/beam-deployment/wiki/Setting-up-flink-runner

3. Set up docker-compose as below.

https://github.com/yuwtennis/beam-deployment/blob/master/flink-session-cluster/docker-compose.yml

4. Kept source code very simple.

https://github.com/yuwtennis/beam-deployment/blob/master/flink-session-cluster/samples/src/sample.py

Do I need any other options in the job server to properly
serialize/deserialize the incoming message from the Beam client?

My current options are below.
==
command: [ "--artifacts-dir", "${ARTIFACTS_DIR}", "--flink-master-url",
"${FLINK_MASTER_URL}", "--job-host", "${JOB_HOST}", "--job-port",
"${JOB_PORT}" ]
==========

I would appreciate it if I could get some help.

Thanks,
Yu Watanabe

-- 
Yu Watanabe
yu.w.ten...@gmail.com


Re: How to import external module inside ParDo using Apache Flink ?

2019-10-07 Thread Yu Watanabe
I did not answer Kyle's question. Sorry about that.

> Did you try moving the imports from the process function to the top of
main.py?

  Yes. Consequently, I moved all imports to the top of each file, unless
I explicitly needed to import inside a function.
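
For illustration, a minimal sketch of that layout (pkg_aif and ut.normalize
are placeholder names; this assumes the package is staged to the workers with
--extra_package and that save_main_session is set to True, as described below):

==========
import apache_beam as beam
import pkg_aif.utils as ut  # module-level import instead of importing inside process()


class VerifyFn(beam.DoFn):
    def process(self, element):
        # No in-function "import pkg_aif.utils" needed: the module-level
        # import above is available on the workers once the package has been
        # staged and the main session is saved.
        yield ut.normalize(element)  # ut.normalize is a hypothetical helper
==========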

On Mon, Oct 7, 2019 at 4:49 PM Yu Watanabe  wrote:

> Thank you for the comment.
>
> I finally got this working. I would like to share my experience for people
> who are beginners with the portable runner.
> Below is what I had to do when calling functions and classes from an
> external package.
>
> 1. As Kyle said, I needed 'save_main_session' for sys path to persist
> after pickling.
>
> 2. I needed to push all related files to worker nodes using
> "extra_package" option to resolve dependency.
> https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/
>
> 3. I needed to write the import statements in a fully qualified fashion,
> otherwise I got the below error in the task manager.
> It looks like external packages are installed into
> "/usr/local/lib/python3.5/site-packages" and require the PKG.MODULENAME
> format to be resolved.
>
> 
> import utils
>
> ...
>
>   File "/usr/local/lib/python3.5/site-packages/modules/beam_pardo.py",
> line 18, in <module>
> import utils
> ImportError: No module named 'utils'
>
> 
>
> Below is my import syntax for the external package in main.py. The other
> files follow the same syntax.
>
> 
> #
> # Local application/library specific imports
> #
> import pkg_aif.utils as ut
> from pkg_aif.beam_pardo import VerifyFn
> from pkg_aif.beam_pardo import FlattenTagFilesFn
> from pkg_aif.beam_states import StatefulBufferingFn
> from pkg_aif.pipeline_wrapper import pipelineWrapper
> from pkg_aif.frames import Frames
> from pkg_aif.tag_counts import TagCounts
> from pkg_aif.tags import Tags
> from pkg_aif.es_credentials import EsCredentials
> from pkg_aif.s3_credentials import S3Credentials
>
> 
>
> Below is the related information.
>
> Full options for PipelineOptions.
>
> 
> options = PipelineOptions([
>   "--runner=PortableRunner",
>   "--environment_config={0}".format(DOCKER_REGISTRY),
>   "--environment_type=DOCKER",
>   "--experiments=beam_fn_api",
>   "--parallelism={0}".format(PARALLELISM),
>   "--job_endpoint=localhost:8099",
>   "--extra_package=PATH_TO_SDIST"
>   ])
> options.view_as(SetupOptions).save_main_session = True
>
> return beam.Pipeline(options=options)
>
> 
>
> My setup.py is below.
>
> 
> import setuptools
>
> REQUIRED_PACKAGES = [
> 'apache-beam==2.15.0',
> 'elasticsearch>=7.0.0,<8.0.0',
> 'urllib3',
> 'boto3'
> ]
>
> setuptools.setup(
>author   = 'Yu Watanabe',
>author_email = 'AUTHOR_EMAIL',
>url  = 'URL',
>name = 'quality_validation',
>version  = '0.1',
>install_requires = REQUIRED_PACKAGES,
>packages = setuptools.find_packages(),
>
> )
>
> 
>
> Directory path to setup.py.
>
> 
> admin@ip-172-31-9-89:~/quality-validation-distribute/bin$ ls -l
> total 20
> drwxr-xr-x 2 admin admin 4096 Oct  2 19:30 dist
> -rw-r--r-- 1 admin admin0 Sep  5 21:21 __init__.py
> -rw-r--r-- 1 admin admin 3782 Oct  3 11:02 main.py
> drwxr-xr-x 3 admin admin 4096 Oct  3 15:41 pkg_aif
> drwxr-xr-x 2 admin admin 4096 Oct  2 19:30 quality_validation.egg-info
> -rw-r--r-- 1 admin admin  517 Oct  1 15:21 setup.py
>
> 
>
> Thanks,
> Yu Watanabe
>
> On Fri

Re: How to import external module inside ParDo using Apache Flink ?

2019-10-07 Thread Yu Watanabe
Thank you for the comment.

I finally got this working. I would like to share my experience for people
who are beginners with the portable runner.
Below is what I had to do when calling functions and classes from an
external package.

1. As Kyle said, I needed 'save_main_session' for sys path to persist after
pickling.

2. I needed to push all related files to worker nodes using "extra_package"
option to resolve dependency.
https://beam.apache.org/documentation/sdks/python-pipeline-dependencies/

3. I needed to write the import statements in a fully qualified fashion,
otherwise I got the below error in the task manager.
It looks like external packages are installed into
"/usr/local/lib/python3.5/site-packages" and require the PKG.MODULENAME
format to be resolved.

import utils

...

  File "/usr/local/lib/python3.5/site-packages/modules/beam_pardo.py", line
18, in <module>
import utils
ImportError: No module named 'utils'


Below is my import syntax for the external package in main.py. The other files
follow the same syntax.

#
# Local application/library specific imports
#
import pkg_aif.utils as ut
from pkg_aif.beam_pardo import VerifyFn
from pkg_aif.beam_pardo import FlattenTagFilesFn
from pkg_aif.beam_states import StatefulBufferingFn
from pkg_aif.pipeline_wrapper import pipelineWrapper
from pkg_aif.frames import Frames
from pkg_aif.tag_counts import TagCounts
from pkg_aif.tags import Tags
from pkg_aif.es_credentials import EsCredentials
from pkg_aif.s3_credentials import S3Credentials


Below is the related information.

Full options for PipelineOptions.

options = PipelineOptions([
  "--runner=PortableRunner",
  "--environment_config={0}".format(DOCKER_REGISTRY),
  "--environment_type=DOCKER",
  "--experiments=beam_fn_api",
  "--parallelism={0}".format(PARALLELISM),
  "--job_endpoint=localhost:8099",
  "--extra_package=PATH_TO_SDIST"
  ])
options.view_as(SetupOptions).save_main_session = True

return beam.Pipeline(options=options)


My setup.py is below.

import setuptools

REQUIRED_PACKAGES = [
'apache-beam==2.15.0',
'elasticsearch>=7.0.0,<8.0.0',
'urllib3',
'boto3'
]

setuptools.setup(
   author   = 'Yu Watanabe',
   author_email = 'AUTHOR_EMAIL',
   url  = 'URL',
   name = 'quality_validation',
   version  = '0.1',
   install_requires = REQUIRED_PACKAGES,
   packages = setuptools.find_packages(),

)


Directory path to setup.py.

admin@ip-172-31-9-89:~/quality-validation-distribute/bin$ ls -l
total 20
drwxr-xr-x 2 admin admin 4096 Oct  2 19:30 dist
-rw-r--r-- 1 admin admin0 Sep  5 21:21 __init__.py
-rw-r--r-- 1 admin admin 3782 Oct  3 11:02 main.py
drwxr-xr-x 3 admin admin 4096 Oct  3 15:41 pkg_aif
drwxr-xr-x 2 admin admin 4096 Oct  2 19:30 quality_validation.egg-info
-rw-r--r-- 1 admin admin  517 Oct  1 15:21 setup.py
====

Thanks,
Yu Watanabe

On Fri, Sep 27, 2019 at 3:23 AM Kyle Weaver  wrote:

> Did you try moving the imports from the process function to the top of
> main.py?
>
> Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com
>
>
> On Wed, Sep 25, 2019 at 11:27 PM Yu Watanabe 
> wrote:
>
>> Hello.
>>
>> I would like to ask for help with resolving a dependency issue for an
>> imported module.
>>
>> I have a directory structure as below and I am trying to import the Frames
>> class from frames.py into main.py.
>> =
>> quality-validation/
>> bin/setup.py
>>   main.py
>>   modules/
>> frames.py
>>
>> =
>>
>> However, when I run pipeline, I get below error at TaskMana

Re: How do you call user defined ParDo class from the pipeline in Portable Runner (Apache Flink) ?

2019-09-25 Thread Yu Watanabe
Actually there was a good example in the latest wordcount.py in master repo.

https://github.com/apache/beam/blob/master/sdks/python/apache_beam/examples/wordcount.py


On Thu, Sep 26, 2019 at 12:00 PM Yu Watanabe  wrote:

> Thank you for the help.
>
> I have chosen to remove the super().__init__() .
>
> Thanks,
> Yu
>
> On Thu, Sep 26, 2019 at 9:18 AM Ankur Goenka  wrote:
>
>> super has some issues while pickling in python3. Please refer
>> https://github.com/uqfoundation/dill/issues/300 for more details.
>>
>> Removing reference to super in your dofn should help.
>>
>> On Wed, Sep 25, 2019 at 5:13 PM Yu Watanabe 
>> wrote:
>>
>>> Thank you for the reply.
>>>
>>> "save_main_session" did not work; however, the situation has changed.
>>>
>>> 1. get_all_options() output. "save_main_session" set to True.
>>>
>>> =
>>> 2019-09-26 09:04:11,586 DEBUG Pipeline Options:
>>> {'wait_until_finish_duration': None, 'update': False, 'min_cpu_platform':
>>> None, 'dataflow_endpoint': 'https://dataflow.googleapis.com',
>>> 'environment_config': 'asia.gcr.io/creationline001/beam/python3:latest',
>>> 'machine_type': None, 'enable_streaming_engine': False, 'sdk_location':
>>> 'default', 'profile_memory': False, 'max_num_workers': None,
>>> 'type_check_strictness': 'DEFAULT_TO_ANY', 'streaming': False,
>>> 'setup_file': None, 'network': None, 'on_success_matcher': None,
>>> 'requirements_cache': None, 'service_account_email': None,
>>> 'environment_type': 'DOCKER', 'disk_type': None, 'labels': None,
>>> 'profile_location': None, 'direct_runner_use_stacked_bundle': True,
>>> 'use_public_ips': None, * 'save_main_session': True, ***
>>> 'direct_num_workers': 1, 'num_workers': None,
>>> 'worker_harness_container_image': None, 'template_location': None,
>>> 'hdfs_port': None, 'flexrs_goal': None, 'profile_cpu': False,
>>> 'transform_name_mapping': None, 'profile_sample_rate': 1.0, 'runner':
>>> 'PortableRunner', 'project': None, 'dataflow_kms_key': None,
>>> 'job_endpoint': 'localhost:8099', 'extra_packages': None,
>>> 'environment_cache_millis': 0, 'dry_run': False, 'autoscaling_algorithm':
>>> None, 'staging_location': None, 'job_name': None, 'no_auth': False,
>>> 'runtime_type_check': False, 'direct_runner_bundle_repeat': 0,
>>> 'subnetwork': None, 'pipeline_type_check': True, 'hdfs_user': None,
>>> 'dataflow_job_file': None, 'temp_location': None, 'sdk_worker_parallelism':
>>> 0, 'zone': None, 'experiments': ['beam_fn_api'], 'hdfs_host': None,
>>> 'disk_size_gb': None, 'dataflow_worker_jar': None, 'requirements_file':
>>> None, 'beam_plugins': None, 'pubsubRootUrl': None, 'region': None}
>>>
>>>  
>>> =
>>>
>>> 2. Error in Task Manager log did not change.
>>>
>>> ==
>>>   File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 474,
>>> in find_class
>>> return StockUnpickler.find_class(self, module, name)
>>> AttributeError: Can't get attribute 'FlattenTagFilesFn' on <module 'apache_beam.runners.worker.sdk_worker_main' from
>>> '/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/sdk_worker_main.py'>
>>>
>>> ==
>>>
>>> 3. However, if I comment out "super().__init__()" in my code, the error
>>> changes.
>>>
>>> ==
>>>   File
>>> "/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/bundle_processor.py",
>>> line 1078, in _create_pardo_operation
>>> dofn_data = pickler.loads(serialized_fn)
>>>   File
>>> "/usr/local/lib/python3.5/site-packages/apache_beam/internal/pickler.py",
>>> line 265, in loads
>>> return dill.loads(s)
>>>   File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 317,
>>> in loads
>>> return load(file, ignore)
>>>   File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 305,
>>> in load
>>> obj = pik.load()
>>>   File "/usr/local/lib/python3.5/site-packages/dill/_dill

Re: How do you call user defined ParDo class from the pipeline in Portable Runner (Apache Flink) ?

2019-09-25 Thread Yu Watanabe
Thank you for the help.

I have chosen to remove the super().__init__() .

Thanks,
Yu

On Thu, Sep 26, 2019 at 9:18 AM Ankur Goenka  wrote:

> super has some issues wile pickling in python3. Please refer
> https://github.com/uqfoundation/dill/issues/300 for more details.
>
> Removing reference to super in your dofn should help.
>
> On Wed, Sep 25, 2019 at 5:13 PM Yu Watanabe  wrote:
>
>> Thank you for the reply.
>>
>> "save_main_session" did not work; however, the situation has changed.
>>
>> 1. get_all_options() output. "save_main_session" set to True.
>>
>> =
>> 2019-09-26 09:04:11,586 DEBUG Pipeline Options:
>> {'wait_until_finish_duration': None, 'update': False, 'min_cpu_platform':
>> None, 'dataflow_endpoint': 'https://dataflow.googleapis.com',
>> 'environment_config': 'asia.gcr.io/creationline001/beam/python3:latest',
>> 'machine_type': None, 'enable_streaming_engine': False, 'sdk_location':
>> 'default', 'profile_memory': False, 'max_num_workers': None,
>> 'type_check_strictness': 'DEFAULT_TO_ANY', 'streaming': False,
>> 'setup_file': None, 'network': None, 'on_success_matcher': None,
>> 'requirements_cache': None, 'service_account_email': None,
>> 'environment_type': 'DOCKER', 'disk_type': None, 'labels': None,
>> 'profile_location': None, 'direct_runner_use_stacked_bundle': True,
>> 'use_public_ips': None, * 'save_main_session': True, ***
>> 'direct_num_workers': 1, 'num_workers': None,
>> 'worker_harness_container_image': None, 'template_location': None,
>> 'hdfs_port': None, 'flexrs_goal': None, 'profile_cpu': False,
>> 'transform_name_mapping': None, 'profile_sample_rate': 1.0, 'runner':
>> 'PortableRunner', 'project': None, 'dataflow_kms_key': None,
>> 'job_endpoint': 'localhost:8099', 'extra_packages': None,
>> 'environment_cache_millis': 0, 'dry_run': False, 'autoscaling_algorithm':
>> None, 'staging_location': None, 'job_name': None, 'no_auth': False,
>> 'runtime_type_check': False, 'direct_runner_bundle_repeat': 0,
>> 'subnetwork': None, 'pipeline_type_check': True, 'hdfs_user': None,
>> 'dataflow_job_file': None, 'temp_location': None, 'sdk_worker_parallelism':
>> 0, 'zone': None, 'experiments': ['beam_fn_api'], 'hdfs_host': None,
>> 'disk_size_gb': None, 'dataflow_worker_jar': None, 'requirements_file':
>> None, 'beam_plugins': None, 'pubsubRootUrl': None, 'region': None}
>>
>>  
>> =
>>
>> 2. Error in Task Manager log did not change.
>>
>> ==
>>   File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 474,
>> in find_class
>> return StockUnpickler.find_class(self, module, name)
>> AttributeError: Can't get attribute 'FlattenTagFilesFn' on <module 'apache_beam.runners.worker.sdk_worker_main' from
>> '/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/sdk_worker_main.py'>
>>
>> ==
>>
>> 3. However, if I comment out "super().__init__()" in my code, the error
>> changes.
>>
>> ==
>>   File
>> "/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 1078, in _create_pardo_operation
>> dofn_data = pickler.loads(serialized_fn)
>>   File
>> "/usr/local/lib/python3.5/site-packages/apache_beam/internal/pickler.py",
>> line 265, in loads
>> return dill.loads(s)
>>   File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 317,
>> in loads
>> return load(file, ignore)
>>   File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 305,
>> in load
>> obj = pik.load()
>>   File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 474,
>> in find_class
>> return StockUnpickler.find_class(self, module, name)
>> ImportError: No module named 's3_credentials'
>> ==
>>
>>
>> 4. My whole class is below.
>>
>> ==
>> class FlattenTagFilesFn(beam.DoFn):
>> def __init__(self, s3Bucket, s3Creds, maxKeys=1000):
>> self.s3Bucket = s3Bucket
>> se

Re: How do you call user defined ParDo class from the pipeline in Portable Runner (Apache Flink) ?

2019-09-25 Thread Yu Watanabe
yaml2['head']['operator_id'], # Post OperatorId
  preFrm[f],# Pre Frame Line
  postFrm[f],   # Post Frame Line
  info['pre'][0][1],# Pre Last Modified
Time
  info['post'][0][1])   # Post Last
Modified Time

yield (frames)

tagCounts = TagCounts(
 self.s3Bucket,
 yaml1,   # Pre Yaml
 yaml2,   # Post Yaml
 info['pre'][0][0],   # Pre S3Key
 info['post'][0][0],  # Post S3Key
 info['pre'][0][1],   # Pre Last Modified Time
 info['post'][0][1] ) # Post Last Modified Time

yield beam.pvalue.TaggedOutput('counts', tagCounts)
==

I was using super() to define a single boto client instance in the ParDo.
May I ask, is there a way to call super() in the constructor of a ParDo?
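
For reference, a minimal sketch of one common workaround (not necessarily the
approach taken here): build the boto client lazily on the worker instead of in
__init__(), so the DoFn itself stays picklable and no super().__init__() call
is needed. The attribute names on s3Creds are assumptions.

==========
import boto3
import apache_beam as beam


class FlattenTagFilesFn(beam.DoFn):
    def __init__(self, s3Bucket, s3Creds, maxKeys=1000):
        # Keep only picklable state here.
        self.s3Bucket = s3Bucket
        self.s3Creds = s3Creds
        self.maxKeys = maxKeys
        self.s3 = None

    def start_bundle(self):
        # Create the client on the worker, after unpickling, so it is never
        # serialized with the DoFn. access_key / secret_key are hypothetical
        # attribute names on the credentials object.
        if self.s3 is None:
            self.s3 = boto3.client(
                's3',
                aws_access_key_id=self.s3Creds.access_key,
                aws_secret_access_key=self.s3Creds.secret_key)

    def process(self, element):
        resp = self.s3.list_objects_v2(Bucket=self.s3Bucket,
                                       MaxKeys=self.maxKeys)
        yield element, resp.get('KeyCount', 0)
==========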

Thanks,
Yu


On Thu, Sep 26, 2019 at 7:49 AM Kyle Weaver  wrote:

> You will need to set the save_main_session pipeline option to True.
>
> Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com
>
>
> On Wed, Sep 25, 2019 at 3:44 PM Yu Watanabe  wrote:
>
>> Hello.
>>
>> I would like to ask a question about ParDo.
>>
>> I am getting the below error inside the TaskManager when running code on Apache
>> Flink using the Portable Runner.
>> =
>> 
>>   File
>> "/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/bundle_processor.py",
>> line 1078, in _create_pardo_operation
>> dofn_data = pickler.loads(serialized_fn)
>>   File
>> "/usr/local/lib/python3.5/site-packages/apache_beam/internal/pickler.py",
>> line 265, in loads
>> return dill.loads(s)
>>   File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 317,
>> in loads
>> return load(file, ignore)
>>   File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 305,
>> in load
>> obj = pik.load()
>>   File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 474,
>> in find_class
>> return StockUnpickler.find_class(self, module, name)
>> AttributeError: Can't get attribute 'FlattenTagFilesFn' on <module 'apache_beam.runners.worker.sdk_worker_main' from
>> '/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/sdk_worker_main.py'>
>> =
>>
>> " FlattenTagFilesFn" is defined as ParDo and called from Pipeline as
>> below.
>> =
>> frames, counts = ({'pre': pcollPre, 'post': pcollPost}
>>   | 'combined:cogroup' >> beam.CoGroupByKey()
>>   | 'combined:exclude' >> beam.Filter(lambda x:
>> (len(x[1]['pre']) > 0) and (len(x[1]['post']) > 0))
>>   | 'combined:flat'>>
>> beam.ParDo(FlattenTagFilesFn(s3Bucket, s3Creds))
>>   .with_outputs('counts',
>> main='frames'))
>> =
>>
>> In the same file I have defined the class as below.
>> =========
>> class FlattenTagFilesFn(beam.DoFn):
>> def __init__(self, s3Bucket, s3Creds, maxKeys=1000):
>> self.s3Bucket = s3Bucket
>> self.s3Creds  = s3Creds
>> self.maxKeys  = maxKeys
>> =
>>
>> This is not a problem when running the pipeline using the DirectRunner.
>> May I ask, how should I import the class for the ParDo when running on Flink?
>>
>> Thanks,
>> Yu Watanabe
>>
>> --
>> Yu Watanabe
>> Weekend Freelancer who loves to challenge building data platform
>> yu.w.ten...@gmail.com
>> LinkedIn: https://www.linkedin.com/in/yuwatanabe1 | Twitter: https://twitter.com/yuwtennis
>>
>

-- 
Yu Watanabe
Weekend Freelancer who loves to challenge building data platform
yu.w.ten...@gmail.com
LinkedIn: https://www.linkedin.com/in/yuwatanabe1 | Twitter: https://twitter.com/yuwtennis


How do you call user defined ParDo class from the pipeline in Portable Runner (Apache Flink) ?

2019-09-25 Thread Yu Watanabe
Hello.

I would like to ask a question about ParDo.

I am getting the below error inside the TaskManager when running code on Apache
Flink using the Portable Runner.
=

  File
"/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/bundle_processor.py",
line 1078, in _create_pardo_operation
dofn_data = pickler.loads(serialized_fn)
  File
"/usr/local/lib/python3.5/site-packages/apache_beam/internal/pickler.py",
line 265, in loads
return dill.loads(s)
  File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 317, in
loads
return load(file, ignore)
  File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 305, in
load
obj = pik.load()
  File "/usr/local/lib/python3.5/site-packages/dill/_dill.py", line 474, in
find_class
return StockUnpickler.find_class(self, module, name)
AttributeError: Can't get attribute 'FlattenTagFilesFn' on <module 'apache_beam.runners.worker.sdk_worker_main' from '/usr/local/lib/python3.5/site-packages/apache_beam/runners/worker/sdk_worker_main.py'>
=

" FlattenTagFilesFn" is defined as ParDo and called from Pipeline as below.
=
frames, counts = ({'pre': pcollPre, 'post': pcollPost}
  | 'combined:cogroup' >> beam.CoGroupByKey()
  | 'combined:exclude' >> beam.Filter(lambda x:
(len(x[1]['pre']) > 0) and (len(x[1]['post']) > 0))
  | 'combined:flat'>>
beam.ParDo(FlattenTagFilesFn(s3Bucket, s3Creds))
  .with_outputs('counts',
main='frames'))
=

In the same file I have defined the class as below.
=
class FlattenTagFilesFn(beam.DoFn):
def __init__(self, s3Bucket, s3Creds, maxKeys=1000):
self.s3Bucket = s3Bucket
self.s3Creds  = s3Creds
self.maxKeys  = maxKeys
=

This is not a problem when running the pipeline using the DirectRunner.
May I ask, how should I import the class for the ParDo when running on Flink?
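
For reference, a minimal sketch of the fix suggested in the replies above:
keep the DoFn defined at module level and enable save_main_session so the
workers can unpickle it. The option values are placeholders taken from
elsewhere in this thread.

==========
from apache_beam.options.pipeline_options import PipelineOptions, SetupOptions

options = PipelineOptions([
    "--runner=PortableRunner",
    "--environment_type=DOCKER",
    "--environment_config=asia.gcr.io/PROJECTNAME/beam/python3:latest",
    "--job_endpoint=localhost:8099",
    "--experiments=beam_fn_api",
])
# Pickle the __main__ session so FlattenTagFilesFn can be found on the
# Flink workers.
options.view_as(SetupOptions).save_main_session = True
==========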

Thanks,
Yu Watanabe

-- 
Yu Watanabe
Weekend Freelancer who loves to challenge building data platform
yu.w.ten...@gmail.com
LinkedIn: https://www.linkedin.com/in/yuwatanabe1 | Twitter: https://twitter.com/yuwtennis


Re: How to reference manifest from apache flink worker node ?

2019-09-24 Thread Yu Watanabe
I needed a little adjustment to the pipeline options to make this work.

The pipeline options required 'job_endpoint' when manually starting the job
server from the jar file.
=
options = PipelineOptions([
  "--runner=PortableRunner",
  "--environment_config=
asia.gcr.io/creationline001/beam/python3:latest",
  "--environment_type=DOCKER",
  "--experiments=beam_fn_api",
  "--job_endpoint=localhost:8099"
  ])
 =

Otherwise, the runner spins up a job server container.
==
ERROR:root:Starting job service with ['docker', 'run', '-v',
'/usr/bin/docker:/bin/docker', '-v',
'/var/run/docker.sock:/var/run/docker.sock', '--network=host', '
admin-docker-apache.bintray.io/beam/flink-job-server:latest', '--job-host',
'localhost', '--job-port', '42915', '--artifact-port', '56561',
'--expansion-port', '53857']
ERROR:root:Error bringing up job service
==

Without the "job_endpoint" option, the default job server is used.
==
https://github.com/apache/beam/blob/7931ec055e2da7214c82e368ef7d7fd679faaef1/sdks/python/apache_beam/runners/portability/portable_runner.py#L176
  server = self.default_job_server(options)

=>
https://github.com/apache/beam/blob/d963aeb91a63f165b5ff1ebf6add8275aec204f1/sdks/python/apache_beam/runners/portability/job_server.py#L172
"-docker-apache.bintray.io/beam/flink-job-server:latest"
 ==

Thanks,
Yu

On Tue, Sep 24, 2019 at 2:06 PM Ankur Goenka  wrote:

> Flink job server does not have artifacts-dir option yet.
> We have a PR to add it https://github.com/apache/beam/pull/9648
>
> However, for now you can do a few manual steps to achieve this.
>
> Start Job server.
>
> 1. Download
> https://repo.maven.apache.org/maven2/org/apache/beam/beam-runners-flink-1.8-job-server/2.15.0/beam-runners-flink-1.8-job-server-2.15.0.jar
>
> 2. Start the job server
> java -jar
> /home/admin/.apache_beam/cache/beam-runners-flink-1.8-job-server-2.15.0.jar
> --flink-master-url ip-172-31-12-113.ap-northeast-1.compute.internal:8081
> --artifacts-dir /home/ec2-user --job-port 8099
>
> 3. Submit your pipeline
> options = PipelineOptions([
>   "--runner=PortableRunner",
>   "--environment_config=
> asia.gcr.io/creationline001/beam/python3:latest",
>   "--environment_type=DOCKER",
>   "--experiments=beam_fn_api"
>   ])
>
> On Mon, Sep 23, 2019 at 8:20 PM Yu Watanabe  wrote:
>
>> Kyle.
>>
>> Thank you for the assistance.
>>
>> Looks like "--artifacts-dir" is not parsed. Below is the DEBUG log of the
>> pipeline.
>> 
>> WARNING:root:Discarding unparseable args: ['--flink_version=1.8',
>> '--flink_master_url=ip-172-31-12-113.ap-northeast-1.compute.internal:8081',
>> '--artifacts-dir=/home/ec2-user']
>> ...
>> WARNING:root:Downloading job server jar from
>> https://repo.maven.apache.org/maven2/org/apache/beam/beam-runners-flink-1.8-job-server/2.15.0/beam-runners-flink-1.8-job-server-2.15.0.jar
>> DEBUG:root:Starting job service with ['java', '-jar',
>> '/home/admin/.apache_beam/cache/beam-runners-flink-1.8-job-server-2.15.0.jar',
>> '--flink-master-url',
>> 'ip-172-31-12-113.ap-northeast-1.compute.internal:8081', '--artifacts-dir',
>> '/tmp/artifactsicq2kj8c', '--job-port', 41757, '--artifact-port', 0,
>> '--expansion-port', 0]
>> DEBUG:root:Waiting for jobs grpc channel to be ready at localhost:41757.
>> ...
>> DEBUG:root:Runner option 'job_endpoint' was already added
>> DEBUG:root:Runner option 'sdk_worker_parallelism' was already added
>> WARNING:root:Discarding unparseable args: ['--artifacts-dir=/home/admin']
>> ...
>> 
>>
>> My pipeline option .
>> 
>> options = PipelineOptions([
>>   "--runner=FlinkRunner",
>>   "--flink_version=1.8",
>>
>> "--flink_master_url=ip-172-31-12-113.ap-northeast-1.compute.internal:8081"
>> ,
>>   "--environment_config=
>> asia.gcr.io/creationli

Re: How to reference manifest from apache flink worker node ?

2019-09-24 Thread Yu Watanabe
Ankur.

Thank you for the comment.

Manually starting the job server looks more solid.
Setting TMPDIR and then using the Flink Runner sounded a bit hacky.

Thanks,
Yu

On Tue, Sep 24, 2019 at 2:06 PM Ankur Goenka  wrote:

> Flink job server does not have artifacts-dir option yet.
> We have a PR to add it https://github.com/apache/beam/pull/9648
>
> However, for now you can do a few manual steps to achieve this.
>
> Start Job server.
>
> 1. Download
> https://repo.maven.apache.org/maven2/org/apache/beam/beam-runners-flink-1.8-job-server/2.15.0/beam-runners-flink-1.8-job-server-2.15.0.jar
>
> 2. Start the job server
> java -jar
> /home/admin/.apache_beam/cache/beam-runners-flink-1.8-job-server-2.15.0.jar
> --flink-master-url ip-172-31-12-113.ap-northeast-1.compute.internal:8081
> --artifacts-dir /home/ec2-user --job-port 8099
>
> 3. Submit your pipeline
> options = PipelineOptions([
>   "--runner=PortableRunner",
>   "--environment_config=
> asia.gcr.io/creationline001/beam/python3:latest",
>   "--environment_type=DOCKER",
>       "--experiments=beam_fn_api"
>   ])
>
> On Mon, Sep 23, 2019 at 8:20 PM Yu Watanabe  wrote:
>
>> Kyle.
>>
>> Thank you for the assistance.
>>
>> Looks like "--artifacts-dir" is not parsed. Below is the DEBUG log of the
>> pipeline.
>> 
>> WARNING:root:Discarding unparseable args: ['--flink_version=1.8',
>> '--flink_master_url=ip-172-31-12-113.ap-northeast-1.compute.internal:8081',
>> '--artifacts-dir=/home/ec2-user']
>> ...
>> WARNING:root:Downloading job server jar from
>> https://repo.maven.apache.org/maven2/org/apache/beam/beam-runners-flink-1.8-job-server/2.15.0/beam-runners-flink-1.8-job-server-2.15.0.jar
>> DEBUG:root:Starting job service with ['java', '-jar',
>> '/home/admin/.apache_beam/cache/beam-runners-flink-1.8-job-server-2.15.0.jar',
>> '--flink-master-url',
>> 'ip-172-31-12-113.ap-northeast-1.compute.internal:8081', '--artifacts-dir',
>> '/tmp/artifactsicq2kj8c', '--job-port', 41757, '--artifact-port', 0,
>> '--expansion-port', 0]
>> DEBUG:root:Waiting for jobs grpc channel to be ready at localhost:41757.
>> ...
>> DEBUG:root:Runner option 'job_endpoint' was already added
>> DEBUG:root:Runner option 'sdk_worker_parallelism' was already added
>> WARNING:root:Discarding unparseable args: ['--artifacts-dir=/home/admin']
>> ...
>> 
>>
>> My pipeline option .
>> 
>> options = PipelineOptions([
>>   "--runner=FlinkRunner",
>>   "--flink_version=1.8",
>>
>> "--flink_master_url=ip-172-31-12-113.ap-northeast-1.compute.internal:8081"
>> ,
>>   "--environment_config=
>> asia.gcr.io/creationline001/beam/python3:latest",
>>   "--environment_type=DOCKER",
>>   "--experiments=beam_fn_api",
>>   "--artifacts-dir=/home/admin"
>>   ])
>> 
>>
>> Tracing through the log, it looks like the artifacts dir respects the
>> default temp dir of the OS.
>> Thus, to adjust it I need an environment variable instead. I used
>> 'TMPDIR' in my case.
>> 
>>
>> https://github.com/apache/beam/blob/7931ec055e2da7214c82e368ef7d7fd679faaef1/sdks/python/apache_beam/runners/portability/job_server.py#L203
>> artifacts_dir = self.local_temp_dir(prefix='artifacts')
>>
>> =>
>>
>> https://github.com/apache/beam/blob/7931ec055e2da7214c82e368ef7d7fd679faaef1/sdks/python/apache_beam/runners/portability/job_server.py#L157
>> return tempfile.mkdtemp(dir=self._local_temp_root, **kwargs)
>>
>> =>
>>
>> https://github.com/apache/beam/blob/7931ec055e2da7214c82e368ef7d7fd679faaef1/sdks/python/apache_beam/runners/portability/job_server.py#L102
>> self._local_temp_root = None
>> 
>>
>> Then it worked.
>> 
>> (python-bm-2150) admin@ip-172-31-9-89:~$ env | grep TMPDIR
>> TMPDIR=/home/admin
>>
>> =>
>> DEBUG:root:Sta

Re: How to reference manifest from apache flink worker node ?

2019-09-23 Thread Yu Watanabe
Kyle.

Thank you for the assistance.

Looks like "--artifacts-dir" is not parsed. Below is the DEBUG log of the
pipeline.

WARNING:root:Discarding unparseable args: ['--flink_version=1.8',
'--flink_master_url=ip-172-31-12-113.ap-northeast-1.compute.internal:8081',
'--artifacts-dir=/home/ec2-user']
...
WARNING:root:Downloading job server jar from
https://repo.maven.apache.org/maven2/org/apache/beam/beam-runners-flink-1.8-job-server/2.15.0/beam-runners-flink-1.8-job-server-2.15.0.jar
DEBUG:root:Starting job service with ['java', '-jar',
'/home/admin/.apache_beam/cache/beam-runners-flink-1.8-job-server-2.15.0.jar',
'--flink-master-url',
'ip-172-31-12-113.ap-northeast-1.compute.internal:8081', '--artifacts-dir',
'/tmp/artifactsicq2kj8c', '--job-port', 41757, '--artifact-port', 0,
'--expansion-port', 0]
DEBUG:root:Waiting for jobs grpc channel to be ready at localhost:41757.
...
DEBUG:root:Runner option 'job_endpoint' was already added
DEBUG:root:Runner option 'sdk_worker_parallelism' was already added
WARNING:root:Discarding unparseable args: ['--artifacts-dir=/home/admin']
...


My pipeline option .

options = PipelineOptions([
  "--runner=FlinkRunner",
  "--flink_version=1.8",

"--flink_master_url=ip-172-31-12-113.ap-northeast-1.compute.internal:8081"
,
  "--environment_config=
asia.gcr.io/creationline001/beam/python3:latest",
  "--environment_type=DOCKER",
  "--experiments=beam_fn_api",
  "--artifacts-dir=/home/admin"
  ])


Tracing through the log, it looks like the artifacts dir respects the default
temp dir of the OS.
Thus, to adjust it I need an environment variable instead. I used
'TMPDIR' in my case.

https://github.com/apache/beam/blob/7931ec055e2da7214c82e368ef7d7fd679faaef1/sdks/python/apache_beam/runners/portability/job_server.py#L203
artifacts_dir = self.local_temp_dir(prefix='artifacts')

=>
https://github.com/apache/beam/blob/7931ec055e2da7214c82e368ef7d7fd679faaef1/sdks/python/apache_beam/runners/portability/job_server.py#L157
return tempfile.mkdtemp(dir=self._local_temp_root, **kwargs)

=>
https://github.com/apache/beam/blob/7931ec055e2da7214c82e368ef7d7fd679faaef1/sdks/python/apache_beam/runners/portability/job_server.py#L102
self._local_temp_root = None


Then it worked.

(python-bm-2150) admin@ip-172-31-9-89:~$ env | grep TMPDIR
TMPDIR=/home/admin

=>
DEBUG:root:Starting job service with ['java', '-jar',
'/home/admin/.apache_beam/cache/beam-runners-flink-1.8-job-server-2.15.0.jar',
'--flink-master-url',
'ip-172-31-12-113.ap-northeast-1.compute.internal:8081', '--artifacts-dir',
'/home/admin/artifacts18unmki3', '--job-port', 46767, '--artifact-port', 0,
'--expansion-port', 0]


I will use an NFS server for now to share the artifact directory with the
worker nodes.

Thanks.
Yu Watanabe

On Tue, Sep 24, 2019 at 9:50 AM Kyle Weaver  wrote:

> The relevant configuration flag for the job server is `--artifacts-dir`.
>
> @Robert Bradshaw  I added this info to the log
> message: https://github.com/apache/beam/pull/9646
>
> Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com
>
>
> On Mon, Sep 23, 2019 at 11:36 AM Robert Bradshaw 
> wrote:
>
>> You need to set your artifact directory to point to a
>> distributed filesystem that's also accessible to the workers (when starting
>> up the job server).
>>
>> On Mon, Sep 23, 2019 at 5:43 AM Yu Watanabe 
>> wrote:
>>
>>> Hello.
>>>
>>> I am working on the Flink runner (2.15.0) and would like to ask a question
>>> about how to solve my error.
>>>
>>> Currently, I have a remote cluster deployed as below (please see
>>> slide 1).
>>> All master and worker nodes are installed on a different server from
>>> Apache Beam.
>>>
>>>
>>> https://drive.google.com/file/d/1vBULp6kiEfQNGVV3Nl2mMKAZZKYsb11h/view?usp=sharing
>>>
>>> When I run the Beam pipeline, the harness container tries to start up but
>>> fails immediately with the below error on the Docker side.
>>>
>>> =
>>> Sep 23 21:04:05 ip

How to reference manifest from apache flink worker node ?

2019-09-23 Thread Yu Watanabe
Hello.

I am working on the Flink runner (2.15.0) and would like to ask a question about
how to solve my error.

Currently, I have a remote cluster deployed as below (please see slide 1).
All master and worker nodes are installed on a different server from Apache
Beam.

https://drive.google.com/file/d/1vBULp6kiEfQNGVV3Nl2mMKAZZKYsb11h/view?usp=sharing

When I run the Beam pipeline, the harness container tries to start up but
fails immediately with the below error on the Docker side.
=
Sep 23 21:04:05 ip-172-31-0-143 51106514ffc0[7920]: 2019/09/23 12:04:05
Initializing python harness: /opt/apache/beam/boot --id=1
--logging_endpoint=localhost:34227 --artifact_endpoint=localhost:45303
--provision_endpoint=localhost:44585 --control_endpoint=localhost:43869
Sep 23 21:04:05 ip-172-31-0-143 dockerd:
time="2019-09-23T21:04:05.380942292+09:00" level=debug msg=event
module=libcontainerd namespace=moby topic=/tasks/start
Sep 23 21:04:05 ip-172-31-0-143 51106514ffc0[7920]: 2019/09/23 12:04:05
Failed to retrieve staged files: failed to get manifest
Sep 23 21:04:05 ip-172-31-0-143 51106514ffc0[7920]: #011caused by:
Sep 23 21:04:05 ip-172-31-0-143 51106514ffc0[7920]: rpc error: code =
Unknown desc =
=

At the same time, task manager logs below error.
=
2019-09-23 21:04:05,525 INFO
 
org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactRetrievalService
 - GetManifest for
/tmp/artifactsfkyik3us/job_de145881-9ea7-4e44-8e6d-31a6ea298010/MANIFEST
2019-09-23 21:04:05,526 INFO
 
org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactRetrievalService
 - Loading manifest for retrieval token
/tmp/artifactsfkyik3us/job_de145881-9ea7-4e44-8e6d-31a6ea298010/MANIFEST
2019-09-23 21:04:05,531 INFO
 
org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactRetrievalService
 - GetManifest for
/tmp/artifactsfkyik3us/job_de145881-9ea7-4e44-8e6d-31a6ea298010/MANIFEST
failed
java.util.concurrent.ExecutionException: java.io.FileNotFoundException:
/tmp/artifactsfkyik3us/job_de145881-9ea7-4e44-8e6d-31a6ea298010/MANIFEST
(No such file or directory)
at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:531)
...
=

I see this artifact directory on the server where the Beam pipeline is executed,
but not on the worker node.
=
# Beam server
(python-bm-2150) admin@ip-172-31-9-89:~$ sudo ls -ld /tmp/artifactsfkyik3us
drwx-- 3 admin admin 4096 Sep 23 12:03 /tmp/artifactsfkyik3us

# Flink worker node
[ec2-user@ip-172-31-0-143 flink]$ sudo ls -ld /tmp/artifactsfkyik3us
ls: cannot access /tmp/artifactsfkyik3us: No such file or directory
 
=

From the error, it seems that the container is not starting up correctly because
the manifest file is missing.
What would be a good approach to referencing the artifact directory from the
worker node?
I would appreciate any advice.

Best Regards,
Yu Watanabe

-- 
Yu Watanabe
Weekend Freelancer who loves to challenge building data platform
yu.w.ten...@gmail.com
LinkedIn: https://www.linkedin.com/in/yuwatanabe1 | Twitter: https://twitter.com/yuwtennis


Re: Submitting job to AWS EMR fails with error=2, No such file or directory

2019-09-20 Thread Yu Watanabe
Thank you for the reply.

> Why not manually docker pull the image (remember to adjust the container
> location to omit the registry) locally first?

  Actually, when using Docker, I have already pulled the image from the remote
repository (GCR).
  Is there a way to make the Flink Runner not call "docker pull" if the
container has already been pulled?

Thanks,
Yu Watanabe

On Fri, Sep 20, 2019 at 7:48 PM Benjamin Tan 
wrote:

> Seems like some file is missing. Why not manually docker pull the image
> (remember to adjust the container location to omit the registry) locally
> first? At least you can eliminate another source of trouble.
>
> Also, depending on your pipeline, if you are running distributed, you must
> make sure your files are accessible. For example, local paths won’t work at
> all.
>
> So maybe you can force a single worker first too.
>
> Sent from my iPhone
>
> On 20 Sep 2019, at 5:11 PM, Yu Watanabe  wrote:
>
> Ankur
>
> Thank you for the advice.
> You're right. Looking at the task manager's log, it looks like "docker
> pull" fails first for the yarn user and then a couple of errors come after.
> As a result, "docker run" seems to fail.
> I have been working on this all week and still have not managed to get the
> yarn session authenticated against Google Container Registry...
>
>
> ==
> 2019-09-19 06:47:38,196 INFO  org.apache.flink.runtime.taskmanager.Task
>   - MapPartition (MapPartition at [3]{Create,
> ParDo(EsOutputFn)}) (1/1) (d2f0d79e4614c3b0cb5a8cbd38de37da) switched from
> DEPLOYING to RUNNING.
> 2019-09-19 06:47:41,181 WARN
>  org.apache.beam.runners.fnexecution.environment.DockerCommand  - Unable to
> pull docker image asia.gcr.io/PROJECTNAME/beam/python3:latest, cause:
> Received exit code 1 for command 'docker pull
> asia.gcr.io/creationline001/beam/python3:latest'. stderr: Error response
> from daemon: unauthorized: You don't have the needed permissions to perform
> this operation, and you may have invalid credentials. To authenticate your
> request, follow the steps in:
> https://cloud.google.com/container-registry/docs/advanced-authentication
> 2019-09-19 06:47:44,035 INFO
>  
> org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactRetrievalService
>  - GetManifest for
> /tmp/artifactsknnvmjj8/job_fc5fff58-4408-4e0d-833b-675215218234/MANIFEST
> 2019-09-19 06:47:44,037 INFO
>  
> org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactRetrievalService
>  - Loading manifest for retrieval token
> /tmp/artifactsknnvmjj8/job_fc5fff58-4408-4e0d-833b-675215218234/MANIFEST
> 2019-09-19 06:47:44,046 INFO
>  
> org.apache.beam.runners.fnexecution.artifact.BeamFileSystemArtifactRetrievalService
>  - GetManifest for
> /tmp/artifactsknnvmjj8/job_fc5fff58-4408-4e0d-833b-675215218234/MANIFEST
> failed
> java.util.concurrent.ExecutionException: java.io.FileNotFoundException:
> /tmp/artifactsknnvmjj8/job_fc5fff58-4408-4e0d-833b-675215218234/MANIFEST
> (No such file or directory)
> at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.AbstractFuture.getDoneValue(AbstractFuture.java:531)
> at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.AbstractFuture.get(AbstractFuture.java:492)
> ...
> Caused by: java.io.FileNotFoundException:
> /tmp/artifactsknnvmjj8/job_fc5fff58-4408-4e0d-833b-675215218234/MANIFEST
> (No such file or directory)
> at java.io.FileInputStream.open0(Native Method)
> at java.io.FileInputStream.open(FileInputStream.java:195)
> at java.io.FileInputStream.(FileInputStream.java:138)
> at
> org.apache.beam.sdk.io.LocalFileSystem.open(LocalFileSystem.java:114)
> ...
> 2019-09-19 06:48:43,952 ERROR org.apache.flink.runtime.operators.BatchTask
>  - Error in task code:  MapPartition (MapPartition at
> [3]{Create, ParDo(EsOutputFn)}) (1/1)
> java.lang.Exception: The user defined 'open()' method caused an exception:
> java.io.IOException: Received exit code 1 for command 'docker inspect -f
> {{.State.Running}}
> 7afbdcfd241629d24872ba1c74ef10f3d07c854c9cc675a65d4d16b9fdbde752'. stderr:
> Error: No such object:
> 7afbdcfd241629d24872ba1c74ef10f3d07c854c9cc675a65d4d16b9fdbde752
> at
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:498)
> at
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> at java.lang.Thread.run(Thread.java:748)
> ...
>
> 

Re: Submitting job to AWS EMR fails with error=2, No such file or directory

2019-09-20 Thread Yu Watanabe
==========

-rwxrwxrwx 1 yarn ec2-user 16543786 Sep 19 04:39 boot

==========

Looks like the PROCESS command was blocked because of this error.

==========

  java.util.concurrent.ExecutionException: java.io.FileNotFoundException:
/tmp/artifactskjsmgv8p/job_ea4673de-71d0-4dd9-b683-fe8c52464666/MANIFEST
(No such file or directory)

==========


Thanks,
Yu Watanabe

On Wed, Sep 18, 2019 at 11:37 PM Benjamin Tan 
wrote:

> Try this as part of PipelineOptions:
>
> --environment_config={\"command\":\"/opt/apache/beam/boot\"}
>
> On 2019/09/18 10:40:42, Yu Watanabe  wrote:
> > Hello.
> >
> > I am trying to run FlinkRunner (2.15.0) on AWS EC2 instance and submit
> job
> > to AWS EMR (5.26.0).
> >
> > However, I get below error when I run the pipeline and fail.
> >
> > -
> > Caused by: java.lang.Exception: The user defined 'open()' method caused
> an
> > exception: java.io.IOException: Cannot run program "docker": error=2, No
> > such file or directory
> > at
> > org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:498)
> > at
> > org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> > ... 1 more
> > Caused by:
> >
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException:
> > java.io.IOException: Cannot run program "docker": error=2, No such file
> or
> > directory
> > at
> >
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4966)
> > at
> >
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.(DefaultJobBundleFactory.java:211)
> > at
> >
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.(DefaultJobBundleFactory.java:202)
> > at
> >
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:185)
> > at
> >
> org.apache.beam.runners.flink.translation.functions.FlinkDefaultExecutableStageContext.getStageBundleFactory(FlinkDefaultExecutableStageContext.java:49)
> > at
> >
> org.apache.beam.runners.flink.translation.functions.ReferenceCountingFlinkExecutableStageContextFactory$WrappedContext.getStageBundleFactory(ReferenceCountingFlinkExecutableStageContextFactory.java:203)
> > at
> >
> org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction.open(FlinkExecutableStageFunction.java:129)
> > at
> >
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> > at
> > org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:494)
> > ... 3 more
> > Caused by: java.io.IOException: Cannot run program "docker": error=2, No
> > such file or directory
> > at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
> > at
> >
> org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand(DockerCommand.java:141)
> > at
> >
> org.apache.beam.runners.fnexecution.environment.DockerCommand.runImage(DockerCommand.java:92)
> > at
> >
> org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory.createEnvironment(DockerEnvironmentFactory.java:152)
> > at
> >
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:178)
> > at
> >
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:162)
> > at
> >
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3528)
> > at
> >
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2277)
> > at
> >
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154)
> > at
> >
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.get(LocalCache.java:2044)
> > at
> >
> org.apach

apache beam 2.16.0 ?

2019-09-18 Thread Yu Watanabe
Hello.

I would like to use 2.16.0 to diagnose a container problem; however, it looks
like the job server is not available on Maven yet.

RuntimeError: Unable to fetch remote job server jar at
https://repo.maven.apache.org/maven2/org/apache/beam/beam-runners-flink-1.8-job-server/2.16.0/beam-runners-flink-1.8-job-server-2.16.0.jar:
HTTP Error 404: Not Found

I checked the Maven repo and indeed there is no job-server 2.16.0 yet.

https://mvnrepository.com/artifact/org.apache.beam/beam-runners-flink


Will 2.16.0 be released soon?

Thanks,
Yu Watanabe

-- 
Yu Watanabe
Weekend Freelancer who loves to challenge building data platform
yu.w.ten...@gmail.com
LinkedIn: https://www.linkedin.com/in/yuwatanabe1 | Twitter: https://twitter.com/yuwtennis


Re: Submitting job to AWS EMR fails with error=2, No such file or directory

2019-09-18 Thread Yu Watanabe
Thank you for the reply.

I see "boot" files under the below directories,
but these seem to be used for the containers.

  (python) admin@ip-172-31-9-89:~/beam-release-2.15.0$ find ./ -name "boot"
-exec ls -l {} \;
lrwxrwxrwx 1 admin admin 23 Sep 16 23:43
./sdks/python/container/.gogradle/project_gopath/src/
github.com/apache/beam/sdks/python/boot -> ../../../../../../../..
-rwxr-xr-x 1 admin admin 16543786 Sep 16 23:48
./sdks/python/container/build/target/launcher/linux_amd64/boot
-rwxr-xr-x 1 admin admin 16358928 Sep 16 23:48
./sdks/python/container/build/target/launcher/darwin_amd64/boot
-rwxr-xr-x 1 admin admin 16543786 Sep 16 23:48
./sdks/python/container/py3/build/docker/target/linux_amd64/boot
-rwxr-xr-x 1 admin admin 16358928 Sep 16 23:48
./sdks/python/container/py3/build/docker/target/darwin_amd64/boot
-rwxr-xr-x 1 admin admin 16543786 Sep 16 23:48
./sdks/python/container/py3/build/target/linux_amd64/boot
-rwxr-xr-x 1 admin admin 16358928 Sep 16 23:48
./sdks/python/container/py3/build/target/darwin_amd64/boot

On Wed, Sep 18, 2019 at 11:37 PM Benjamin Tan 
wrote:

> Try this as part of PipelineOptions:
>
> --environment_config={\"command\":\"/opt/apache/beam/boot\"}
>
> On 2019/09/18 10:40:42, Yu Watanabe  wrote:
> > Hello.
> >
> > I am trying to run FlinkRunner (2.15.0) on AWS EC2 instance and submit
> job
> > to AWS EMR (5.26.0).
> >
> > However, I get below error when I run the pipeline and fail.
> >
> > -
> > Caused by: java.lang.Exception: The user defined 'open()' method caused
> an
> > exception: java.io.IOException: Cannot run program "docker": error=2, No
> > such file or directory
> > at
> > org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:498)
> > at
> > org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
> > at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> > ... 1 more
> > Caused by:
> >
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException:
> > java.io.IOException: Cannot run program "docker": error=2, No such file
> or
> > directory
> > at
> >
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4966)
> > at
> >
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.(DefaultJobBundleFactory.java:211)
> > at
> >
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.(DefaultJobBundleFactory.java:202)
> > at
> >
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:185)
> > at
> >
> org.apache.beam.runners.flink.translation.functions.FlinkDefaultExecutableStageContext.getStageBundleFactory(FlinkDefaultExecutableStageContext.java:49)
> > at
> >
> org.apache.beam.runners.flink.translation.functions.ReferenceCountingFlinkExecutableStageContextFactory$WrappedContext.getStageBundleFactory(ReferenceCountingFlinkExecutableStageContextFactory.java:203)
> > at
> >
> org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction.open(FlinkExecutableStageFunction.java:129)
> > at
> >
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> > at
> > org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:494)
> > ... 3 more
> > Caused by: java.io.IOException: Cannot run program "docker": error=2, No
> > such file or directory
> > at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
> > at
> >
> org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand(DockerCommand.java:141)
> > at
> >
> org.apache.beam.runners.fnexecution.environment.DockerCommand.runImage(DockerCommand.java:92)
> > at
> >
> org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory.createEnvironment(DockerEnvironmentFactory.java:152)
> > at
> >
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:178)
> > at
> >
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:162)
> > at
> >
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3528)
> > at
> >

Re: Submitting job to AWS EMR fails with error=2, No such file or directory

2019-09-18 Thread Yu Watanabe
t kill container:
3e7e0d2d9d8362995d4b566ce1611834d56c5ca550ae89ae698a279271e4c33b: No such
container: 3e7e0d2d9d8362995d4b566ce1611834d56c5ca550ae89ae698a279271e4c33b
at
org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand(DockerCommand.java:179)
at
org.apache.beam.runners.fnexecution.environment.DockerCommand.killContainer(DockerCommand.java:134)
at
org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory.createEnvironment(DockerEnvironmentFactory.java:174)
... 21 more

=

Next, I'd like to try using environment type 'PROCESS', but it seems you
need an external command for 'environment_config':

https://github.com/apache/beam/blob/7931ec055e2da7214c82e368ef7d7fd679faaef1/sdks/python/apache_beam/runners/portability/portable_runner.py#L138

  config = json.loads(portable_options.environment_config)

May I ask what command I need to set as the shell script for each task
manager?
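
For what it's worth, a minimal sketch based on the environment_config
suggestion earlier in this thread, assuming the Beam Python SDK harness boot
binary has been copied to /opt/apache/beam/boot on every task manager host
(that path is an assumption):

==========
import json
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
    "--runner=PortableRunner",
    "--job_endpoint=localhost:8099",
    "--environment_type=PROCESS",
    # PROCESS mode runs this command directly on each task manager instead
    # of starting a Docker container for the SDK harness.
    "--environment_config=" + json.dumps({"command": "/opt/apache/beam/boot"}),
    "--experiments=beam_fn_api",
])
==========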

Best Regards,
Yu Watanabe

On Wed, Sep 18, 2019 at 9:39 PM Benjamin Tan 
wrote:

> Seems like docker is not installed. Maybe run with PROCESS as the
> environment type ? Or install docker.
>
> Sent from my iPhone
>
> On 18 Sep 2019, at 6:40 PM, Yu Watanabe  wrote:
>
> Hello.
>
> I am trying to run FlinkRunner (2.15.0) on AWS EC2 instance and submit job
> to AWS EMR (5.26.0).
>
> However, I get below error when I run the pipeline and fail.
>
> -
> Caused by: java.lang.Exception: The user defined 'open()' method caused an
> exception: java.io.IOException: Cannot run program "docker": error=2, No
> such file or directory
> at
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:498)
> at
> org.apache.flink.runtime.operators.BatchTask.invoke(BatchTask.java:368)
> at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
> ... 1 more
> Caused by:
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.UncheckedExecutionException:
> java.io.IOException: Cannot run program "docker": error=2, No such file or
> directory
> at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LocalLoadingCache.getUnchecked(LocalCache.java:4966)
> at
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.(DefaultJobBundleFactory.java:211)
> at
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$SimpleStageBundleFactory.(DefaultJobBundleFactory.java:202)
> at
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory.forStage(DefaultJobBundleFactory.java:185)
> at
> org.apache.beam.runners.flink.translation.functions.FlinkDefaultExecutableStageContext.getStageBundleFactory(FlinkDefaultExecutableStageContext.java:49)
> at
> org.apache.beam.runners.flink.translation.functions.ReferenceCountingFlinkExecutableStageContextFactory$WrappedContext.getStageBundleFactory(ReferenceCountingFlinkExecutableStageContextFactory.java:203)
> at
> org.apache.beam.runners.flink.translation.functions.FlinkExecutableStageFunction.open(FlinkExecutableStageFunction.java:129)
> at
> org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
> at
> org.apache.flink.runtime.operators.BatchTask.run(BatchTask.java:494)
> ... 3 more
> Caused by: java.io.IOException: Cannot run program "docker": error=2, No
> such file or directory
> at java.lang.ProcessBuilder.start(ProcessBuilder.java:1048)
> at
> org.apache.beam.runners.fnexecution.environment.DockerCommand.runShortCommand(DockerCommand.java:141)
> at
> org.apache.beam.runners.fnexecution.environment.DockerCommand.runImage(DockerCommand.java:92)
> at
> org.apache.beam.runners.fnexecution.environment.DockerEnvironmentFactory.createEnvironment(DockerEnvironmentFactory.java:152)
> at
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:178)
> at
> org.apache.beam.runners.fnexecution.control.DefaultJobBundleFactory$1.load(DefaultJobBundleFactory.java:162)
> at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3528)
> at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.loadSync(LocalCache.java:2277)
> at
> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2154)
&g

Submitting job to AWS EMR fails with error=2, No such file or directory

2019-09-18 Thread Yu Watanabe
=-
(python) [ec2-user@ip-172-31-2-121 ~]$ docker info
Containers: 0
 Running: 0
 Paused: 0
 Stopped: 0
...
 -

I used  "debian-stretch" .

-
debian-stretch-hvm-x86_64-gp2-2019-09-08-17994-572488bb-fc09-4638-8628-e1e1d26436f4-ami-0ed2d2283aa1466df.4
(ami-06f16171199d98c63)
-

This does not seem to happen when Flink runs locally.

-
admin@ip-172-31-9-89:/opt/flink$ sudo ss -atunp | grep 8081
tcpLISTEN 0  128  :::8081 :::*
  users:(("java",pid=18420,fd=82))
admin@ip-172-31-9-89:/opt/flink$ sudo ps -ef | grep java | head -1
admin17698 1  0 08:59 ?00:00:12 java -jar
/home/admin/.apache_beam/cache/beam-runners-flink-1.8-job-server-2.15.0.jar
--flink-master-url ip-172-31-1-84.ap-northeast-1.compute.internal:43581
--artifacts-dir /tmp/artifactskj47j8yn --job-port 48205 --artifact-port 0
--expansion-port 0
admin@ip-172-31-9-89:/opt/flink$
========-

Would there be any other setting I need to look at when running on an EC2
instance?

Thanks,
Yu Watanabe

-- 
Yu Watanabe
Weekend Freelancer who loves to challenge building data platform
yu.w.ten...@gmail.com
LinkedIn: https://www.linkedin.com/in/yuwatanabe1 | Twitter: https://twitter.com/yuwtennis


Re: How to use google container registry for FlinkRunner ?

2019-09-17 Thread Yu Watanabe
Kyle, Benjamin

Thank you for the quick reply.
And yes, adding "environment_config" to the pipeline options did the trick.

===
options = PipelineOptions([
  "--runner=FlinkRunner",
  "--flink_version=1.8",
  "--flink_master_url=localhost:8081",
  "--environment_config=
asia.gcr.io/PROJECTNAME/beam/python3",
  "--experiments=beam_fn_api"
  ])
p = beam.Pipeline(options=options)
=======

Thanks,
Yu Watanabe

On Tue, Sep 17, 2019 at 11:21 AM Benjamin Tan 
wrote:

> Something like this:
>
> options = PipelineOptions(["--runner=PortableRunner",
>"--environment_config=apachebeam/python3.7_sdk
> ", # <---
>"--job_endpoint=dnnserver2:8099"])
>
> On 2019/09/17 02:14:00, Kyle Weaver  wrote:
> > I think environment_config is the option you are looking for.
> >
> > On Mon, Sep 16, 2019 at 7:06 PM Yu Watanabe 
> wrote:
> >
> > > Hello.
> > >
> > > For testing I would like to use image uploaded to google container
> > > registry.
> > > How can I use images pulled from other than bintray.io ?
> > >
> > >
> > >
> 
> > > admin@ip-172-31-9-89:~$ gcloud container images list --repository
> > > asia.gcr.io/[PROJECTNAME] <http://asia.gcr.io/%5BPROJECTNAME%5D> <
> http://asia.gcr.io/%5BPROJECTNAME%5D>
> > > NAME
> > > asia.gcr.io/[PROJECTNAME]/beam
> <http://asia.gcr.io/%5BPROJECTNAME%5D/beam> <
> http://asia.gcr.io/%5BPROJECTNAME%5D/beam>
> > >
> > >
> 
> > >
> > > Looks like FlinkRunner (2.15.0)  uses bintray repository as a default
> > > behavior. As a result I am not able to use
> > >
> > > Caused by: java.lang.Exception: The user defined 'open()' method
> caused an
> > > exception: java.io.IOException: Received exit code 125 for command
> 'docker
> > > run -d --network=host --env=DOCKER_MAC_CONTAINER=null --rm
> > > ywatanabe-docker-apache.bintray.io/beam/python3:latest --id=1
> > > --logging_endpoint=localhost:33787 --artifact_endpoint=localhost:41681
> > > --provision_endpoint=localhost:45089
> --control_endpoint=localhost:37045'.
> > > stderr: Unable to find image '
> > > ywatanabe-docker-apache.bintray.io/beam/python3:latest' locallydocker:
> > > Error response from daemon: manifest for
> > > ywatanabe-docker-apache.bintray.io/beam/python3:latest not found:
> > > manifest unknown: The named manifest is not known to the registry.See
> > > 'docker run --
> > >
> > > According to online doc, there is not parameter which controls which
> image
> > > to use .
> > >
> > > https://beam.apache.org/documentation/runners/flink/
> > >
> > > Pipeline options I am using is below.
> > >
> > >
> > >
> 
> > > options = PipelineOptions([
> > >   "--runner=FlinkRunner",
> > >   "--flink_version=1.8",
> > >   "--flink_master_url=localhost:8081",
> > >   "--worker_harness_container_image=
> > > asia.gcr.io/PROJECTNAME/beam/python3",
> > >   "--experiments=beam_fn_api"
> > >   ])
> > >
> > >
> 
> > >
> > > Perhaps is there any  environment variable to specify which image to
> use ?
> > >
> > > Best Regards,
> > > Yu Watanabe
> > >
> > > --
> > > Yu Watanabe
> > > Weekend Freelancer who loves to challenge building data platform
> > > yu.w.ten...@gmail.com
> > > [image: LinkedIn icon] <https://www.linkedin.com/in/yuwatanabe1>
> [image:
> > > Twitter icon] <https://twitter.com/yuwtennis>
> > >
> > --
> > Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com
> >
>


-- 
Yu Watanabe
Weekend Freelancer who loves to challenge building data platform
yu.w.ten...@gmail.com
[image: LinkedIn icon] <https://www.linkedin.com/in/yuwatanabe1>  [image:
Twitter icon] <https://twitter.com/yuwtennis>


How to use google container registry for FlinkRunner ?

2019-09-16 Thread Yu Watanabe
Hello.

For testing I would like to use an image uploaded to Google Container Registry.
How can I use images pulled from a registry other than bintray.io?


admin@ip-172-31-9-89:~$ gcloud container images list --repository
asia.gcr.io/[PROJECTNAME]
NAME
asia.gcr.io/[PROJECTNAME]/beam


It looks like FlinkRunner (2.15.0) uses the bintray repository by default.
As a result I am not able to use my image from GCR.

Caused by: java.lang.Exception: The user defined 'open()' method caused an
exception: java.io.IOException: Received exit code 125 for command 'docker
run -d --network=host --env=DOCKER_MAC_CONTAINER=null --rm
ywatanabe-docker-apache.bintray.io/beam/python3:latest --id=1
--logging_endpoint=localhost:33787 --artifact_endpoint=localhost:41681
--provision_endpoint=localhost:45089 --control_endpoint=localhost:37045'.
stderr: Unable to find image
'ywatanabe-docker-apache.bintray.io/beam/python3:latest' locally
docker: Error response from daemon: manifest for
ywatanabe-docker-apache.bintray.io/beam/python3:latest not found: manifest
unknown: The named manifest is not known to the registry. See 'docker run --

According to the online documentation, there is no parameter which controls
which image to use.

https://beam.apache.org/documentation/runners/flink/

The pipeline options I am using are below.



options = PipelineOptions([
  "--runner=FlinkRunner",
  "--flink_version=1.8",
  "--flink_master_url=localhost:8081",
  "--worker_harness_container_image=asia.gcr.io/PROJECTNAME/beam/python3",
  "--experiments=beam_fn_api"
  ])



Is there perhaps an environment variable to specify which image to use?

Best Regards,
Yu Watanabe

-- 
Yu Watanabe
Weekend Freelancer who loves to challenge building data platform
yu.w.ten...@gmail.com
[image: LinkedIn icon] <https://www.linkedin.com/in/yuwatanabe1>  [image:
Twitter icon] <https://twitter.com/yuwtennis>


Re: Flink Runner logging FAILED_TO_UNCOMPRESS

2019-09-14 Thread Yu Watanabe
Kyle

Thank you for the assistance.

By specifying "experiments" in PipelineOptions ,
==
options = PipelineOptions([
  "--runner=FlinkRunner",
  "--flink_version=1.8",
  "--flink_master_url=localhost:8081",
  "--experiments=beam_fn_api"
  ])
==

I was able to submit the job successfully.

[grpc-default-executor-0] INFO
org.apache.beam.runners.flink.FlinkJobInvoker - Invoking job
BeamApp-ywatanabe-0915024400-8e0dc08_bc24de73-c729-41ad-ae27-35281b45feb9
[grpc-default-executor-0] INFO
org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Starting
job invocation
BeamApp-ywatanabe-0915024400-8e0dc08_bc24de73-c729-41ad-ae27-35281b45feb9
[flink-runner-job-invoker] INFO
org.apache.beam.runners.flink.FlinkPipelineRunner - Translating pipeline to
Flink program.
[flink-runner-job-invoker] INFO
org.apache.beam.runners.flink.FlinkExecutionEnvironments - Creating a Batch
Execution Environment.
[flink-runner-job-invoker] INFO
org.apache.beam.runners.flink.FlinkExecutionEnvironments - Using Flink
Master URL localhost:8081.
[flink-runner-job-invoker] WARN
org.apache.beam.runners.flink.FlinkExecutionEnvironments - No default
parallelism could be found. Defaulting to parallelism 1. Please set an
explicit parallelism with --parallelism
[flink-runner-job-invoker] INFO
org.apache.flink.api.java.ExecutionEnvironment - The job has 0 registered
types and 0 default Kryo serializers
[flink-runner-job-invoker] INFO
org.apache.flink.configuration.Configuration - Config uses fallback
configuration key 'jobmanager.rpc.address' instead of key 'rest.address'
[flink-runner-job-invoker] INFO org.apache.flink.runtime.rest.RestClient -
Rest client endpoint started.
[flink-runner-job-invoker] INFO
org.apache.flink.client.program.rest.RestClusterClient - Submitting job
4e055a8878dda3f564a7b7c84d48510d (detached: false).

Thanks,
Yu Watanabe

On Sun, Sep 15, 2019 at 3:01 AM Kyle Weaver  wrote:

> Try adding "--experiments=beam_fn_api" to your pipeline options. (This is
> a known issue with Beam 2.15 that will be fixed in 2.16.)
>
> Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com
>
>
> On Sat, Sep 14, 2019 at 12:52 AM Yu Watanabe 
> wrote:
>
>> Hello.
>>
>> I am trying to spin up the flink runner but looks like data serialization
>> is failing.
>> I would like to ask for help to get over with this error.
>>
>> 
>> [flink-runner-job-invoker] ERROR
>> org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Error
>> during job invocation
>> BeamApp-ywatanabe-0914074210-2fcf987a_3dc1d4dc-4754-470a-9a23-eb8a68903016.
>> java.lang.IllegalArgumentException: unable to deserialize BoundedSource
>> at
>> org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:74)
>> at
>> org.apache.beam.runners.core.construction.ReadTranslation.boundedSourceFromProto(ReadTranslation.java:94)
>> at
>> org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translateRead(FlinkBatchPortablePipelineTranslator.java:573)
>> at
>> org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translate(FlinkBatchPortablePipelineTranslator.java:278)
>> at
>> org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translate(FlinkBatchPortablePipelineTranslator.java:120)
>> at
>> org.apache.beam.runners.flink.FlinkPipelineRunner.runPipelineWithTranslator(FlinkPipelineRunner.java:84)
>> at
>> org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:63)
>> at
>> org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:74)
>> at
>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
>> at
>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
>> at
>> org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
>> at
>> java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
>> at
>> java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
>>  

Flink Runner logging FAILED_TO_UNCOMPRESS

2019-09-14 Thread Yu Watanabe
Hello.

I am trying to spin up the Flink runner, but it looks like data serialization
is failing.
I would like to ask for help getting past this error.


[flink-runner-job-invoker] ERROR
org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation - Error
during job invocation
BeamApp-ywatanabe-0914074210-2fcf987a_3dc1d4dc-4754-470a-9a23-eb8a68903016.
java.lang.IllegalArgumentException: unable to deserialize BoundedSource
at
org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:74)
at
org.apache.beam.runners.core.construction.ReadTranslation.boundedSourceFromProto(ReadTranslation.java:94)
at
org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translateRead(FlinkBatchPortablePipelineTranslator.java:573)
at
org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translate(FlinkBatchPortablePipelineTranslator.java:278)
at
org.apache.beam.runners.flink.FlinkBatchPortablePipelineTranslator.translate(FlinkBatchPortablePipelineTranslator.java:120)
at
org.apache.beam.runners.flink.FlinkPipelineRunner.runPipelineWithTranslator(FlinkPipelineRunner.java:84)
at
org.apache.beam.runners.flink.FlinkPipelineRunner.run(FlinkPipelineRunner.java:63)
at
org.apache.beam.runners.fnexecution.jobsubmission.JobInvocation.runPipeline(JobInvocation.java:74)
at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask$TrustedFutureInterruptibleTask.runInterruptibly(TrustedListenableFutureTask.java:125)
at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.InterruptibleTask.run(InterruptibleTask.java:57)
at
org.apache.beam.vendor.guava.v26_0_jre.com.google.common.util.concurrent.TrustedListenableFutureTask.run(TrustedListenableFutureTask.java:78)
at
java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at
java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Caused by: java.io.IOException: FAILED_TO_UNCOMPRESS(5)
at org.xerial.snappy.SnappyNative.throw_error(SnappyNative.java:98)
at org.xerial.snappy.SnappyNative.rawUncompress(Native Method)
at org.xerial.snappy.Snappy.rawUncompress(Snappy.java:474)
at org.xerial.snappy.Snappy.uncompress(Snappy.java:513)
at
org.xerial.snappy.SnappyInputStream.readFully(SnappyInputStream.java:147)
at
org.xerial.snappy.SnappyInputStream.readHeader(SnappyInputStream.java:99)
at
org.xerial.snappy.SnappyInputStream.<init>(SnappyInputStream.java:59)
at
org.apache.beam.sdk.util.SerializableUtils.deserializeFromByteArray(SerializableUtils.java:68)
... 13 more


My beam version is below.

===
(python) ywatanabe@debian-09-00:~$ pip3 freeze | grep apache-beam
apache-beam==2.15.0
===

I have my harness container ready on the registry.

===
ywatanabe@debian-09-00:~$ docker search ywatanabe-docker-apache.bintray.io/python3
NAME           DESCRIPTION   STARS   OFFICIAL   AUTOMATED
beam/python3                 0
===

Flink is ready on separate cluster.

===
(python) ywatanabe@debian-09-00:~$ ss -atunp | grep 8081
tcp    LISTEN     0      128      :::8081      :::*
===

My debian version.

===
(python) ywatanabe@debian-09-00:~$ cat /etc/debian_version
9.11
===

My code snippet is below.

===
options = PipelineOptions([
  "--runner=FlinkRunner",
  "--flink_version=1.8",
  "--flink_master_url=localhost:8081"
  ])

with beam.Pipeline(options=options) as p:
    (p | beam.Create(["Hello World"]))
===

Would there be any other settings I should look for?

Thanks,
Yu Watanabe

-- 
Yu Watanabe
Weekend Freelancer who loves to challenge building data platform
yu.w.ten...@gmail.com
[image: LinkedIn icon] <https://www.linkedin.com/in/yuwatanabe1>  [image:
Twitter icon] <https://twitter.com/yuwtennis>


Re: How do you write portable runner pipeline on separate python code ?

2019-09-14 Thread Yu Watanabe
Lukasz

Thank you for the reply.

> * Using a "remote" filesystem such as HDFS/S3/GCS/...
> * Mounting an external directory into the container so that any "local"
writes appear outside the container
> * Using a non-docker environment such as external or process.

  Understood.
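
For the first option, the sketch below is roughly what I have in mind:
writing the output to a GCS path instead of /tmp so it lands outside the
container. The bucket path is a placeholder, and it assumes apache-beam[gcp]
is installed in the harness image.

===
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.io import WriteToText

options = PipelineOptions([
  "--runner=PortableRunner",
  "--job_endpoint=localhost:8099"
  ])

with beam.Pipeline(options=options) as p:
    # 'gs://my-bucket/out/sample' is a placeholder; any remote filesystem
    # path (HDFS/S3/GCS) keeps the results visible outside the SDK container.
    (p | 'create' >> beam.Create(["a", "b", "c"])
       | 'write' >> WriteToText('gs://my-bucket/out/sample'))
===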

Thanks,
Yu Watanabe

On Fri, Sep 13, 2019 at 2:34 AM Lukasz Cwik  wrote:

> When you use a local filesystem path and a docker environment, "/tmp" is
> written inside the container. You can solve this issue by:
> * Using a "remote" filesystem such as HDFS/S3/GCS/...
> * Mounting an external directory into the container so that any "local"
> writes appear outside the container
> * Using a non-docker environment such as external or process.
>
> On Thu, Sep 12, 2019 at 4:51 AM Yu Watanabe  wrote:
>
>> Hello.
>>
>> I would like to ask for help with my sample code using portable runner
>> using apache flink.
>> I was able to work out the wordcount.py using this page.
>>
>> https://beam.apache.org/roadmap/portability/
>>
>> I got below two files under /tmp.
>>
>> -rw-r--r-- 1 ywatanabe ywatanabe185 Sep 12 19:56
>> py-wordcount-direct-1-of-2
>> -rw-r--r-- 1 ywatanabe ywatanabe190 Sep 12 19:56
>> py-wordcount-direct-0-of-2
>>
>> Then I wrote sample code with below steps.
>>
>> 1.Install apache_beam using pip3 separate from source code directory.
>> 2. Wrote sample code as below and named it "test-protable-runner.py".
>> Placed it separate directory from source code.
>>
>> ---
>> (python) ywatanabe@debian-09-00:~$ ls -ltr
>> total 16
>> drwxr-xr-x 18 ywatanabe ywatanabe 4096 Sep 12 19:06 beam (<- source code
>> directory)
>> -rw-r--r--  1 ywatanabe ywatanabe  634 Sep 12 20:25
>> test-portable-runner.py
>>
>> ---
>> 3. Executed the code with "python3 test-protable-ruuner.py"
>>
>>
>> ==
>> #!/usr/bin/env
>>
>> import apache_beam as beam
>> from apache_beam.options.pipeline_options import PipelineOptions
>> from apache_beam.io import WriteToText
>>
>>
>> def printMsg(line):
>>
>> print("OUTPUT: {0}".format(line))
>>
>> return line
>>
>> options = PipelineOptions(["--runner=PortableRunner",
>> "--job_endpoint=localhost:8099", "--shutdown_sources_on_final_watermark"])
>>
>> p = beam.Pipeline(options=options)
>>
>> output = ( p | 'create' >> beam.Create(["a", "b", "c"])
>>  | beam.Map(printMsg)
>>  )
>>
>> output | 'write' >> WriteToText('/tmp/sample.txt')
>>
>> ===
>>
>> Job seemed to went all the way to "FINISHED" state.
>>
>> ---
>> [DataSource (Impulse) (1/1)] INFO
>> org.apache.flink.runtime.taskmanager.Task - Registering task at network:
>> DataSource (Impulse) (1/1) (9df528f4d493d8c3b62c8be23dc889a8) [DEPLOYING].
>> [DataSource (Impulse) (1/1)] INFO
>> org.apache.flink.runtime.taskmanager.Task - DataSource (Impulse) (1/1)
>> (9df528f4d493d8c3b62c8be23dc889a8) switched from DEPLOYING to RUNNING.
>> [flink-akka.actor.default-dispatcher-3] INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph - DataSource
>> (Impulse) (1/1) (9df528f4d493d8c3b62c8be23dc889a8) switched from DEPLOYING
>> to RUNNING.
>> [flink-akka.actor.default-dispatcher-3] INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph - CHAIN MapPartition
>> (MapPartition at [2]write/Write/WriteImpl/DoOnce/{FlatMap(<lambda at core.py:2415>), Map(decode)}) -> FlatMap (FlatMap at ExtractOutput[0])
>> (1/1) (0b60f1a9e620b061f85e97998aa4b181) switched from CREATED to SCHEDULED.
>> [flink-akka.actor.default-dispatcher-3] INFO
>> org.apache.flink.runtime.executiongraph.ExecutionGraph - CHAIN MapPartition
>> (MapPartition at [2]write/Write/WriteImpl/DoOnce/{FlatMap(<lambda at core.py:2415>), Map(decode)}) -> FlatMap (FlatMap at ExtractOutput[0])
>> (1/1) (0b60f1a9e620b061f85e97998aa4b181) switched from SCHEDULED to
>> DEPLOYING.
>> [flink-akka.actor.default-dispat

Re: How do you write portable runner pipeline on separate python code ?

2019-09-14 Thread Yu Watanabe
Kyle

Thank you for the advice.

> For example, Yu's pipeline errored here because the expected Docker
container wasn't built before running.

I was able to spin up the harness container and submit the job to the job
service by preparing the container properly.
I needed to do some extra steps beyond the online instructions.
What I have done is probably something you already know, I guess.

The steps marked with (*) below are the extra ones I did.


https://beam.apache.org/documentation/runners/flink/

Executing a Beam pipeline on a Flink Cluster

1. Only required once: Build the SDK harness container (optionally replace
py35 with the Python version of your choice): ./gradlew
:sdks:python:container:py35:docker
*2. Prepare bintray account (https://bintray.com/)
*3. Push the image to bintray registry using "docker push" (mentioned here
=> https://github.com/apache/beam/blob/release-2.15.0/sdks/CONTAINERS.md)
*4. Log in to the bintray account with "docker login"
5. Start the JobService endpoint: ./gradlew
:runners:flink:1.5:job-server:runShadow

The JobService is the central instance where you submit your Beam pipeline
to. The JobService will create a Flink job for the pipeline and execute the
job. To execute the job on a Flink cluster, the Beam JobService needs to be
provided with the Flink JobManager address.

6. Submit the Python pipeline to the above endpoint by using the
PortableRunner and job_endpoint set to localhost:8099 (this is the default
address of the JobService). For example:
====
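
A minimal sketch of the option shape for that submission (the example from the
doc is abbreviated here; only the runner and the job endpoint are set):

===
from apache_beam.options.pipeline_options import PipelineOptions

options = PipelineOptions([
  "--runner=PortableRunner",
  "--job_endpoint=localhost:8099"  # default address of the JobService
  ])
===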

Thanks,
Yu Watanabe

On Fri, Sep 13, 2019 at 5:09 AM Kyle Weaver  wrote:

> +dev  I think we should probably point new users of
> the portable Flink/Spark runners to use loopback or some other non-docker
> environment, as Docker adds some operational complexity that isn't really
> needed to run a word count example. For example, Yu's pipeline errored here
> because the expected Docker container wasn't built before running.
>
> Kyle Weaver | Software Engineer | github.com/ibzib | kcwea...@google.com
>
>
> On Thu, Sep 12, 2019 at 11:27 AM Robert Bradshaw 
> wrote:
>
>> On this note, making local files easy to read is something we'd
>> definitely like to improve, as the current behavior is quite surprising.
>> This could be useful not just for running with docker and the portable
>> runner locally, but more generally when running on a distributed system
>> (e.g. a Flink/Spark cluster or Dataflow). It would be very convenient if we
>> could automatically stage local files to be read as artifacts that could be
>> consumed by any worker (possibly via external directory mounting in the
>> local docker case rather than an actual copy), and conversely copy small
>> outputs back to the local machine (with the similar optimization for local
>> docker).
>>
>> At the very least, however, obvious messaging when the local filesystem
>> is used from within docker, which is often a (non-obvious and hard to
>> debug) mistake should be added.
>>
>>
>> On Thu, Sep 12, 2019 at 10:34 AM Lukasz Cwik  wrote:
>>
>>> When you use a local filesystem path and a docker environment, "/tmp" is
>>> written inside the container. You can solve this issue by:
>>> * Using a "remote" filesystem such as HDFS/S3/GCS/...
>>> * Mounting an external directory into the container so that any "local"
>>> writes appear outside the container
>>> * Using a non-docker environment such as external or process.
>>>
>>> On Thu, Sep 12, 2019 at 4:51 AM Yu Watanabe 
>>> wrote:
>>>
>>>> Hello.
>>>>
>>>> I would like to ask for help with my sample code using portable runner
>>>> using apache flink.
>>>> I was able to work out the wordcount.py using this page.
>>>>
>>>> https://beam.apache.org/roadmap/portability/
>>>>
>>>> I got below two files under /tmp.
>>>>
>>>> -rw-r--r-- 1 ywatanabe ywatanabe185 Sep 12 19:56
>>>> py-wordcount-direct-1-of-2
>>>> -rw-r--r-- 1 ywatanabe ywatanabe190 Sep 12 19:56
>>>> py-wordcount-direct-0-of-2
>>>>
>>>> Then I wrote sample code with below steps.
>>>>
>>>> 1.Install apache_beam using pip3 separate from source code directory.
>>>> 2. Wrote sample code as below and named it "test-protable-runner.py".
>>>> Placed it separate directory from source code.
>>>>
>>>> ---
>>&

Re: How to buffer events using spark portable runner ?

2019-09-12 Thread Yu Watanabe
Lukasz

Thank you for the reply.

I will try Apache Flink.

Thanks,
Yu

On Sun, Sep 8, 2019 at 11:59 PM Lukasz Cwik  wrote:

> Try using Apache Flink.
>
> On Sun, Sep 8, 2019 at 6:23 AM Yu Watanabe  wrote:
>
>> Hello .
>>
>> I would like to ask question related to timely processing as stated in
>> below page.
>>
>> https://beam.apache.org/blog/2017/08/28/timely-processing.html
>>
>> Python version: 3.7.4
>> apache beam version: 2.15.0
>>
>> I currently use timely processing to first buffer events and send *bulk
>> requests *to elasticsearch. The source of data is bounded source and I
>> use DirectRunner for runner.
>>
>> To have more memory resource , I am considering to move to process the
>> pipeline on apache spark using portable runner. However, according to
>> compatibility matrix,
>> *Timers *is not supported on apache spark.
>>
>>
>> https://beam.apache.org/documentation/runners/capability-matrix/#cap-summary-when
>>
>> Is there anyway in portable runner that you can do similar processing as 
>> *timely
>> processing* ?
>> This is my first time using portable runner and I appreciate if I can get
>> help with this.
>>
>> Best Regards,
>> Yu Watanabe
>>
>> --
>> Yu Watanabe
>> Weekend Freelancer who loves to challenge building data platform
>> yu.w.ten...@gmail.com
>> [image: LinkedIn icon] <https://www.linkedin.com/in/yuwatanabe1>  [image:
>> Twitter icon] <https://twitter.com/yuwtennis>
>>
>

-- 
Yu Watanabe
Weekend Freelancer who loves to challenge building data platform
yu.w.ten...@gmail.com
[image: LinkedIn icon] <https://www.linkedin.com/in/yuwatanabe1>  [image:
Twitter icon] <https://twitter.com/yuwtennis>


How do you write portable runner pipeline on separate python code ?

2019-09-12 Thread Yu Watanabe
ts of Python 3 are not yet fully
supported by Apache Beam.
  'Some syntactic constructs of Python 3 are not yet fully supported by '
ERROR:root:java.io.IOException: Received exit code 125 for command 'docker
run -d --network=host --env=DOCKER_MAC_CONTAINER=null
ywatanabe-docker-apache.bintray.io/beam/python3:latest --id=13-1
--logging_endpoint=localhost:39049 --artifact_endpoint=localhost:33779
--provision_endpoint=localhost:34827 --control_endpoint=localhost:36079'.
stderr: Unable to find image
'ywatanabe-docker-apache.bintray.io/beam/python3:latest' locally
docker: Error response from daemon: unknown: Subject ywatanabe was not found.
See 'docker run --help'.
Traceback (most recent call last):
  File "test-portable-runner.py", line 27, in 
result.wait_until_finish()
  File
"/home/ywatanabe/python/lib/python3.5/site-packages/apache_beam/runners/portability/portable_runner.py",
line 446, in wait_until_finish
self._job_id, self._state, self._last_error_message()))
RuntimeError: Pipeline
BeamApp-ywatanabe-0912112516-22d84d63_d3b5e778-d771-4118-9ba7-c9dde85938e9
failed in state FAILED: java.io.IOException: Received exit code 125 for
command 'docker run -d --network=host --env=DOCKER_MAC_CONTAINER=null
ywatanabe-docker-apache.bintray.io/beam/python3:latest --id=13-1
--logging_endpoint=localhost:39049 --artifact_endpoint=localhost:33779
--provision_endpoint=localhost:34827 --control_endpoint=localhost:36079'.
stderr: Unable to find image
'ywatanabe-docker-apache.bintray.io/beam/python3:latest' locally
docker: Error response from daemon: unknown: Subject ywatanabe was not found.
See 'docker run --help'.

---

As a result, I got nothing under /tmp. The code works when using DirectRunner.
May I ask where I should look in order to get the pipeline to write
results to text files under /tmp?

Best Regards,
Yu Watanabe

-- 
Yu Watanabe
Weekend Freelancer who loves to challenge building data platform
yu.w.ten...@gmail.com
[image: LinkedIn icon] <https://www.linkedin.com/in/yuwatanabe1>  [image:
Twitter icon] <https://twitter.com/yuwtennis>


How to buffer events using spark portable runner ?

2019-09-08 Thread Yu Watanabe
Hello .

I would like to ask question related to timely processing as stated in
below page.

https://beam.apache.org/blog/2017/08/28/timely-processing.html

Python version: 3.7.4
apache beam version: 2.15.0

I currently use timely processing to first buffer events and send *bulk
requests* to Elasticsearch, roughly as sketched below. The source of data is a
bounded source and I use DirectRunner.
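
Concretely, the buffering is roughly the stateful DoFn sketched below: a
BagState per key plus a timer that flushes the batch. The Elasticsearch bulk
call is only hinted at with a hypothetical helper, and the coder and key are
placeholders.

===
import apache_beam as beam
from apache_beam.coders import StrUtf8Coder
from apache_beam.transforms.timeutil import TimeDomain
from apache_beam.transforms.userstate import BagStateSpec, TimerSpec, on_timer

class BufferForBulkRequestFn(beam.DoFn):
  """Buffers values per key and flushes them as one batch when the timer fires."""
  BUFFER = BagStateSpec('buffer', StrUtf8Coder())
  FLUSH = TimerSpec('flush', TimeDomain.WATERMARK)

  def process(self,
              element,  # expects (key, value) pairs
              ts=beam.DoFn.TimestampParam,
              buffer=beam.DoFn.StateParam(BUFFER),
              flush=beam.DoFn.TimerParam(FLUSH)):
    _, value = element
    buffer.add(value)
    flush.set(ts)  # simplified: fire once the watermark passes this element

  @on_timer(FLUSH)
  def flush_buffer(self, buffer=beam.DoFn.StateParam(BUFFER)):
    batch = list(buffer.read())
    buffer.clear()
    # send_bulk_request(batch)  # hypothetical helper doing the ES bulk index
    yield batch
===

The input is keyed first, e.g. with beam.Map(lambda doc: ("es", doc)), before
applying beam.ParDo(BufferForBulkRequestFn()).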

To have more memory resources, I am considering moving the pipeline to Apache
Spark using the portable runner. However, according to the compatibility
matrix, *Timers* are not supported on Apache Spark.

https://beam.apache.org/documentation/runners/capability-matrix/#cap-summary-when

Is there any way in the portable runner to do processing similar to *timely
processing*?
This is my first time using the portable runner and I would appreciate any help
with this.

Best Regards,
Yu Watanabe

-- 
Yu Watanabe
Weekend Freelancer who loves to challenge building data platform
yu.w.ten...@gmail.com
[image: LinkedIn icon] <https://www.linkedin.com/in/yuwatanabe1>  [image:
Twitter icon] <https://twitter.com/yuwtennis>


Which memory profiler is used for python 3.7.3 ?

2019-07-24 Thread Yu Watanabe
Hello.

The built-in memory profiler uses guppy, but unfortunately guppy is not
supported on Python 3.

https://beam.apache.org/releases/pydoc/2.13.0/_modules/apache_beam/utils/profiler.html


try:
  from guppy import hpy  # pylint: disable=import-error
  self._hpy = hpy
  self._interval_second = interval_second
  self._timer = None
except ImportError:
  warnings.warn('guppy is not installed; MemoryReporter not available.')
  self._hpy = None
self._enabled = False

I am thinking of using memory-profiler, but I was wondering which memory
profiler users of Apache Beam use for Python 3.7.3?

https://pypi.org/project/memory-profiler/
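
The usage I have in mind is roughly the sketch below: memory_usage from the
memory-profiler package wrapping a stand-in function. The function body is
just a placeholder, not the real transform.

===
# pip install memory-profiler
from memory_profiler import memory_usage

def transform_logic():
    # Placeholder for the body of a DoFn.process call.
    return [x * 2 for x in range(1000000)]

# memory_usage samples RSS while the callable runs; max() gives the peak in MiB.
samples = memory_usage((transform_logic, (), {}), interval=0.1)
print('Peak memory: %.1f MiB' % max(samples))
===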

Thanks,
Yu Watanabe

-- 
Yu Watanabe
Weekend Freelancer who loves to challenge building data platform
yu.w.ten...@gmail.com
[image: LinkedIn icon] <https://www.linkedin.com/in/yuwatanabe1>  [image:
Twitter icon] <https://twitter.com/yuwtennis>


Re: Any way to profile speed for each transforms ?

2019-07-24 Thread Yu Watanabe
Robert.

Thank you for the information.
I installed apache-beam using pip3, so the Beam environment is not a problem in
my case.

The solution looks fantastic. I love it.

Below is output from my sample code.

2019-07-25 10:33:18,722 INFO Test started at 2019-07-25 10:33:18
2019-07-25 10:33:19,780 INFO Filesystem() done. Elapsed time 1.058 ms.
2019-07-25 10:33:31,479 INFO   
2019-07-25 10:33:31,479 INFO   
2019-07-25 10:33:31,480 INFO   
2019-07-25 10:33:31,480 INFO   
2019-07-25 10:33:31,481 INFO   
2019-07-25 10:33:31,481 INFO   
2019-07-25 10:33:31,482 INFO   
2019-07-25 10:33:31,482 INFO   
2019-07-25 10:33:31,483 INFO   
2019-07-25 10:33:31,484 INFO   
2019-07-25 10:33:31,484 INFO   
2019-07-25 10:33:31,484 INFO   
2019-07-25 10:33:31,485 INFO Running
((ref_AppliedPTransform_Create/Read_3)+(ref_AppliedPTransform_FlatMap()_4))+(ref_AppliedPTransform_ParDo(OpenFn)_5)
2019-07-25 10:33:31,485 INFO Running
((ref_AppliedPTransform_Create/Read_3)+(ref_AppliedPTransform_FlatMap()_4))+(ref_AppliedPTransform_ParDo(OpenFn)_5)
{'((ref_AppliedPTransform_Create/Read_3)+(ref_AppliedPTransform_FlatMap()_4))+(ref_AppliedPTransform_ParDo(OpenFn)_5)':
ptransforms {
  key: "Create/Read"
  value {
processed_elements {
  measured {
output_element_counts {
  key: "out"
  value: 1
}
total_time_spent: 0.330032672
  }
}
  }
}


Thanks,
Yu

On Thu, Jul 25, 2019 at 12:26 AM Robert Bradshaw 
wrote:

> Also, take note that these counters will only be available if Beam has
> been compiled with Cython ( e.g. installed from a wheel). Of course if you
> care about performance you'd want that anyway.
>
> On Wed, Jul 24, 2019, 5:15 PM Robert Bradshaw  wrote:
>
>> Beam tracks the amount of time spent in each transform in profile
>> counters. There is ongoing work to expose these in a uniform way for all
>> runners (e.g. in Dataflow they're displayed on the UI), but for the direct
>> runner you can see an example at
>> https://github.com/apache/beam/blob/release-2.14.0/sdks/python/apache_beam/runners/portability/fn_api_runner_test.py#L1046
>>  .
>> For a raw dump you could do something like:
>>
>> p = beam.Pipeline(...)
>> p | beam.Read...
>> results = p.run()
>> results.wait_until_finish()
>> import pprint
>> pprint.pprint(results._metrics_by_stage)
>>
>>
>>
>>
>> On Wed, Jul 24, 2019 at 4:07 PM Yu Watanabe 
>> wrote:
>>
>>> Hello .
>>>
>>> I have a pipeline built on  apache beam 2.13.0 using python 3.7.3.
>>> My pipeline lasts about 5 hours to ingest 2 sets of approximately 7
>>> Json objects using Direct Runner.
>>>
>>> I want to diagnose which transforms are taking time and  improve code
>>> for better performance. I saw below module for profiling but it seems it
>>> does not report about speed of each transform.
>>>
>>>
>>> https://beam.apache.org/releases/pydoc/2.13.0/apache_beam.utils.profiler.html
>>>
>>> Is there any module that you could use to monitor speed of each
>>> transform ? If not, I appreciate if I could get some help for how to
>>> monitor speed for each transform.
>>>
>>> Best Regards,
>>> Yu Watanabe
>>>
>>> --
>>> Yu Watanabe
>>> Weekend Freelancer who loves to challenge building data platform
>>> yu.w.ten...@gmail.com
>>> [image: LinkedIn icon] <https://www.linkedin.com/in/yuwatanabe1>  [image:
>>> Twitter icon] <https://twitter.com/yuwtennis>
>>>
>>

-- 
Yu Watanabe
Weekend Freelancer who loves to challenge building data platform
yu.w.ten...@gmail.com
[image: LinkedIn icon] <https://www.linkedin.com/in/yuwatanabe1>  [image:
Twitter icon] <https://twitter.com/yuwtennis>